Hazelcast Hot Cache with Striim

I have previously published a couple of articles about Hazelcast, an open source in-memory data grid. In the first, JPA caching with Hazelcast, Hibernate and Spring Boot, I described how to set up a second-level JPA cache with Hazelcast and MySQL. In the second, In memory data grid with Hazelcast, I showed a more advanced sample that uses Hazelcast distributed queries to speed up data access for a Spring Boot application. Using Hazelcast as a cache layer between your application and a relational database is generally a very good solution under one condition: all changes go through your application. If the data source is modified by another application that does not use your caching solution, your application ends up serving outdated data. Have you encountered this problem in your organization? In my organization we still use relational databases in almost all our systems. Sometimes this causes performance problems, because even optimized queries are too slow for real-time applications. The relational database is still required, so solutions like Hazelcast can help us.

Let’s return to the topic of the outdated cache. This is where Striim comes in, a real-time data integration and streaming analytics platform. The architecture of the presented solution is shown in the figure below. We have two applications. The first one, employee-service, uses Hazelcast as a cache; the second one, employee-app, writes changes directly to the database. Without a solution like Striim, data changed by employee-app is not visible to employee-service. Striim enables real-time data integration without modifying or slowing down the data source. It uses CDC (Change Data Capture) to detect changes made to the data source by analyzing its binary logs. It supports the most popular transactional databases, such as Oracle, Microsoft SQL Server and MySQL. Striim has many interesting features, but also one serious drawback: it is not open source. An alternative to the presented solution, especially when using an Oracle database, is Oracle In-Memory Data Grid with GoldenGate HotCache.

striim-figure-1
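To make the stale-cache problem concrete, here is a minimal sketch in plain Java. It uses neither Hazelcast, MySQL nor Striim: the two maps and the class name StaleCacheDemo are stand-ins I made up for illustration, and onChangeEvent only imitates what a CDC pipeline would do with a captured change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins: "database" plays MySQL, "cache" plays the Hazelcast map.
class StaleCacheDemo {
    final Map<Integer, String> database = new ConcurrentHashMap<>();
    final Map<Integer, String> cache = new ConcurrentHashMap<>();

    // Read-through lookup, as employee-service would do: cache first, database on a miss.
    String find(int id) {
        return cache.computeIfAbsent(id, database::get);
    }

    // A write that bypasses the cache, as employee-app does.
    void directUpdate(int id, String company) {
        database.put(id, company);
    }

    // What a CDC pipeline adds: every captured change is also pushed to the cache.
    void onChangeEvent(int id, String company) {
        cache.put(id, company);
    }

    public static void main(String[] args) {
        StaleCacheDemo demo = new StaleCacheDemo();
        demo.directUpdate(1, "Test001");
        System.out.println(demo.find(1)); // loads the row into the cache
        demo.directUpdate(1, "Test002");
        System.out.println(demo.find(1)); // the cache still holds the old value
        demo.onChangeEvent(1, "Test002");
        System.out.println(demo.find(1)); // the cache now matches the database
    }
}
```

The second lookup is the situation described above: the database already holds the new value, but the reader keeps getting the cached one until a change event arrives.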

I prepared a sample application for this article, which is, as usual, available on GitHub, in the striim branch. The employee-service application is based on Spring Boot and has an embedded Hazelcast client which connects to the cluster and to Hazelcast Management Center. If data is not available in the cache, the application reads it from the MySQL database.

1. Starting MySQL and enabling binary log

Let’s start the MySQL database using Docker.

docker run -d --name mysql -e MYSQL_DATABASE=hz -e MYSQL_USER=hz -e MYSQL_PASSWORD=hz123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

The binary log is disabled by default. We enable it by adding the following lines to mysqld.cnf. In the Docker container the configuration file is located at /etc/mysql/mysql.conf.d/mysqld.cnf.

log_bin          = /var/log/mysql/binary.log
expire-logs-days = 14
max-binlog-size  = 500M
server-id        = 1

If you are running MySQL on Docker you should restart your container using docker restart mysql.

2. Starting Hazelcast Dashboard and Striim

As with MySQL, I used Docker here.

docker run -d --name striim -p 39080:9080 striim/evalversion
docker run -d --name hazelcast-mgmt -p 38080:8080 hazelcast/management-center:3.7.7

I selected version 3.7.7 of Hazelcast Management Center because this is the Hazelcast version included by default in the Spring Boot release I used in the sample application. Now you should be able to log in to the Hazelcast dashboard at http://192.168.99.100:38080/mancenter/ and to the Striim dashboard at http://192.168.99.100:39080/ (admin/admin).

3. Starting sample application

Build the sample application with mvn clean install and start it with java -jar employee-service-1.0-SNAPSHOT.jar. You can test it by calling one of the endpoints:
/employees/person/{id}
/employees/company/{company}
/employees/{id}

Before testing, create the employee table in MySQL and insert some test data (you can run my test class pl.piomin.services.datagrid.employee.data.AddEmployeeRepositoryTest).

4. Configure entity mapping in Striim

Before creating our first application in Striim we have to provide the mapping configuration. The first step is to copy the entity ORM mapping file into the Docker container filesystem. You can do it using the Striim dashboard or with the docker cp command. Here’s my orm.xml file; it is used by Striim’s HazelcastWriter when putting data into the cache.

<entity-mappings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/orm" version="2.4">
	<entity name="employee" class="pl.piomin.services.datagrid.employee.model.Employee">
		<table name="hz.employee" />
		<attributes>
			<id name="id" attribute-type="Integer">
				<column nullable="false" name="id" />
				<generated-value strategy="AUTO" />
			</id>
			<basic name="personId" attribute-type="Integer">
				<column nullable="false" name="person_id" />
			</basic>
			<basic name="company" attribute-type="String">
				<column name="company" />
			</basic>
		</attributes>
	</entity>
</entity-mappings>

We also have to provide a jar with the entity class. It should be placed in the /opt/Striim/lib directory in the Striim Docker container. Importantly, the fields must be public; do not make them private with setters, because that won’t work with HazelcastWriter. After all these changes restart the container and proceed to the next steps. For the sample application just build the employee-model module and upload the resulting jar to Striim.

public class Employee implements Serializable {

	private static final long serialVersionUID = 3214253910554454648L;
	public Integer id;
	public Integer personId;
	public String company;

	public Employee() {

	}

	@Override
	public String toString() {
		return "Employee [id=" + id + ", personId=" + personId + ", company=" + company + "]";
	}

}

5. Configuring MySQL CDC connection on Striim

Once all the previous steps are completed we can finally begin to create our application in Striim. When creating a new app select Start with Template, and then MySQL CDC to Hazelcast. Enter your MySQL connection data and security credentials, then proceed. In addition to validating the connection, Striim also checks whether the binary log is enabled.

Then select tables for synchronization with cache.

striim-3

6. Configuring Hazelcast on Striim

After starting the employee-service application you should see the following fragment in the log file.

Members [1] {
	Member [192.168.8.205]:5701 - d568a38a-7531-453a-b7f8-db2be4715132 this
}

This address should be provided as the Hazelcast Cluster URL. We also have to provide the ORM mapping file location and the cluster credentials (by default these are dev/dev-pass).

striim-5
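If you would rather not copy the address from the log by hand, a small helper can extract it. This is only a convenience sketch: the class name MemberAddressParser and the regular expression are my own, written against the log fragment shown above, and they may need adjusting for other Hazelcast versions.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Hazelcast or Striim.
class MemberAddressParser {
    // Matches lines such as: Member [192.168.8.205]:5701 - <uuid> this
    private static final Pattern MEMBER = Pattern.compile("Member \\[([^\\]]+)\\]:(\\d+)");

    // Returns "host:port" of the first member found in the log text, if any.
    static Optional<String> firstMember(String log) {
        Matcher m = MEMBER.matcher(log);
        return m.find() ? Optional.of(m.group(1) + ":" + m.group(2)) : Optional.empty();
    }

    public static void main(String[] args) {
        String log = "Members [1] {\n"
                + "\tMember [192.168.8.205]:5701 - d568a38a-7531-453a-b7f8-db2be4715132 this\n"
                + "}";
        System.out.println(firstMember(log).orElse("no member found"));
    }
}
```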

On the next screen you will see the ORM mapping visualization and the input selection. Your input is the MySQL server you defined in step 5.

striim-7

7. Deploy application on Striim

After finishing the previous steps you will see the flow diagram. I suggest adding a log file target where all input events are stored as JSON. My diagram is shown in the figure below. Once your configuration is finished, deploy and start the application. At this stage I had some problems. For example, if I deploy the application after a Striim restart I always have to change something and save it, otherwise an exception occurs during deployment. However, after a long struggle with Striim, I finally succeeded in running the application! So we can start testing.

striim-8

8. Checking out

I created a JUnit test to illustrate the cache refresh performed by Striim. Inside this test I invoke the employees/company/{company} REST API method and collect the entities. Then I modify the entities with EmployeeRepository, which commits changes directly to the database, bypassing the Hazelcast cache. I invoke the REST API again and compare the results with the entities collected by the previous call. The personId field should differ from the value in the previously fetched entity. You can also test it manually by calling the REST API endpoint and changing something in the database using a client such as MySQL Workbench.

@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class CacheRefreshEmployeeTest {

	protected Logger logger = Logger.getLogger(CacheRefreshEmployeeTest.class.getName());

	@Autowired
	EmployeeRepository repository;

	TestRestTemplate template = new TestRestTemplate();

	@Test
	public void testModifyAndRefresh() {
		Employee[] e = template.getForObject("http://localhost:3333/employees/company/{company}", Employee[].class, "Test001");
		for (int i = 0; i < e.length; i++) {
			Employee eMod = repository.findOne(e[i].getId());
			eMod.setPersonId(eMod.getPersonId()+1);
			repository.save(eMod);
		}

		Employee[] e2 = template.getForObject("http://localhost:3333/employees/company/{company}", Employee[].class, "Test001");
		for (int i = 0; i < e2.length; i++) {
			Assert.assertNotEquals(e[i].getPersonId(), e2[i].getPersonId());
		}

	}
}
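One thing to keep in mind with a test like this is that the change travels from the binary log through Striim to Hazelcast asynchronously, so a REST call made right after save may still see the old value. A possible way to make the check less timing-sensitive is to poll for the expected state. The sketch below is plain Java; the Await class and its until method are names I invented, not part of JUnit or Spring.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for assertions on eventually consistent state.
class Await {
    // Polls the condition until it holds or the timeout elapses; returns whether it held.
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the expected state
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

In the test above, the second REST call and the personId comparison could be moved into the condition and asserted with a timeout of a few seconds.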

Here’s a picture of the Striim dashboard monitor. We can check how many events were processed, the current memory and CPU usage, and so on.

striim-1

Final Thoughts

I have no definite opinion about Striim. On the one hand it is an interesting solution with many integration possibilities and a nice dashboard for configuration and monitoring. On the other hand it is not free from errors and bugs. My application crashed when an exception was thrown because there was no matching serializer for the entity in Hazelcast’s cache. This stopped the processing of any further events. It may be deliberate, but in my opinion subsequent events should still be processed, as they may affect other tables. Managing the application through the web dashboard is not very comfortable. Every time I restarted the container I had to change something in the configuration, because the application threw an unintuitive exception on startup. From this type of application I would expect reliability above all, since the application depends on the data in Hazelcast being kept up to date. However, despite some drawbacks, Striim is worth a closer look.

Testing REST API with Hoverfly

Hoverfly is an open source API simulation tool for automated tests. It is written in Go, but also has native support for Java and can be run inside a JUnit test. Hoverfly can be used for testing a REST API, and is also useful for testing calls between microservices. We have two running modes available: simulate and capture. In simulate mode we simulate the interaction with another service by defining its responses; in capture mode requests are made to the real service as normal, but they are intercepted and recorded by Hoverfly.

In one of my previous articles, Testing Java Microservices, I described a competing testing tool, Spring Cloud Contract. In this article about Hoverfly I will use the same sample application based on Spring Boot that I created for that previous article. The source code is available on GitHub in the hoverfly branch. We have some microservices which interact with each other, and based on this sample I’m going to show how to use Hoverfly for component testing.

To enable testing with Hoverfly we have to include the following dependency in pom.xml file.

<dependency>
	<groupId>io.specto</groupId>
	<artifactId>hoverfly-java</artifactId>
	<version>0.8.0</version>
	<scope>test</scope>
</dependency>

Hoverfly can be easily integrated with JUnit. We can orchestrate it using a JUnit @ClassRule. As I mentioned before, we can switch between two different modes. In the code fragment below I decided to use the mixed strategy inCaptureOrSimulationMode, where the Hoverfly rule is started in capture mode if the simulation file does not exist and in simulate mode if it does. The default location of the output JSON file is src/test/resources/hoverfly. Calling printSimulationData on HoverflyRule prints all simulation data to the console.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { Application.class }, webEnvironment = WebEnvironment.DEFINED_PORT)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AccountApiFullTest {

	protected Logger logger = Logger.getLogger(AccountApiFullTest.class.getName());

	@Autowired
	TestRestTemplate template;

	@ClassRule
	public static HoverflyRule hoverflyRule = HoverflyRule
			.inCaptureOrSimulationMode("account.json", HoverflyConfig.configs().proxyLocalHost()).printSimulationData();

	@Test
	public void addAccountTest() {
		Account a = new Account("1234567890", 1000, "1");
		ResponseEntity<Account> r = template.postForEntity("/accounts", a, Account.class);
		Assert.assertNotNull(r.getBody().getId());
		logger.info("New account: " + r.getBody().getId());
	}

	@Test
	public void findAccountByNumberTest() {
		Account a = template.getForObject("/accounts/number/{number}", Account.class, "1234567890");
		Assert.assertNotNull(a);
		logger.info("Found account: " + a.getId());
	}

	@Test
	public void findAccountByCustomerTest() {
		Account[] a = template.getForObject("/accounts/customer/{customer}", Account[].class, "1");
		Assert.assertTrue(a.length > 0);
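		// Log the number of accounts; concatenating the array itself would only print its reference
		logger.info("Found accounts: " + a.length);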
	}

}

Now, let’s run our JUnit test class twice. During the first run all requests are forwarded to the Spring @RestController, which connects to an embedded Mongo database. At the same time all requests and responses are recorded by Hoverfly and saved in the account.json file. A fragment of this file is shown below. During the second run all data is loaded from that file and there is no interaction with AccountController.

  "request" : {
	"path" : {
	  "exactMatch" : "/accounts/number/1234567890"
	},
	"method" : {
	  "exactMatch" : "GET"
	},
	"destination" : {
	  "exactMatch" : "localhost:2222"
	},
	"scheme" : {
	  "exactMatch" : "http"
	},
	"query" : {
	  "exactMatch" : ""
	},
	"body" : {
	  "exactMatch" : ""
	}
  },
  "response" : {
	"status" : 200,
	"body" : "{\"id\":\"5980642bc96045216447023b\",\"number\":\"1234567890\",\"balance\":1000,\"customerId\":\"1\"}",
	"encodedBody" : false,
	"templated" : false,
	"headers" : {
	  "Content-Type" : [ "application/json;charset=UTF-8" ],
	  "Date" : [ "Tue, 01 Aug 2017 11:21:15 GMT" ],
	  "Hoverfly" : [ "Was-Here" ]
	}
  }
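The recorded fragment above boils down to a request/response pair in which every request field carries an exactMatch value. The idea of capture followed by simulation can be sketched in a few lines of plain Java. This is a simplified model I wrote for illustration (the class ExactMatchSimulation is hypothetical); it ignores query, body and headers, and it is not how Hoverfly is implemented internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified, hypothetical model of capture followed by exact-match simulation.
class ExactMatchSimulation {
    // Recorded pairs: key is "METHOD scheme://destination/path", value is the response body.
    private final Map<String, String> pairs = new LinkedHashMap<>();

    private static String key(String method, String scheme, String destination, String path) {
        return method + " " + scheme + "://" + destination + path;
    }

    // Capture step: remember the response the real service returned.
    void record(String method, String scheme, String destination, String path, String body) {
        pairs.put(key(method, scheme, destination, path), body);
    }

    // Simulation step: answer only when every field matches a recorded request exactly.
    Optional<String> respond(String method, String scheme, String destination, String path) {
        return Optional.ofNullable(pairs.get(key(method, scheme, destination, path)));
    }

    public static void main(String[] args) {
        ExactMatchSimulation simulation = new ExactMatchSimulation();
        simulation.record("GET", "http", "localhost:2222", "/accounts/number/1234567890",
                "{\"number\":\"1234567890\",\"balance\":1000}");
        System.out.println(simulation.respond("GET", "http", "localhost:2222", "/accounts/number/1234567890"));
        System.out.println(simulation.respond("GET", "http", "localhost:2222", "/accounts/number/0000000000"));
    }
}
```

A request that differs in any recorded field, for example a different account number in the path, finds no pair, which is why the simulation file has to cover every call the test makes.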

Now, let’s take a look at the customer-service tests. Inside GET /customers/{id} we invoke the GET /accounts/customer/{customerId} method of account-service. This method is simulated by Hoverfly with a success response, as you can see below.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class CustomerControllerTest {

	@Autowired
	TestRestTemplate template;

	@ClassRule
	public static HoverflyRule hoverflyRule = HoverflyRule
			.inSimulationMode(dsl(service("account-service:2222").get(startsWith("/accounts/customer/"))
					.willReturn(success("[{\"id\":\"1\",\"number\":\"1234567890\"}]", "application/json"))))
			.printSimulationData();

	@Test
	public void addCustomerTest() {
		Customer c = new Customer("1234567890", "Jan Testowy", CustomerType.INDIVIDUAL);
		c = template.postForObject("/customers", c, Customer.class);
	}

	@Test
	public void findCustomerWithAccounts() {
		Customer c = template.getForObject("/customers/pesel/{pesel}", Customer.class, "1234567890");
		Customer cc = template.getForObject("/customers/{id}", Customer.class, c.getId());
		Assert.assertTrue(cc.getAccounts().size() > 0);
	}
}

To run this test successfully we should override some properties from application.yml in src/test/resources/application.yml. Eureka discovery in the Ribbon client should be disabled, and so should Hystrix for @FeignClient. The Ribbon listOfServers property should have the same value as the service address inside HoverflyRule.

eureka:
  client:
    enabled: false

ribbon:
  eureka:
    enable: false
  listOfServers: account-service:2222

feign:
  hystrix:
    enabled: false

Here’s @FeignClient implementation for invoking API method from account-service.

@FeignClient("account-service")
public interface AccountClient {

	@RequestMapping(method = RequestMethod.GET, value = "/accounts/customer/{customerId}", consumes = {MediaType.APPLICATION_JSON_VALUE})
	List<Account> getAccounts(@PathVariable("customerId") String customerId);

}

When using simulation mode there is no need to start @SpringBootTest. Hoverfly also has some interesting capabilities like response templating, for example based on a path parameter, as in the fragment below.

public class AccountApiTest {

	TestRestTemplate template = new TestRestTemplate();

	@ClassRule
	public static HoverflyRule hoverflyRule = HoverflyRule.inSimulationMode(dsl(service("http://account-service")
			.post("/accounts").anyBody().willReturn(success("{\"id\":\"1\"}", "application/json"))
			.get(startsWith("/accounts/")).willReturn(success("{\"id\":\"{{Request.Path.[1]}}\",\"number\":\"123456789\"}", "application/json"))));

	@Test
	public void addAccountTest() {
		Account a = new Account("1234567890", 1000, "1");
		ResponseEntity<Account> r = template.postForEntity("http://account-service/accounts", a, Account.class);
		System.out.println(r.getBody().getId());
	}

	@Test
	public void findAccountByIdTest() {
		Account a = template.getForObject("http://account-service/accounts/{id}", Account.class, new Random().nextInt(10));
		Assert.assertNotNull(a.getId());
	}

}
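The {{Request.Path.[1]}} placeholder in the rule above refers to a path segment by its zero-based index, so for /accounts/7 segment 0 is accounts and segment 1 is 7. The following plain Java sketch mimics just that one substitution to show what the rendered body looks like. PathTemplate is a class I made up; real Hoverfly templating supports much more than this.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical re-implementation of a single templating feature, for illustration only.
class PathTemplate {
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{\\{\\s*Request\\.Path\\.\\[(\\d+)\\]\\s*\\}\\}");

    // Returns the zero-based path segment, e.g. segment("/accounts/7", 1) is "7".
    static String segment(String path, int index) {
        String[] parts = path.replaceAll("^/+", "").split("/");
        return parts[index];
    }

    // Replaces every {{Request.Path.[n]}} placeholder in the body with the matching segment.
    static String render(String template, String path) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = segment(path, Integer.parseInt(m.group(1)));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String template = "{\"id\":\"{{Request.Path.[1]}}\",\"number\":\"123456789\"}";
        System.out.println(render(template, "/accounts/7"));
    }
}
```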

We can simulate a fixed method delay using the DSL. A delay can be set for all requests or for a particular HTTP method. Our delayed @ClassRule for CustomerControllerTest now looks like the fragment below.

	@ClassRule
	public static HoverflyRule hoverflyRule = HoverflyRule
			.inSimulationMode(dsl(service("account-service:2222").andDelay(3000, TimeUnit.MILLISECONDS).forMethod("GET").get(startsWith("/accounts/customer/"))
			.willReturn(success("[{\"id\":\"1\",\"number\":\"1234567890\"}]", "application/json"))));

Now you can add the ReadTimeout property to your Ribbon client configuration and run the JUnit test again. You should receive the following exception: java.net.SocketTimeoutException: Read timed out

ribbon:
  eureka:
    enable: false
  ReadTimeout: 1000
  listOfServers: account-service:2222
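The interplay between a delayed response and a client read timeout can also be reproduced with JDK classes alone, which may help when you want to understand the exception without the whole Spring Cloud stack. In the sketch below a local com.sun.net.httpserver.HttpServer stands in for the delayed Hoverfly stub and HttpURLConnection stands in for the Ribbon client; the class name, the path and the delay values are my own assumptions.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: a slow local stub and a client with a read timeout.
class DelayedStubDemo {
    // Returns true when the client gives up with "Read timed out" before the stub answers.
    static boolean readTimesOut(long delayMillis, int readTimeoutMillis) {
        HttpServer server;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/accounts/customer/", exchange -> {
            try {
                Thread.sleep(delayMillis); // the simulated fixed delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            byte[] body = "[]".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/accounts/customer/1");
            HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
            connection.setReadTimeout(readTimeoutMillis);
            connection.getResponseCode(); // blocks until the response arrives or the timeout fires
            return false;
        } catch (SocketTimeoutException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            server.stop(0);
        }
    }
}
```

Calling readTimesOut with a delay longer than the timeout corresponds to the 3000 ms stub delay against the 1000 ms ReadTimeout above.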

Conclusion

In this post I showed you the most typical usage of the Hoverfly library in microservices tests. However, this library is not dedicated to microservice testing, as opposed to Spring Cloud Contract, which I described previously. For example, there are no mechanisms for sharing test stubs between different microservices like in Spring Cloud Contract (@AutoConfigureStubRunner). But there is an interesting feature for delaying responses, thanks to which we can simulate timeouts for a Ribbon client or trigger a Hystrix fallback.

JavaEE MicroProfile with KumuluzEE

Preface

Enterprise Java seems to be a step behind the others when it comes to microservices architecture. Some weeks ago I took part in Code Europe, a programming conference in Warsaw. One of the speakers was Ivar Grimstad, who talked about MicroProfile, an open initiative for optimizing Enterprise Java for a microservices architecture. This idea is very interesting, but at the moment it is still at the beginning of the road.
However, while I was reading about the MicroProfile initiative I came across information about KumuluzEE, a JavaEE framework developed by a Slovenian company. The solution seemed interesting enough that I decided to take a closer look at it. Well, we can read on the web site that KumuluzEE is a Java Duke’s Choice Award winner, so there is still hope for JavaEE and microservices 🙂

What’s KumuluzEE

Can KumuluzEE be a competitor for the Spring Cloud framework? It is certainly not as popular or as advanced in its microservices solutions as Spring Cloud, but it has basic modules for service registration, discovery, distributed configuration propagation, circuit breaking and metrics, as well as support for Docker and Kubernetes. It uses CDI on the JBoss Weld container for dependency injection and Jersey as the REST API provider. The modules for configuration and discovery are based on Consul or etcd and are still at an early stage of development (1.0.0-SNAPSHOT), but let’s try them out.

Preparation

I’ll show you a sample application which consists of two independent microservices, account-service and customer-service. Both of them expose a REST API, and one of the customer-service methods invokes a method from account-service. Every microservice registers itself in Consul and is able to get configuration properties from Consul. The sample application source code is available on GitHub. Before we begin, let’s start a Consul instance using a Docker container.

docker run -d --name consul -p 8500:8500 -p 8600:8600 consul

We should also add some KumuluzEE dependencies to Maven pom.xml.

<dependency>
	<groupId>com.kumuluz.ee</groupId>
	<artifactId>kumuluzee-core</artifactId>
</dependency>
<dependency>
	<groupId>com.kumuluz.ee</groupId>
	<artifactId>kumuluzee-servlet-jetty</artifactId>
</dependency>
<dependency>
	<groupId>com.kumuluz.ee</groupId>
	<artifactId>kumuluzee-jax-rs-jersey</artifactId>
</dependency>
<dependency>
	<groupId>com.kumuluz.ee</groupId>
	<artifactId>kumuluzee-cdi-weld</artifactId>
</dependency>

Service Registration

To enable service registration we should add one additional dependency to our pom.xml. I chose Consul as the registration and discovery server (kumuluzee-discovery-consul), but you can also use etcd (kumuluzee-discovery-etcd).

<dependency>
	<groupId>com.kumuluz.ee.discovery</groupId>
	<artifactId>kumuluzee-discovery-consul</artifactId>
	<version>1.0.0-SNAPSHOT</version>
</dependency>

In the application configuration file we should set the discovery properties and the server URL. For me the server address is 192.168.99.100.

kumuluzee:
  service-name: account-service
  env: dev
  version: 1.0.0
  discovery:
    consul:
      agent: http://192.168.99.100:8500
      hosts: http://192.168.99.100:8500
    ttl: 20
    ping-interval: 15

Here’s the account microservice main class. As you have probably guessed, the @RegisterService annotation enables registration on the server.

@RegisterService("account-service")
@ApplicationPath("v1")
public class AccountApplication extends Application {

}

We start the application by running java -cp target/classes;target/dependency/* com.kumuluz.ee.EeApplication. Remember to override the default port by setting the PORT environment variable. I started two instances of the account microservice and one of the customer microservice.

kumuluzee-1

Service Discovery

The customer microservice exposes an API, but also invokes an API method from account-service, so it has to discover that service and connect to it. Maven dependencies and configuration settings are the same as for account-service. The only difference is the resource class. Here’s the CustomerResource fragment with the GET /customers/{id} endpoint, which calls account-service.

@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("customers")
@RequestScoped
public class CustomerResource {

	private List<Customer> customers;

	@Inject
	@DiscoverService(value = "account-service", version = "1.0.x", environment = "dev")
	private WebTarget target;

	...

	@GET
	@Path("{id}")
	@Log(value = LogParams.METRICS, methodCall = true)
	public Customer findById(@PathParam("id") Integer id) {
		Customer customer = customers.stream().filter(it -> it.getId().intValue() == id.intValue()).findFirst().get();
		WebTarget t = target.path("v1/accounts/customer/" + customer.getId());
		List<Account> accounts = t.request().buildGet().invoke(List.class);
		customer.setAccounts(accounts);
		return customer;
	}

}

There is one pretty cool thing about discovery with KumuluzEE. As you can see in @DiscoverService, we can specify the version and environment of the account-service instance. The version and environment of a microservice are read automatically from config.yml during registration in the discovery server. So we can maintain many versions of a single microservice and freely invoke them from other microservices. Requests are automatically load balanced between all microservices that match the conditions from the @DiscoverService annotation.
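To illustrate the idea of version-aware discovery, here is a small sketch in plain Java. It is not KumuluzEE code: the InstanceSelector class, the simple x wildcard matching and the round-robin strategy are my assumptions, and the framework’s actual version-range syntax and balancing algorithm may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that selects instances by version pattern and environment.
class InstanceSelector {
    static final class Instance {
        final String url;
        final String version;
        final String environment;

        Instance(String url, String version, String environment) {
            this.url = url;
            this.version = version;
            this.environment = environment;
        }
    }

    private final List<Instance> registry = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void register(String url, String version, String environment) {
        registry.add(new Instance(url, version, environment));
    }

    // "1.0.x" matches 1.0.0, 1.0.7 and so on; any part of the pattern may be the wildcard "x".
    static boolean versionMatches(String pattern, String version) {
        String[] p = pattern.split("\\.");
        String[] v = version.split("\\.");
        if (p.length != v.length) {
            return false;
        }
        for (int i = 0; i < p.length; i++) {
            if (!p[i].equals("x") && !p[i].equals(v[i])) {
                return false;
            }
        }
        return true;
    }

    // Picks the next matching instance in round-robin order.
    String select(String versionPattern, String environment) {
        List<Instance> matching = new ArrayList<>();
        for (Instance instance : registry) {
            if (instance.environment.equals(environment) && versionMatches(versionPattern, instance.version)) {
                matching.add(instance);
            }
        }
        if (matching.isEmpty()) {
            throw new IllegalStateException("no instance matches " + versionPattern + "/" + environment);
        }
        return matching.get(Math.floorMod(next.getAndIncrement(), matching.size())).url;
    }
}
```

With two 1.0.x instances and one 2.0.0 instance registered in dev, repeated select("1.0.x", "dev") calls alternate between the first two and never return the third.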

We can also monitor metrics such as response time by declaring @Log(value = LogParams.METRICS, methodCall = true) on an API method. Here’s a log fragment for account-service.

2017-07-28 13:57:01,114 TRACE ENTRY[ METHOD ] Entering method. {class=pl.piomin.services.kumuluz.account.resource.AccountResource, method=findByCustomer, parameters=[1]}
2017-07-28 13:57:01,118 TRACE EXIT[ METHOD ] Exiting method. {class=pl.piomin.services.kumuluz.account.resource.AccountResource, method=findByCustomer, parameters=[1], response-time=3, result=[pl.piomin.services.kumuluz.account.model.Account@1eb26fe3, pl.piomin.services.kumuluz.account.model.Account@2dda41c5]}

Distributed configuration

To enable KumuluzEE Config include Consul implementation by adding the following dependency to pom.xml.

<dependency>
	<groupId>com.kumuluz.ee.config</groupId>
	<artifactId>kumuluzee-config-consul</artifactId>
	<version>1.0.0-SNAPSHOT</version>
</dependency>

I do not use a Consul agent running on localhost, so I need to override some properties in config.yml. I also defined one configuration property, blacklist.

kumuluzee:
  config:
    start-retry-delay-ms: 500
    max-retry-delay-ms: 900000
    consul:
      agent: http://192.168.99.100:8500

rest-config:
  blacklist:

Here’s the class that loads the configuration properties. Declaring @ConfigValue(watch = true) on a property enables it to be updated dynamically on any change in the configuration source.

@ApplicationScoped
@ConfigBundle("rest-config")
public class AccountConfiguration {

	@ConfigValue(watch = true)
	private String blacklist;

	public String getBlacklist() {
		return blacklist;
	}

	public void setBlacklist(String blacklist) {
		this.blacklist = blacklist;
	}

}

We use the configuration property blacklist in the resource class to filter out all accounts with blacklisted ids.

@GET
@Log(value = LogParams.METRICS, methodCall = true)
public List<Account> findAll() {
	// An empty default avoids a NumberFormatException when the property is not set
	final String blacklist = ConfigurationUtil.getInstance().get("rest-config.blacklist").orElse("");
	// Skip blank entries so that an empty or trailing-comma value parses cleanly
	final List<Integer> blacklistIds = Arrays.stream(blacklist.split(","))
			.map(String::trim).filter(it -> !it.isEmpty())
			.map(Integer::valueOf).collect(Collectors.toList());
	return accounts.stream().filter(it -> !blacklistIds.contains(it.getId())).collect(Collectors.toList());
}

The configuration property should be defined in the Consul UI dashboard under the KEY/VALUE tab. KumuluzEE enforces a certain format of the key name. In this case it has to be environments/dev/services/account-service/1.0.0/config/rest-config/blacklist. You can update the property value and test the change by invoking http://localhost:2222/v1/accounts.

kumuluzee-2
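The key name is easy to mistype, so it can help to see how it is composed from the values in config.yml. The helper below only reproduces the format quoted above for this sample; ConsulKey is a name I made up, and I have not checked whether KumuluzEE builds the key in exactly this way for every configuration.

```java
// Hypothetical helper that assembles the Consul key in the format used in this article.
class ConsulKey {
    // property uses dot notation, e.g. "rest-config.blacklist" becomes "rest-config/blacklist".
    static String forProperty(String environment, String serviceName, String version, String property) {
        return String.join("/", "environments", environment, "services", serviceName, version,
                "config", property.replace('.', '/'));
    }

    public static void main(String[] args) {
        System.out.println(forProperty("dev", "account-service", "1.0.0", "rest-config.blacklist"));
    }
}
```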

Final Words

Creating microservices with KumuluzEE is pretty easy. I showed you the main capabilities of this framework. KumuluzEE also has modules for circuit breaking with Hystrix, streaming with Apache Kafka and security with OAuth2/OpenID. I will keep a close eye on this library and I hope it will continue to be developed.

Code Quality with SonarQube

Source code quality analysis is an essential part of the Continuous Integration process. Together with automated tests it is the key element in delivering reliable software without many bugs, security vulnerabilities or performance problems. Probably the best static code analyzer you can find on the market is SonarQube. It supports more than 20 programming languages. It can be easily integrated with the most popular Continuous Integration engines like Jenkins or TeamCity. Finally, it has many features and plugins which can be easily managed from an extensive web dashboard.

However, before we discuss the most powerful capabilities of this solution, it is worth asking why we do it. Would it be productive to force developers to focus on code quality? Most of us are programmers, and we know very well that everyone else expects us to deliver code which meets business demands rather than code that looks nice 🙂 After all, do we really want to break the build for violating an unimportant rule like maximum line length? That is not much fun. On the other hand, taking over source code from someone who did not pay attention to any good programming practices is not welcome either, if you know what I mean. But stay calm, SonarQube is the right solution for you. In this article I’ll show you that caring about high code quality can be good fun and, above all, that you can learn how to write better code while other team members spend their time fixing bugs 🙂

Enough talk, let’s get to action. I suggest you run your test instance of SonarQube using Docker. Here’s the SonarQube run command. Then you can log in to the web dashboard at http://192.168.99.100:9000 with admin/admin credentials.

docker run -d --name sonarqube -p 9000:9000 -p 9092:9092 sonarqube

You are signed in to the web dashboard, but there are no projects created yet. To scan the source code you just run one command, mvn sonar:sonar, if you are using Maven in the build process. Don’t forget to add the SonarQube server address to the settings.xml file, as shown in the fragment below.

<profile>
	<id>sonar</id>
	<activation>
		<activeByDefault>true</activeByDefault>
	</activation>
	<properties>
		<sonar.host.url>http://192.168.99.100:9000</sonar.host.url>
	</properties>
</profile>

When the SonarQube analysis finishes you will see a new project, named after the Maven artifact, with your code metrics and statistics. I created a sample Spring Boot application in which I tried to make some of the most common mistakes that impact code quality. The source code is available on GitHub. The module to analyze is named person-service. The code with many bugs and vulnerabilities is pushed to the v0.1 branch. The master branch has the latest version, with the corrections made on the basis of the SonarQube analysis, which I’m going to describe in the next section of this article. OK, let’s start the analysis with the mvn command. We may be a little surprised: the analysis result for version 0.1 is rather unsatisfying. Although I spent a lot of time making serious mistakes, SonarQube reported only a few bugs and code smells, and the quality gate status is ‘Passed’.

sonar-1

Let’s take a closer look at quality gates in SonarQube. As I mentioned before, we would not like to break the build for violating one not very important rule, or a group of them. We can achieve this by creating a quality gate. This is a set of requirements that tells us whether or not to go ahead with deploying a new version of the project. There is a default quality gate for Java, but we can change its thresholds or create a new one. The default quality gate has thresholds set only for new code, so I decided to create one for my sample application with minimum test coverage set to 50 percent, unit test success detection and ratings based on the full code. Now the scanning result looks a little different 🙂

sonar-3 sonar-2
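Conceptually a quality gate is just a list of conditions evaluated against measured metrics. The sketch below shows that idea in plain Java. It has nothing to do with the SonarQube API: the QualityGate class and the metric names coverage and test_success are hypothetical, and only the 50 percent coverage threshold comes from my configuration above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a quality gate: minimum thresholds per metric.
class QualityGate {
    private final Map<String, Double> minimums = new LinkedHashMap<>();

    QualityGate require(String metric, double minimum) {
        minimums.put(metric, minimum);
        return this;
    }

    // Returns the metrics that fall below their threshold; an empty list means "Passed".
    List<String> failures(Map<String, Double> measured) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Double> condition : minimums.entrySet()) {
            Double value = measured.get(condition.getKey());
            if (value == null || value < condition.getValue()) {
                failed.add(condition.getKey());
            }
        }
        return failed;
    }
}
```

A project measured at 35 percent coverage would fail a gate built with require("coverage", 50), even if all other conditions are met, which is the kind of change in the scanning result mentioned above.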

To enable test coverage scanning in SonarQube we should add the jacoco plugin to the Maven pom.xml. During the Maven build mvn clean test -Dmaven.test.failure.ignore=true sonar:sonar the report is automatically generated and uploaded to SonarQube.

<plugin>
	<groupId>org.jacoco</groupId>
	<artifactId>jacoco-maven-plugin</artifactId>
	<version>0.7.9</version>
	<executions>
		<execution>
			<id>default-prepare-agent</id>
			<goals>
				<goal>prepare-agent</goal>
			</goals>
		</execution>
		<execution>
			<id>default-report</id>
			<phase>prepare-package</phase>
			<goals>
				<goal>report</goal>
			</goals>
		</execution>
	</executions>
</plugin>

The last change that has to be made before rescanning the application is installing some plugins and enabling rules that are disabled by default. The list of all active and inactive rules can be displayed in the Quality Profiles section. In the default profile for Java there are more than 400 rules available, 271 of them active at the start. I suggest you install the FindBugs and Checkstyle plugins. Those plugins have many additional rules for Java which can be activated for our profile. Now there are about 1.1k inactive rules in many categories. Which of them should be activated is up to you: you can activate them in the default profile, create your own profile, or use one of the predefined profiles which were automatically created by the plugins we installed. In my opinion the best way to select the right rules is to create a simple project and check which rules are suitable for you. Then you can read the detailed description and disable a rule if needed. After activating some rules provided by the Checkstyle plugin I have a report with 5 bugs and 77 code smells. The most important errors are visible in the pictures below.

sonar-5sonar-6sonar-7

All issues reported by SonarQube can be easily reviewed in the UI dashboard of each project, in the Issues tab. We can also install the SonarLint plugin, which integrates with the most popular IDEs like Eclipse or IntelliJ, and all those issues will be displayed there. Now we can proceed to fixing the errors. All changes which I made to resolve the issues can be viewed in the GitHub repository in branches v0.1 to v0.6. I resolved all problems except some checked exception warnings, which I set to Resolved (Won’t fix). Those issues won’t be reported in subsequent scans.

sonar-7

Finally, my project looks as you can see in the picture below. All ratings have a score of ‘A’, test coverage is greater than 60% and the quality gate is ‘Passed’. The final person-service version is committed to the master branch.

sonar-8

As you can see, there are many rules which can be applied to your project during SonarQube scanning, but sometimes that is not enough for the needs of your organization. In that case you may search for additional plugins or create your own plugin with rules that meet your specific requirements. In my sample available on GitHub there is a module sonar-rules where I defined a rule checking whether all public classes have Javadoc comments including the @author field. To create a SonarQube plugin add the following fragment to your pom.xml and change the packaging type to sonar-plugin.

<plugin>
	<groupId>org.sonarsource.sonar-packaging-maven-plugin</groupId>
	<artifactId>sonar-packaging-maven-plugin</artifactId>
	<version>1.17</version>
	<extensions>true</extensions>
	<configuration>
		<pluginKey>piotjavacustom</pluginKey>
		<pluginName>PiotrCustomRules</pluginName>
		<pluginDescription>For test purposes</pluginDescription>
		<pluginClass>pl.piomin.sonar.plugin.CustomRulesPlugin</pluginClass>
		<sonarLintSupported>true</sonarLintSupported>
		<sonarQubeMinVersion>6.0</sonarQubeMinVersion>
	</configuration>
</plugin>

Here’s the class with the custom rule definition. First we have to get the scanned class node (Kind.CLASS), and then process the first comment (Kind.TRIVIA) in the class file. Rule parameters like name or priority are set inside the @Rule annotation.

@Rule(key = "CustomAuthorCommentCheck",
		name = "Javadoc comment should have @author name",
		description = "Javadoc comment should have @author name",
		priority = Priority.MAJOR,
		tags = {"style"})
public class CustomAuthorCommentCheck extends IssuableSubscriptionVisitor {

	private static final String MSG_NO_COMMENT = "There is no comment under class";
	private static final String MSG_NO_AUTHOR = "There is no author inside comment";

	private Tree actualTree = null;

	@Override
	public List<Kind> nodesToVisit() {
		return ImmutableList.of(Kind.TRIVIA, Kind.CLASS);
	}

	@Override
	public void visitTrivia(SyntaxTrivia syntaxTrivia) {
		String comment = syntaxTrivia.comment();
		if (syntaxTrivia.column() != 0)
			return;
		if (comment == null) {
			reportIssue(actualTree, MSG_NO_COMMENT);
			return;
		}
		if (!comment.contains("@author")) {
			reportIssue(actualTree, MSG_NO_AUTHOR);
			return;
		}
	}

	@Override
	public void visitNode(Tree tree) {
		if (tree.is(Kind.CLASS)) {
			actualTree = tree;
		}
	}

}
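
Stripped of the SonarQube API, the rule boils down to a simple text check. The sketch below is not how the plugin inspects the syntax tree; it is a simplified, regex-based stand-in (the class AuthorCommentSketch and its check method are hypothetical) that only shows the two outcomes the rule distinguishes.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Regex-based illustration of the rule; the real check works on the syntax tree. */
final class AuthorCommentSketch {

    static final String MSG_NO_COMMENT = "There is no comment under class";
    static final String MSG_NO_AUTHOR = "There is no author inside comment";

    // A Javadoc block placed directly before a public class declaration.
    // The tempered dot keeps the match from running past the end of a comment.
    private static final Pattern JAVADOC_BEFORE_CLASS = Pattern.compile(
            "/\\*\\*((?:(?!\\*/).)*)\\*/\\s*public\\s+(?:final\\s+|abstract\\s+)*class\\s",
            Pattern.DOTALL);

    private static final Pattern PUBLIC_CLASS =
            Pattern.compile("\\bpublic\\s+(?:final\\s+|abstract\\s+)*class\\s");

    /** Returns the issue message for the given source text, or empty if nothing is reported. */
    static Optional<String> check(String source) {
        if (!PUBLIC_CLASS.matcher(source).find()) {
            return Optional.empty(); // no public class, nothing to check
        }
        Matcher m = JAVADOC_BEFORE_CLASS.matcher(source);
        if (!m.find()) {
            return Optional.of(MSG_NO_COMMENT);
        }
        return m.group(1).contains("@author")
                ? Optional.empty()
                : Optional.of(MSG_NO_AUTHOR);
    }
}
```

Unlike the real visitor, this sketch ignores annotations placed between the comment and the class, so treat it as an illustration of the rule's intent only.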

Before building the plugin and deploying it to the SonarQube server, it can be easily tested with JUnit. Inside the src/test/files directory we should place the test data: Java files which are scanned during the JUnit test. For the failure test we should also create the file CustomAuthorCommentCheck_java.json in the /org/sonar/l10n/java/rules/squid/ directory with the rule definition.

@Test
public void testOk() {
	JavaCheckVerifier.verifyNoIssue("src/test/files/CustomAuthorCommentCheck.java", new CustomAuthorCommentCheck());
}

@Test
public void testFail() {
	JavaCheckVerifier.verify("src/test/files/CustomAuthorCommentCheckFail.java", new CustomAuthorCommentCheck());
}

Finally, build the Maven project and copy the generated JAR artifact from the target directory into the $SONAR_HOME/extensions/plugins directory of the SonarQube docker container. Then restart the docker container.

docker cp target/sonar-plugins-1.0-SNAPSHOT.jar sonarqube:/opt/sonarqube/extensions/plugins

After the SonarQube restart your plugin’s rules are visible in the Rules section.

sonar-4

The last thing to do is to run SonarQube scanning in the Continuous Integration process. SonarQube can be easily integrated with the most popular CI server, Jenkins. Here’s the fragment of a Jenkins pipeline where we perform source code scanning and then wait for the quality gate result. If you are interested in more details about Jenkins pipelines, Continuous Integration and Delivery, read my previous post How to setup Continuous Delivery environment.

stage('SonarQube analysis') {
	withSonarQubeEnv('My SonarQube Server') {
		sh 'mvn clean package sonar:sonar'
	}
}
stage("Quality Gate") {
	timeout(time: 1, unit: 'HOURS') {
		def qg = waitForQualityGate()
		if (qg.status != 'OK') {
			error "Pipeline aborted due to quality gate failure: ${qg.status}"
		}
	}
}

Microservices Configuration With Spring Cloud Config

Preface

Although every microservice instance is an independent unit, we usually manage them from one central location. We are talking about watching the application logs (Kibana), metrics and statistics (Zipkin, Grafana), instance monitoring and configuration management. I’m going to say a little more about configuration management with the Spring Cloud Config framework.

Spring Cloud Config provides server and client-side support for externalized configuration in a distributed system. With the Config Server you have a central place to manage external properties for applications across all environments.

The concept of using a configuration server inside a microservices architecture is visualized in the figure below. The configuration is stored in a version control system (in most cases Git) as YAML or properties files. Spring Cloud Config Server pulls the configuration from the VCS and exposes it through RESTful endpoints. The configuration server registers itself in a discovery service. Every microservice application connects to the discovery service to find the address of the configuration server by its name. Then it invokes a REST endpoint to download the newest configuration settings on startup.

config-server

Sample application

Sample application source code is available on GitHub. For the purpose of this example, I also created a repository for storing configuration files, which is available here. Let’s begin with the configuration server. To enable the configuration server and its registration in the discovery service we have to add the following dependencies to pom.xml.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>

In the application main class we should add the following annotations.

@SpringBootApplication
@EnableConfigServer
@EnableDiscoveryClient
public class ConfigServer {

	public static void main(String[] args) {
		SpringApplication.run(ConfigServer.class, args);
	}

}

The last thing to do is to define the configuration in application.yml. I set the default port, the application name (for discovery) and the Git repository address and credentials. By default Spring Cloud Config Server makes a clone of the remote Git repository, and if the local copy gets dirty it cannot update it from the remote repository. To solve this problem I set the force-pull property, which forces Spring Cloud Config Server to pull from the remote repository every time a new request comes in.

server:
  port: ${PORT:9999}

spring:
  application:
    name: config-server
  cloud:
    config:
      server:
        git:
          uri: https://github.com/piomin/sample-config-repo.git
          force-pull: true
          username: ${github.username}
          password: ${github.password}

That’s everything that has to be done on the server side. If you run your Spring Boot application it should be visible in the discovery service as config-server. To enable interaction with the config server on the client side we should add one dependency to pom.xml.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-config</artifactId>
</dependency>

In theory, the basic configuration should not be defined in the application.yml file but in bootstrap.yml. Why do we need to have anything there at all? At the very least the application has to know the discovery server address to be able to find the configuration server. In addition, we can override the default parameters used for fetching the configuration, such as the config server discovery name (the default is configserver), configuration name, profile and label. By default a microservice tries to fetch the configuration with the name equal to ${spring.application.name}, the label equal to ‘master’ and the profiles read from the ${spring.profiles.active} property.

spring:
  application:
    name: account-service
  cloud:
    config:
      discovery:
        enabled: true
        serviceId: config-server
      name: account
      profile: development
      label: develop

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
  instance:
    leaseRenewalIntervalInSeconds: 1
    leaseExpirationDurationInSeconds: 2

The rest of the application configuration is located in the dedicated repository, in the account-development.yml file. The application looks for this file in the ‘develop’ branch. The file is cloned by the configuration server and exposed under all of the following REST endpoints:
/{application}/{profile}[/{label}]
/{application}-{profile}.yml
/{label}/{application}-{profile}.yml
/{application}-{profile}.properties
/{label}/{application}-{profile}.properties
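
As an illustration of how these patterns expand, here is a small sketch that builds the paths for a given application, profile and label. The helper class is hypothetical and only does string formatting; it does not call the server.

```java
/** Hypothetical helper that expands the config server path patterns listed above. */
final class ConfigEndpointSketch {

    /** /{application}/{profile}[/{label}]; the label is optional and may be null. */
    static String environment(String application, String profile, String label) {
        String path = "/" + application + "/" + profile;
        return label == null ? path : path + "/" + label;
    }

    /** /[{label}/]{application}-{profile}.{extension}, with extension "yml" or "properties". */
    static String file(String label, String application, String profile, String extension) {
        String name = application + "-" + profile + "." + extension;
        return label == null ? "/" + name : "/" + label + "/" + name;
    }
}
```

For the sample service, environment("account", "development", "develop") gives /account/development/develop and file("develop", "account", "development", "yml") gives /develop/account-development.yml.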

If you open the first endpoint for our example configuration, http://localhost:9999/account/development/develop, in your web browser, you should see the full configuration in JSON format, where the properties are available inside propertySources. Let me say a few words about the account-service configuration. Here’s the YAML file where I set the server port, mongo database connection settings, ribbon client configuration and specific application settings: the list of test accounts.

server:
  port: ${PORT:2222}

spring:
  data:
    mongodb:
      host: 192.168.99.100
      port: 27017
      username: micro
      password: micro

ribbon:
  eureka:
    enabled: true

test:
  accounts:
    - id: 1
      number: '0654321789'
      balance: 2500
      customerId: 1
    - id: 2
      number: '0654321780'
      balance: 0
      customerId: 1
    - id: 3
      number: '0650981789'
      balance: 12000
      customerId: 2

Before running the application you should start the mongo database.

docker run -d --name mongo -p 27017:27017 mongo

All the find endpoints can be switched between the mongodb repository and the test accounts repository read from the remote configuration by passing the parameter ‘true’ at the end of each REST path. Test data is read from the configuration file, where it is stored under the ‘test’ key.

@Repository
@ConfigurationProperties(prefix = "test")
public class TestAccountRepository {

	private List<Account> accounts;

	public List<Account> getAccounts() {
		return accounts;
	}

	public void setAccounts(List<Account> accounts) {
		this.accounts = accounts;
	}

	public Account findByNumber(String number) {
		return accounts.stream().filter(it -> it.getNumber().equals(number)).findFirst().get();
	}

}
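
Note that findFirst().get() throws NoSuchElementException when no account matches the number. A possible variation, shown here as a self-contained sketch with a minimal stand-in for the Account class (the real one lives in the sample repository), returns an Optional instead, so the caller decides how to handle a missing account.

```java
import java.util.List;
import java.util.Optional;

/** Sketch of a lookup that does not throw when the account number is unknown. */
final class AccountLookupSketch {

    /** Minimal stand-in for the sample's Account class. */
    static final class Account {
        private final String number;
        private final int balance;

        Account(String number, int balance) {
            this.number = number;
            this.balance = balance;
        }

        String getNumber() {
            return number;
        }

        int getBalance() {
            return balance;
        }
    }

    private final List<Account> accounts;

    AccountLookupSketch(List<Account> accounts) {
        this.accounts = accounts;
    }

    /** Returns the first account with the given number, or an empty Optional. */
    Optional<Account> findByNumber(String number) {
        return accounts.stream()
                .filter(it -> it.getNumber().equals(number))
                .findFirst();
    }
}
```

A controller could then map an empty result to a 404 response instead of failing with an unhandled exception.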

Dynamic configuration reload

OK, now our application configuration is loaded from the server on startup. But let’s imagine we need to reload it dynamically without restarting the application. This is also possible with Spring Cloud Config. To enable this feature we need to add a dependency on the spring-cloud-config-monitor library and activate the Spring Cloud Bus. In the presented sample I used the AMQP message broker RabbitMQ as the cloud bus provider.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-config-monitor</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

To enable the monitor on the configuration server set the following property in the application.yml file.

spring:
  application:
    name: config-server
  cloud:
    config:
      server:
        monitor:
          github:
            enabled: true

Now we have the /monitor endpoint available on the config server. The spring-cloud-starter-bus-amqp library should also be added on the client side. The monitor endpoint can be invoked by a webhook configured in a Git repository manager like GitHub, Bitbucket or GitLab. We can also easily simulate such a webhook by calling POST /monitor manually. For example, a GitHub request should have the header X-Github-Event: push and a JSON body with information about the changes, like {"commits": [{"modified": ["account-service.yml"]}]}.
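
Such a manual call could look like the sketch below, which uses the HttpClient shipped with JDK 11 and later. It assumes the config server from this example listens on localhost:9999; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sketch of simulating a GitHub push webhook against the /monitor endpoint. */
final class MonitorWebhookSketch {

    /**
     * Builds a request that imitates a GitHub push event for one modified file.
     * The file name is inserted as-is, so it must not contain characters
     * that need JSON escaping.
     */
    static HttpRequest pushEvent(String baseUrl, String modifiedFile) {
        String body = "{\"commits\": [{\"modified\": [\"" + modifiedFile + "\"]}]}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/monitor"))
                .header("Content-Type", "application/json")
                .header("X-Github-Event", "push")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Sends the request and returns the HTTP status code of the response. */
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With the config server running, send(pushEvent("http://localhost:9999", "account-service.yml")) triggers the same processing as a real webhook would.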

As I mentioned before, the sample uses a RabbitMQ server. It can be launched using its docker image.

docker run -d --name rabbit -p 30000:5672 -p 30001:15672 rabbitmq:management

To override the Spring auto-configuration for RabbitMQ put the following lines in your configuration on both the client and the server side.

spring:
  rabbitmq:
    host: 192.168.99.100
    port: 30000
    username: guest
    password: guest

I also had to slightly modify the client service configuration to make it work with push notifications. Now it looks as you can see below. When I overrode the default application name using spring.cloud.config.* properties, the RefreshRemoteApplicationEvent was not received by the account service.

spring:
  application:
    name: account-service
  cloud:
    config:
      discovery:
        enabled: true
        serviceId: config-server
      profile: default

To enable dynamic configuration refreshing add the @RefreshScope annotation to a Spring bean. I enabled refresh on two client-side beans: AccountController and TestAccountRepository. Finally we can test our configuration.

1. I changed and committed one property inside account-service.yml, for example balance for test.accounts with id=1.

2. Then I called POST request on /monitor endpoint with payload {"commits": [{"modified": ["account-service.yml"]}]}

3. If account service received refresh event from configuration server you should see in your logs the following fragment:
Received remote refresh request. Keys refreshed [test.accounts[0].balance]

4. Now, you can invoke test endpoint for modified account number, for me it was http://localhost:2222/accounts/0654321789/true.
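
The list of refreshed keys in the log is essentially the difference between the property values before and after the new configuration is fetched. The following is a simplified illustration of that idea in plain Java, not Spring Cloud's code; the class name is hypothetical.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: find the property keys whose values differ between two snapshots. */
final class ChangedKeysSketch {

    /** Returns keys that were added, removed or whose value changed, in sorted order. */
    static Set<String> changedKeys(Map<String, ?> before, Map<String, ?> after) {
        Set<String> all = new TreeSet<>(before.keySet());
        all.addAll(after.keySet());
        Set<String> changed = new TreeSet<>();
        for (String key : all) {
            if (!Objects.equals(before.get(key), after.get(key))) {
                changed.add(key);
            }
        }
        return changed;
    }
}
```

If only the balance of the first test account differs between the two snapshots, the result contains the single key test.accounts[0].balance, just like the log line in step 3.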

Conclusion

With the Config Server you have a central place to manage configuration for applications across all environments. You can take advantage of the benefits offered by version control systems, such as branching and versioning, or use the native support for local files. The configuration can be loaded only at application startup or reloaded dynamically after each change committed to the VCS repository. Spring Cloud Config Server is available for discovery and can be autodetected by all microservices registered in a discovery server like Eureka. There are several alternative mechanisms for automatic configuration management for Spring Boot applications, like Spring Cloud Consul Config or Spring Cloud Zookeeper Config.

Custom metrics visualization with Grafana and InfluxDB

If you need a solution for querying and visualizing time series and metrics, your first choice will probably be Grafana. Grafana is a visualization dashboard that can read data from several different databases like MySQL, Elasticsearch and InfluxDB. At present it is becoming very popular to use it with InfluxDB as a data source. InfluxDB is a solution specifically designed for storing real-time metrics and events, and it is very fast and scalable for time-based data. Today I’m going to show an example Spring Boot application with metrics visualization based on Grafana and InfluxDB, and alerts sent to Slack.

Spring Boot Actuator exposes some endpoints useful for monitoring and interacting with the application. It also includes a metrics service with gauge and counter support. A gauge records a single value; a counter records a value that is incremented or decremented relative to all previous steps. The full list of basic metrics is available in the Spring Boot documentation here; examples are free memory, heap usage, datasource pool usage or thread information. We can also define our own custom metrics. To allow exporting such values into InfluxDB we need to declare a bean annotated with @ExportMetricWriter. Spring Boot has no built-in metrics exporter for InfluxDB, so we have to add the influxdb-java library to the pom.xml dependencies and define the connection properties.

	@Bean
	@ExportMetricWriter
	GaugeWriter influxMetricsWriter() {
		InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.99.100:8086", "root", "root");
		String dbName = "grafana";
		influxDB.setDatabase(dbName);
		influxDB.setRetentionPolicy("one_day");
		influxDB.enableBatch(10, 1000, TimeUnit.MILLISECONDS);

		return new GaugeWriter() {

			@Override
			public void set(Metric<?> value) {
				Point point = Point.measurement(value.getName()).time(value.getTimestamp().getTime(), TimeUnit.MILLISECONDS)
						.addField("value", value.getValue()).build();
				influxDB.write(point);
				logger.info("write(" + value.getName() + "): " + value.getValue());
			}
		};
	}

The metrics should be read from Actuator endpoint, so we should declare MetricsEndpointMetricReader bean.

	@Bean
	public MetricsEndpointMetricReader metricsEndpointMetricReader(final MetricsEndpoint metricsEndpoint) {
		return new MetricsEndpointMetricReader(metricsEndpoint);
	}

We can customize the exporting process by declaring properties inside the application.yml file. In the code fragment below there are two parameters: delay-millis, which sets the metrics export interval to 5 seconds, and includes, where we can define which metrics should be exported.

spring:
  metrics:
    export:
      delay-millis: 5000
      includes: heap.used,heap.committed,mem,mem.free,threads,datasource.primary.active,datasource.primary.usage,gauge.response.persons,gauge.response.persons.id,gauge.response.persons.remove

To easily run Grafana and InfluxDB let’s use docker.

docker run -d --name grafana -p 3000:3000 grafana/grafana
docker run -d --name influxdb -p 8086:8086 influxdb

Grafana is available under default security credentials admin/admin. The first step is to create InfluxDB data source.

grafana-3
Now we can create our new dashboard and add some graphs. Before that, run the Spring Boot sample application to export some metrics data into InfluxDB. Grafana has user-friendly support for InfluxDB queries, where you can click through the entire configuration and get syntax hints. Of course there is also a possibility of writing text queries, but not all of the query language features are available.

grafana-4

Here’s the picture with my Grafana dashboard for the metrics passed in the includes property. In the second picture below you can see an enlarged graph with the average processing time of the REST methods.

grafana-1

grafana-2

We can always implement our own service which generates metrics sent to InfluxDB. Spring Boot Actuator provides two classes for that purpose: CounterService and GaugeService. Below there is an example of GaugeService usage, where a random value between 0 and 100 is generated at 100 ms intervals.

@Service
public class FirstService {

    private final GaugeService gaugeService;

    @Autowired
    public FirstService(GaugeService gaugeService) {
        this.gaugeService = gaugeService;
    }

    public void exampleMethod() {
        Random r = new Random();
        for (int i = 0; i < 1000000; i++) {
            this.gaugeService.submit("firstservice", r.nextDouble() * 100);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
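
The difference between the two metric types can be shown with a tiny in-memory stand-in. This is not Actuator's implementation; the class is hypothetical, and only the method names submit, increment and decrement mirror GaugeService and CounterService.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory illustration of gauge versus counter semantics. */
final class InMemoryMetricsSketch {

    private final Map<String, Double> gauges = new ConcurrentHashMap<>();
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    /** Gauge: each submit replaces the previously stored value. */
    void submit(String name, double value) {
        gauges.put(name, value);
    }

    /** Counter: each call adds 1 to the accumulated value. */
    void increment(String name) {
        counters.merge(name, 1L, Long::sum);
    }

    /** Counter: each call subtracts 1 from the accumulated value. */
    void decrement(String name) {
        counters.merge(name, -1L, Long::sum);
    }

    /** Returns the last submitted gauge value, or null if none was submitted. */
    Double gauge(String name) {
        return gauges.get(name);
    }

    /** Returns the accumulated counter value, or 0 if the counter was never touched. */
    long counter(String name) {
        return counters.getOrDefault(name, 0L);
    }
}
```

A gauge fits values such as the random number submitted by FirstService, while a counter fits things like the number of handled requests.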

The sample bean FirstService is started after application startup.

@Component
public class Start implements ApplicationListener<ContextRefreshedEvent> {

	@Autowired
	private FirstService service1;

	@Override
	public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
		service1.exampleMethod();
	}

}

Now, let’s configure alert notifications using the Grafana dashboard and Slack. This feature is available since Grafana 4.0. I’m going to define a threshold for the statistics sent by the FirstService bean. If you have already created a graph for gauge.firstservice (you need to add this metric name to the includes property inside application.yml), go to the edit section and then to the Alert tab. There you can define an alerting condition by selecting an aggregating function (for example avg, min, max), an evaluation interval and a threshold value. For my sample visible in the picture below I selected alerting when the maximum value is greater than 95, with conditions evaluated at 5 minute intervals.

grafana-5
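
Conceptually, such a condition reduces the samples from one evaluation window to a single number and compares it with the threshold. The sketch below illustrates that in plain Java; it is not Grafana's code, and the class and enum names are hypothetical.

```java
import java.util.List;
import java.util.stream.DoubleStream;

/** Sketch of an alert rule: reduce a window of samples, then compare with a threshold. */
final class AlertRuleSketch {

    enum Reducer { AVG, MIN, MAX }

    private final Reducer reducer;
    private final double threshold;

    AlertRuleSketch(Reducer reducer, double threshold) {
        this.reducer = reducer;
        this.threshold = threshold;
    }

    /** Reduces the samples of one window to a single number (NaN for an empty window). */
    double reduce(List<Double> window) {
        DoubleStream values = window.stream().mapToDouble(Double::doubleValue);
        switch (reducer) {
            case MIN:
                return values.min().orElse(Double.NaN);
            case MAX:
                return values.max().orElse(Double.NaN);
            default:
                return values.average().orElse(Double.NaN);
        }
    }

    /** The rule fires when the reduced value is above the threshold. */
    boolean isAlerting(List<Double> window) {
        // Comparisons with NaN are false, so an empty window never fires.
        return reduce(window) > threshold;
    }
}
```

With the MAX reducer and a threshold of 95 a single sample of 96.2 in the window is enough to fire, while the AVG reducer would stay quiet for the same data if the other samples are low.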

After creating the alert configuration we should define a notification channel. There are some interesting supported notification types like email, HipChat, webhook or Slack. When configuring a Slack notification we need to pass the recipient’s address or channel name and the incoming webhook URL. Then, in the Notifications section, add a new notification for your alert that is sent to Slack.

grafana-6

I created a dedicated channel #grafana for Grafana notifications on my Slack account and attached an incoming webhook to this channel by searching for it in Channel Settings -> Add app or integration.

grafana-7

Finally, run my sample application, and don’t forget to log out from the Grafana dashboard if you would like to receive the alert on Slack.

Serverless on AWS with DynamoDB, SNS and CloudWatch

In one of my previous posts, Serverless on AWS Lambda, I presented an example of creating a REST API based on AWS Lambda functions. However, we should keep in mind that this mechanism is also used to exchange events between services (SaaS) provided by AWS. Now I will show such an example that uses a NoSQL database like DynamoDB, sends messages with Simple Notification Service (SNS) and monitors logs with CloudWatch.

Let’s begin with our sample application. For test purposes I designed a simple system which grants some bonuses based on incoming orders. First, we invoke a service which puts an order record into a DynamoDB table. The insert event triggers a Lambda function, in which we process the event and perform a transaction on the customer account whose id is stored in another DynamoDB table. Afterwards we send a message with the order information to a topic. This topic is created using the Amazon SNS service, and there are three Lambda functions listening for incoming messages. Each of them grants a bonus that recharges the customer account based on different input data. The system architecture is visualized in the figure below. Sample application source code is available on GitHub.

aws

In Java, a Lambda function typically implements the RequestHandler interface. For more details about basic rules, the deployment process and useful tools go to my first article on that subject, Serverless on AWS Lambda. Coming back to our sample, below you can see the implementation of the first lambda function, PostOrder. It does nothing more than save the incoming Order object in a DynamoDB table. For storing data in DynamoDB we can use the ORM mechanism available in the AWS Java libraries. You can also read how to use the basic DynamoDB annotations in my first article about serverless.

public class PostOrder implements RequestHandler<Order, Order> {

	private DynamoDBMapper mapper;

	public PostOrder() {
		AmazonDynamoDBClient client = new AmazonDynamoDBClient();
		mapper = new DynamoDBMapper(client);
	}

	@Override
	public Order handleRequest(Order o, Context ctx) {
		LambdaLogger logger = ctx.getLogger();
		mapper.save(o);
		Order r = o;
		logger.log("Order saved: " + r.getId());
		return r;
	}

}

Assuming we have our first function implemented and deployed on AWS, we should configure an API gateway which exposes it to the outside. To achieve that, go to the Lambda Management Console on AWS and select the PostOrder function. Then go to the Triggers section and select API Gateway as the trigger for calling your function.

lambda-1

Unfortunately, that’s not all we need to have our API gateway redirecting requests to the Lambda function. Go to the API Gateway section and select OrderService. We should remove the ANY method and configure POST to invoke our function, as you can see in the picture below.

lambda-3

Then you should see the diagram visible below, where all the steps of calling the lambda function from API Gateway are visualized.

lambda-4

It is also worth creating a model object in the Models section. For the Order class it should look like in the picture below, which follows the JSON schema notation. After creating the model definition set it as the request body inside the Method Request panel and as the response body inside the Method Response panel.

lambda-5

Finally, deploy the resource using the Deploy API action and try to call it at its URL, which can be checked in the Stages section.

lambda-6

Let’s look at the second lambda function, ProcessOrder. It is triggered by the insert event received from the DynamoDB order table. This function is responsible for reading data from the incoming event and then creating and sending a message to the target topic. DynamodbEvent stores data as a map, where the key is the column name in the order table. To get a value from the map we have to use the accessor for its data type; for example a string is read with the getS method and a number with the getN method, which returns it as a string. The message sent to the SNS topic is serialized to a JSON string with the Jackson library.

public class ProcessOrder implements RequestHandler<DynamodbEvent, String> {

	private AmazonSNSClient client;
	private ObjectMapper jsonMapper;

	public ProcessOrder() {
		client = new AmazonSNSClient();
		jsonMapper = new ObjectMapper();
	}

	public String handleRequest(DynamodbEvent event, Context ctx) {
		LambdaLogger logger = ctx.getLogger();
		final List<DynamodbStreamRecord> records = event.getRecords();

		for (DynamodbStreamRecord record : records) {
			try {
				logger.log(String.format("DynamoEvent: %s, %s", record.getEventName(), record.getDynamodb().getNewImage().values().toString()));
				Map<String, AttributeValue> m = record.getDynamodb().getNewImage();
				Order order = new Order(m.get("id").getS(), m.get("accountId").getS(), Integer.parseInt(m.get("amount").getN()));
				String msg = jsonMapper.writeValueAsString(order);
				logger.log(String.format("SNS message: %s", msg));
				PublishRequest req = new PublishRequest("arn:aws:sns:us-east-1:658226682183:order", jsonMapper.writeValueAsString(new OrderMessage(msg)), "Order");
				req.setMessageStructure("json");
				PublishResult res = client.publish(req);
				logger.log(String.format("SNS message sent: %s", res.getMessageId()));
			} catch (JsonProcessingException e) {
				logger.log(e.getMessage());
			}
		}

		return "OK";
	}
}
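
When MessageStructure is set to json, SNS expects the published message to be a JSON object with at least a default key whose value is the text delivered to subscribers. That is presumably what the OrderMessage wrapper provides (its source is not shown here). The sketch below builds such an envelope by hand in plain Java; the class name is hypothetical, and in the sample Jackson takes care of the escaping.

```java
/** Sketch of the envelope SNS expects when MessageStructure is "json". */
final class SnsEnvelopeSketch {

    /** Wraps a payload as {"default":"<escaped payload>"}. */
    static String wrap(String payload) {
        return "{\"default\":\"" + escape(payload) + "\"}";
    }

    /** Escapes the characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }
}
```

The inner order JSON therefore travels as an escaped string, and the subscribed functions receive it back as plain JSON text through record.getSNS().getMessage().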

Just as for the PostOrder function, we should also add a trigger for ProcessOrder, but this time the trigger is the DynamoDB table.

lambda-10

In the Simple Notification Service section create the order topic. The Amazon SNS client uses an ARN address to identify the right topic. As you can see in the picture below, there is also a topic for DynamoDB which was created together with the database trigger.

lambda-11

The last implementation step in the sample is to create the lambda functions which listen on the SNS topic for incoming order messages. Here’s the OrderAmountHandler function code. The logic is simple. After receiving a message it deserializes it from JSON, then checks the order amount and modifies the balance value in the account table using the accountId from the order object.

public class OrderAmountHandler implements RequestHandler<SNSEvent, Object> {

	private final static int AMOUNT_THRESHOLD = 1500;
	private final static int AMOUNT_BONUS_PERCENTAGE = 10;

	private DynamoDBMapper mapper;
	private ObjectMapper jsonMapper;

	public OrderAmountHandler() {
		AmazonDynamoDBClient client = new AmazonDynamoDBClient();
		mapper = new DynamoDBMapper(client);
		jsonMapper = new ObjectMapper();
	}

	@Override
	public Object handleRequest(SNSEvent event, Context context) {
		final LambdaLogger logger = context.getLogger();
		final List<SNSRecord> records = event.getRecords();

		for (SNSRecord record : records) {
			logger.log(String.format("SNSEvent: %s, %s", record.getSNS().getMessageId(), record.getSNS().getMessage()));
			try {
				Order o = jsonMapper.readValue(record.getSNS().getMessage(), Order.class);
				if (o.getAmount() >= AMOUNT_THRESHOLD) {
					logger.log(String.format("Order allowed: id=%s, amount=%d", o.getId(), o.getAmount()));
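					// Load the account by the accountId stored in the order (assumes Order exposes getAccountId())
					Account a = mapper.load(Account.class, o.getAccountId());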
					// Assuming AMOUNT_BONUS_PERCENTAGE is a percentage, the bonus is amount * 10 / 100
					a.setBalance(a.getBalance() + o.getAmount() * AMOUNT_BONUS_PERCENTAGE / 100);
					mapper.save(a);
					logger.log(String.format("Account balance update: id=%s, amount=%d", a.getId(), a.getBalance()));
				}
			} catch (IOException e) {
				logger.log(e.getMessage());
			}
		}

		return "OK";
	}

}
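
The bonus rule itself can be isolated into a pure function, which makes it easy to test without DynamoDB or SNS. The sketch below assumes the bonus is meant as a percentage of the order amount, which is my reading of the constant name; the class and method names are hypothetical.

```java
/** Sketch of the bonus rule as a pure function, independent of AWS. */
final class OrderBonusSketch {

    static final int AMOUNT_THRESHOLD = 1500;
    static final int AMOUNT_BONUS_PERCENTAGE = 10;

    /** Bonus for an order: a percentage of the amount once the threshold is reached, else 0. */
    static int bonusFor(int amount) {
        if (amount < AMOUNT_THRESHOLD) {
            return 0;
        }
        // Integer division rounds the bonus down.
        return amount * AMOUNT_BONUS_PERCENTAGE / 100;
    }

    /** New account balance after applying the bonus for the given order amount. */
    static int newBalance(int balance, int amount) {
        return balance + bonusFor(amount);
    }
}
```

For an account with a balance of 2500 an order of 2000 yields a bonus of 200, while an order of 1499 leaves the balance unchanged.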

After creating and deploying all three of our functions we have to subscribe them to the order topic.

lambda-7

All logs from your lambda functions can be inspected with CloudWatch service.

lambda-8

lambda-9

Don’t forget about permissions in My Security Credentials section. For my example I had to attach the following policies to my default execution role: AmazonAPIGatewayInvokeFullAccess, AmazonDynamoDBFullAccess, AWSLambdaDynamoDBExecutionRole and AmazonSNSFullAccess.