In-memory data grid with Apache Ignite

Apache Ignite is a relatively new solution, but quickly increasing its popularity. It is hard to assigned to a single area of database engines division, because it has characteristics typical for some of them. The primary purpose of this solution is an in memory data grid and a key-value storage. It also has some common RDBMS features like support for SQL queries and ACID transactions. But that’s not to say it is full SQL and transactional database. It does not support foreign key constraints and transactions are available only at key-value level. Despite that Apache Ignite seems to be very interesting solution.

Apache Ignite may be easily started as a node embedded to Spring Boot application. The simplest way to achieve that is by using Spring Data Ignite library. Apache Ignite implements Spring Data CrudRepository interface that supports basic CRUD operations and also provides access to the Apache Ignite SQL Grid using the unified Spring Data interfaces. Although it has a support for distributed, ACID and SQL-compliant disk store persistence we design a solution which store in-memory cache objects in MySQL database. The architecture of presented solution is visible on the figure below and you can see it is very simple. The application put data to the in-memory cache on Apache Ignite. Apache Ignite automatically synchronizes this changes with database in an asynchronous, background task. The way of reading data by application also should not surprise you. If an entity is not cached it is read from database and put to the cache for a future use.

ignite

I’m going to guide you through the process of the sample application development. The result of this development is available on GitHub. I have found a few examples on the web, but there were only the basics. I’ll show you how to configure Apache Ignite to write objects from cache in database and create some more complex cross-cache join queries. Let’s begin from running database.

1. Setup MySQL database

The best way to start MySQL database locally is of course by Docker container. For Docker on Windows, MySQL database is now available on 192.168.99.100:33306.

docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

The next step is to create tables used by application entities to store the data: PERSON, CONTACT. Those to tables are in 1…N relation where table CONTACT holds the foreign key referenced to PERSON id.

CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `country` varchar(10) DEFAULT NULL,
  `city` varchar(20) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  PRIMARY KEY (`id`)
);

CREATE TABLE `contact` (
  `id` int(11) NOT NULL,
  `location` varchar(45) DEFAULT NULL,
  `contact_type` varchar(10) DEFAULT NULL,
  `person_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);

ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Maven configuration

The easiest way to start working with Apache Ignite’s Spring Data repository is by adding the following Maven dependency to an application’s pom.xml file. All the other Ignite dependencies would be automatically included. We also need MySQL JDBC driver, Spring JDBC dependencies to configure connection to database. They are required, because we are embedding Apache Ignite to the application and it has to establish connection with MySQL in order to be able to synchronize cache with database tables.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.apache.ignite</groupId>
   <artifactId>ignite-spring-data</artifactId>
   <version>${ignite.version}</version>
</dependency>

3. Configure Ignite node

Using IgniteConfiguration class we are able to configure all available Ignite’s node settings. The most important thing here is a cache configuration (1). We should add primary key and entity classes as an indexed types (2). Then we have to enable export cache updates to database (3) and read data not found in a cache from database (4). The interaction between Ignite’s node and MySQL may be configured using CacheJdbcPojoStoreFactory class (5). We should pass there DataSource @Bean (6), dialect (7) and mapping between object fields and table columns (8).

@Bean
public Ignite igniteInstance() {
   IgniteConfiguration cfg = new IgniteConfiguration();
   cfg.setIgniteInstanceName("ignite-1");
   cfg.setPeerClassLoadingEnabled(true);

   CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
   ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
   ccfg2.setWriteBehindEnabled(true);
   ccfg2.setWriteThrough(true); // (3)
   ccfg2.setReadThrough(true); // (4)
   CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
   f2.setDataSource(datasource); // (6)
   f2.setDialect(new MySQLDialect()); // (7)
   JdbcType jdbcContactType = new JdbcType(); // (8)
   jdbcContactType.setCacheName("ContactCache");
   jdbcContactType.setKeyType(Long.class);
   jdbcContactType.setValueType(Contact.class);
   jdbcContactType.setDatabaseTable("contact");
   jdbcContactType.setDatabaseSchema("ignite");
   jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
   f2.setTypes(jdbcContactType);
   ccfg2.setCacheStoreFactory(f2);

   CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
   ccfg.setIndexedTypes(Long.class, Person.class);
   ccfg.setWriteBehindEnabled(true);
   ccfg.setReadThrough(true);
   ccfg.setWriteThrough(true);
   CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
   f.setDataSource(datasource);
   f.setDialect(new MySQLDialect());
   JdbcType jdbcType = new JdbcType();
   jdbcType.setCacheName("PersonCache");
   jdbcType.setKeyType(Long.class);
   jdbcType.setValueType(Person.class);
   jdbcType.setDatabaseTable("person");
   jdbcType.setDatabaseSchema("ignite");
   jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
   f.setTypes(jdbcType);
   ccfg.setCacheStoreFactory(f);

   cfg.setCacheConfiguration(ccfg, ccfg2);
   return Ignition.start(cfg);
}

Here’s Spring datasource configuration for MySQL running as Docker container.

spring:
  datasource:
    name: mysqlds
    url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
    username: ignite
    password: ignite123

On that occasion it should be mentioned that Apache Ignite has still has some definencies. For example, it maps Enum to integer taking its ordinal value although it has configured VARCHAR as JDCB type. When reading such a row from database it is not mapped properly to Enum in object – you would have null in this response field.

new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")

4. Model objects

Like I mentioned before we have two tables in the database schema. There are also two model classes and two cache configuration one per each model class. Here’s model class implementation. One of the few interesting things here is ID generation with AtomicLong class. It is one of basic Ignite’s component acting as sequence generator. We can also see a specific annotation @QuerySqlField, which marks the field as available for usage as a query parameter in SQL.

@QueryGroupIndex.List(
   @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {

   private static final long serialVersionUID = -1271194616130404625L;
   private static final AtomicLong ID_GEN = new AtomicLong();

   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 0)
   private String firstName;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 1)
   private String lastName;
   private Gender gender;
   private Date birthDate;
   private String country;
   private String city;
   private String address;
   private List<Contact> contacts = new ArrayList<>();

   public void init() {
	  this.id = ID_GEN.incrementAndGet();
   }

   public Long getId() {
	  return id;
   }

   public void setId(Long id) {
	  this.id = id;
   }

   public String getFirstName() {
	  return firstName;
   }

   public void setFirstName(String firstName) {
	  this.firstName = firstName;
   }

   public String getLastName() {
	  return lastName;
   }

   public void setLastName(String lastName) {
	  this.lastName = lastName;
   }

   public Gender getGender() {
	  return gender;
   }

   public void setGender(Gender gender) {
	  this.gender = gender;
   }

   public Date getBirthDate() {
	  return birthDate;
   }

   public void setBirthDate(Date birthDate) {
	  this.birthDate = birthDate;
   }

   public String getCountry() {
	  return country;
   }

   public void setCountry(String country) {
	  this.country = country;
   }

   public String getCity() {
	  return city;
   }

   public void setCity(String city) {
	  this.city = city;
   }

   public String getAddress() {
	  return address;
   }

   public void setAddress(String address) {
	  this.address = address;
   }

   public List<Contact> getContacts() {
	  return contacts;
   }

   public void setContacts(List<Contact> contacts) {
	  this.contacts = contacts;
   }

}

5. Ignite repositories

I assume that you are familiar with Spring Data JPA concept of creating repositories. A repository handling should be enabled on the main or @Configuration class.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication {

   @Autowired
   DataSource datasource;

   public static void main(String[] args) {
	SpringApplication.run(IgniteRestApplication.class, args);
   }

   // ...
}

Then we have to extend our @Repository interface with base CrudRepository interface. It supports only inherited methods with id parameter. In the PersonRepository fragment visible below I defined some find methods using Spring Data naming convention and Ignite’s queries. In those samples you can see that we can return full object or selected fields as a query result – according to the needs.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

	List<Person> findByFirstNameAndLastName(String firstName, String lastName);

	@Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<Contact> selectContacts(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<List<?>> selectContacts2(String firstName, String lastName);
}

6. API and testing

Finally, we can inject the repository beans to the REST controller classes. API would expose methods for adding new object to the cache, updating or removing existing objects and some for searching using the primary key or the other more complex indices.

@RestController
@RequestMapping("/person")
public class PersonController {

	private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

	@Autowired
	PersonRepository repository;

	@PostMapping
	public Person add(@RequestBody Person person) {
		person.init();
		return repository.save(person.getId(), person);
	}

	@PutMapping
	public Person update(@RequestBody Person person) {
		return repository.save(person.getId(), person);
	}

	@DeleteMapping("/{id}")
	public void delete(Long id) {
		repository.delete(id);
	}

	@GetMapping("/{id}")
	public Person findById(@PathVariable("id") Long id) {
		return repository.findOne(id);
	}

	@GetMapping("/{firstName}/{lastName}")
	public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		return repository.findByFirstNameAndLastName(firstName, lastName);
	}

	@GetMapping("/contacts/{firstName}/{lastName}")
	public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
		List<Contact> contacts = repository.selectContacts(firstName, lastName);
		persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList())));
		LOGGER.info("PersonController.findByIdWithContacts: {}", contacts);
		return persons;
	}

	@GetMapping("/contacts2/{firstName}/{lastName}")
	public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		List<List<?>> result = repository.selectContacts2(firstName, lastName);
		List<Person> persons = new ArrayList<>();
		for (List<?> l : result) {
			persons.add(mapPerson(l));
		}
		LOGGER.info("PersonController.findByIdWithContacts: {}", result);
		return persons;
	}

	private Person mapPerson(List<?> l) {
		Person p = new Person();
		Contact c = new Contact();
		p.setId((Long) l.get(0));
		p.setFirstName((String) l.get(1));
		p.setLastName((String) l.get(2));
		c.setId((Long) l.get(3));
		c.setType((ContactType) l.get(4));
		c.setLocation((String) l.get(4));
		p.addContact(c);
		return p;
	}

}

It is certainly important to test the performance of the implementated solution, especially when it is related with in-memory data grid and databases. For that purpose I created some junit tests which put a large number of objects into the cache and then invoke some find methods using random input data to test queries performance. Here’s method which generates many Person and Contact objects and puts them into cache using API endpoints.

@Test
public void testAddPerson() throws InterruptedException {
	ExecutorService es = Executors.newCachedThreadPool();
	for (int j = 0; j < 10; j++) { es.execute(() -> {
		TestRestTemplate restTemplateLocal = new TestRestTemplate();
			Random r = new Random();
			for (int i = 0; i < 1000000; i++) {
				Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
				int x = r.nextInt(6);
				for (int k = 0; k < x; k++) {
					restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
				}
			}
		});
	}
	es.shutdown();
	es.awaitTermination(60, TimeUnit.MINUTES);
}

Spring Boot provides methods for capturing basic metrics of API response times. To enable that feature we have to include Spring Actuator to the dependencies. Metrics endpoint is available under http://localhost:8090/metrics address. In addition to each API method processing time it also prints such statistics like number of running threads or free memory.

7. Running application

Let’s run our sample application with embedded Apache Ignite’s node. Following some performance suggestions available in the Ignite’s docs I defined JVM configuration visible below.

java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Now, we can run JUnit test class IgniteRestControllerTest. It puts some data into the cache and then calls find methods. The metrics for the tests with 1M Person objects and 2.5M Contact objects in the cache are visible below. All find methods have taken about 1ms on average.

{
"mem": 624886,
"mem.free": 389701,
"processors": 4,
"instance.uptime": 2446038,
"uptime": 2466661,
"systemload.average": -1,
"heap.committed": 524288,
"heap.init": 524288,
"heap.used": 133756,
"heap": 1048576,
"threads.peak": 107,
"threads.daemon": 25,
"threads.totalStarted": 565,
"threads": 80,
...
"gauge.response.person.contacts.firstName.lastName": 1,
"gauge.response.contact": 1,
"gauge.response.person.firstName.lastName": 1,
"gauge.response.contact.location.location": 1,
"gauge.response.person.id": 1,
"gauge.response.person": 0,
"counter.status.200.person.id": 1000,
"counter.status.200.person.contacts.firstName.lastName": 1000,
"counter.status.200.person.firstName.lastName": 1000,
"counter.status.200.contact": 2500806,
"counter.status.200.person": 1000000,
"counter.status.200.contact.location.location": 1000
}
Advertisements

Envoy Proxy with Microservices

Introduction

I came across Envoy proxy for the first time a couple weeks ago, when one of my blog readers suggested me to write an article about it. I had never heard about it before and my first thought was that it is not my area of experience. In fact, this tool is not as popular as its competition like nginx or haproxy, but it provides some interesting features among which we can distinguish out-of-the-box support for MongoDB, Amazon RDS, flexibility around discovery and load balancing or generating a lot of useful traffic statistics. Ok, we know a little about its advantages but what exactly is Envoy proxy? ‘Envoy is an open source edge and service proxy, designed for cloud-native applications’. It was originally developed by Lift as a high performance C++ distributed proxy designed for standalone services and applications, as well as for large microservices service mesh. It sounds really good right now. That’s why I decided to take a closer look on it and prepare a sample of service discovery and distributed tracing realized with Envoy and microservices based on Spring Boot.

Envoy Configuration

In the most of previous samples basing on Spring Cloud we have used Zuul as edge and proxy. Zuul is popular Netflix OSS tool acting as API Gateway in your microservices architecture. As it turns out, it can be successfully replaced by Envoy proxy. One of the things I really like in Envoy is the way to create configuration. The default format is JSON and is validated against JSON schema. This JSON properties and schema are documented well and can be easily understood. Just what you’d expect from modern solution the recomended way to get started with it is by using the pre-built Docker images. So, in the beginning we have to create Dockerfile for bulding Docker image with Envoy and provide configuration file in JSON format. Here’s my Dockerfile. Parameters service-cluster and service-node are optional and has to do with provided configuration for service discovery, which I’ll say more about in a minute.

FROM lyft/envoy:latest
RUN apt-get update
COPY envoy.json /etc/envoy.json
CMD /usr/local/bin/envoy -c /etc/envoy.json --service-cluster samplecluster --service-node sample1

I assume you have a basic knowledge about Docker and its commands, which is mandatory at this point. After providing envoy.json configuration file we can proceed with building Docker image.

docker build -t envoy:v1 .

Then just run it using docker run command. Useful ports should be exposes outside.

docker run -d --name envoy -p 9901:9901 -p 10000:10000 envoy:v1

The first pretty helpful feature is local HTTP administrator server. It can be configured in JSON file inside admin property. For the example purpose I selected port 9901 and as you probably noticed I also had exposed that port outside Envoy Docker container. Now, admin console is available under http://192.168.99.100:9901/. If you invoke that address it prints all available commands. For me the most helpful were stats, which print all important statistics related with proxy and logging, where I could changed logging level dynamically for some of defined categories. So, first if you had any problems with Envoy try to change logging level by calling /logging?name=level and watch them on Docker container after running docker logs envoy command.

"admin": {
    "access_log_path": "/tmp/admin_access.log",
    "address": "tcp://0.0.0.0:9901"
}

The next required configuration property is listeners. There we define routing settings and the address on which Envoy will listen for incoming TCP connection. The notation tcp://0.0.0.0:10000 is the wild card match for any IPv4 address with port 10000. This port is also exposed outside Envoy Docker container. In this case it will therefore be our API gateway available under http://192.168.99.100:10000/ address. We will come back to the proxy configuration details at a ltare stage and now let’s take a closer look on the architecture of presented example.

"listeners": [{
    "address": "tcp://0.0.0.0:10000",
    ...
}]

Architecture

The architecture of described solution is visible on the figure below. We have Envoy proxy as API Gateway, which is an entry point to our system. Envoy integrates with Zipkin and sends there tracing messages with information about incoming HTTP requests and responses sent back. Two sample microservices Person and Product register itself in service discovery on startup and deregister on shutdown. They are hidden from external clients behind API Gateway . Envoy has to fetch actual configuration with addresses of registered services and route incoming HTTP request properly. If there are multiple instances of each service available it should perform load balancing.

envoy-arch

As it turns out Envoy does not support well known discovery servers like Consul or Zookeeper, but defines its own generic REST based API, which needs to be implemented to enable cluster members fetching. The main method of this API is GET /v1/registration/:service used for fetching the list of currently registered instances of service. Lyft’s provides its default implementation in Python, but for the example purpose we develope our own solution using Java and Spring Boot. Sample application source code is available on GitHub. In addition to service discovery implementation you would also find there two sample microservices.

Service Discovery

Our custom discovery implementation does nothing more than exposing REST based API with methods for registration, unregistration and fetching service’s instances. GET method needs to return specific JSON structure which matches the following schema.

{
    "hosts": [{
        "ip_address": "...",
        "port": "...",
        ...
    }]
}

Here’s REST controller class with discovery API implementation.

@RestController
public class EnvoyDiscoveryController {

    private static final Logger LOGGER = LoggerFactory.getLogger(EnvoyDiscoveryController.class);

    private Map<String, List<DiscoveryHost>> hosts = new HashMap<>();

    @GetMapping(value = "/v1/registration/{serviceName}")
    public DiscoveryHosts getHostsByServiceName(@PathVariable("serviceName") String serviceName) {
        LOGGER.info("getHostsByServiceName: service={}", serviceName);
        DiscoveryHosts hostsList = new DiscoveryHosts();
        hostsList.setHosts(hosts.get(serviceName));
        LOGGER.info("getHostsByServiceName: hosts={}", hostsList);
        return hostsList;
    }

    @PostMapping("/v1/registration/{serviceName}")
    public void addHost(@PathVariable("serviceName") String serviceName, @RequestBody DiscoveryHost host) {
        LOGGER.info("addHost: service={}, body={}", serviceName, host);
        List<DiscoveryHost> tmp = hosts.get(serviceName);
        if (tmp == null)
            tmp = new ArrayList<>();
        tmp.add(host);
        hosts.put(serviceName, tmp);
    }

    @DeleteMapping("/v1/registration/{serviceName}/{ipAddress}")
    public void deleteHost(@PathVariable("serviceName") String serviceName, @PathVariable("ipAddress") String ipAddress) {
        LOGGER.info("deleteHost: service={}, ip={}", serviceName, ipAddress);
        List<DiscoveryHost> tmp = hosts.get(serviceName);
        if (tmp != null) {
            Optional<DiscoveryHost> optHost = tmp.stream().filter(it -> it.getIpAddress().equals(ipAddress)).findFirst();
            if (optHost.isPresent())
                tmp.remove(optHost.get());
            hosts.put(serviceName, tmp);
        }
    }

}

Let’s get back to the Envoy configuration settings. Assuming we have built an image from Dockerfile visible below and then ran the container on default port we can invoke it under address http://192.168.99.100:9200. That address should be placed in envoy.json configuration file. Service discovery connection settings should be provided inside Cluster Manager section.

FROM openjdk:alpine
MAINTAINER Piotr Minkowski <piotr.minkowski@gmail.com>
ADD target/envoy-discovery.jar envoy-discovery.jar
ENTRYPOINT ["java", "-jar", "/envoy-discovery.jar"]
EXPOSE 9200

Here’s fragment from envoy.json file. Cluster for service discovery should be defined as a global SDS configuration, which must be specified inside sds property (1). The most important thing is to provide a correct URL (2) and on the basis of that Envoy automatically tries to call endpoint GET /v1/registration/{service_name}. The last interesting configuration field for that section is refresh_delay_ms, which is responsible for setting a delay between fetches a list of services registered in a discovery server. That’s not all. We also have to define cluster members. They are identified by the name (4). Their type is sds (5), what means that this cluster uses service discovery server for locating network addresses of calling microservice with the name defined in the service-name property.

"cluster_manager": {
    "clusters": [{
        "name": "service1", (4)
        "type": "sds", // (5)
	"connect_timeout_ms": 5000,
	"lb_type": "round_robin",
	"service_name": "person-service" // (6)
    }, {
        "name": "service2",
        "type": "sds",
        "connect_timeout_ms": 5000,
        "lb_type": "round_robin",
        "service_name": "product-service"
    }],
    "sds": { // (1)
	"cluster": {
		"name": "service_discovery",
		"type": "strict_dns",
		"connect_timeout_ms": 5000,
		"lb_type": "round_robin",
		"hosts": [{
			"url": "tcp://192.168.99.100:9200" // (2)
		}]
	},
	"refresh_delay_ms": 3000 // (3)
    }
}

Routing configuration is defined for every single listener inside route_config property (1). The first route is configured for person-service, which is processing by cluster service1 (2), second for product-service processing by service2 cluster. So, our services are available under http://192.168.99.100:10000/person and http://192.168.99.100:10000/product adresses.

{
    "name": "http_connection_manager",
    "config": {
        "codec_type": "auto",
        "stat_prefix": "ingress_http",
        "route_config": { // (1)
            "virtual_hosts": [{
		"name": "service",
		"domains": ["*"],
		"routes": [{
			"prefix": "/person", // (2)
			"cluster": "service1"
		}, {
			"prefix": "/product", // (3)
			"cluster": "service2"
		}]
            }]
        },
	"filters": [{
		"name": "router",
		"config": {}
        }]
    }
}

Building Microservices

The routing on Envoy proxy has been already configured. We still don’t have running microservices. Their implementation is based on Spring Boot framework and do nothing more than expose REST API providing simple operations on the object’s list and registering/unregistering service on discovery server. Here’s @Service bean responsible for that registration. The onApplicationEvent method is fired after application startup and destroy method just before gracefully shutdown.

@Service
public class PersonRegister implements ApplicationListener<ApplicationReadyEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonRegister.class);

    private String ip;
    @Value("${server.port}")
    private int port;
    @Value("${spring.application.name}")
    private String appName;
    @Value("${envoy.discovery.url}")
    private String discoveryUrl;

    @Autowired
    RestTemplate template;

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		LOGGER.info("PersonRegistration.register");
		try {
			ip = InetAddress.getLocalHost().getHostAddress();
			DiscoveryHost host = new DiscoveryHost();
			host.setPort(port);
			host.setIpAddress(ip);
			template.postForObject(discoveryUrl + "/v1/registration/{service}", host, DiscoveryHosts.class, appName);
		} catch (Exception e) {
			LOGGER.error("Error during registration", e);
		}
	}

	@PreDestroy
	public void destroy() {
		try {
			template.delete(discoveryUrl + "/v1/registration/{service}/{ip}/", appName, ip);
			LOGGER.info("PersonRegister.unregistered: service={}, ip={}", appName, ip);
		} catch (Exception e) {
			LOGGER.error("Error during unregistration", e);
		}
	}

}

The best way to shutdown Spring Boot application gracefully is by its Actuator endpoint. To enable such endpoints for the service include spring-boot-starter-actuator to your project dependencies. Shutdown is disabled by default, so we should add the following properties to application.yml to enable it and additionally disable default security (endpoints.shutdown.sensitive=false). Now, just by calling POST /shutdown we can stop our Spring Boot application and test unregister method.

endpoints:
  shutdown:
    enabled: true
    sensitive: false

Same as before for microservices we also build docker images. Here’s person-service Dockerfile, which allows to override default service and SDS port.

FROM openjdk:alpine
MAINTAINER Piotr Minkowski <piotr.minkowski@gmail.com>
ADD target/person-service.jar person-service.jar
ENV DISCOVERY_URL http://192.168.99.100:9200
ENTRYPOINT ["java", "-jar", "/person-service.jar"]
EXPOSE 9300

To build image and run container of the service with custom listen port type the following docker commands.

docker build -t piomin/person-service .
docker run -d --name person-service -p 9301:9300 piomin/person-service

Distributed Tracing

It is time for the last piece of the puzzle – Zipkin tracing. Statistics related to all incoming requests should be sent there. The first part of configuration in Envoy proxy is inside tracing property which specifies global settings for the HTTP tracer.

"tracing": {
    "http": {
        "driver": {
            "type": "zipkin",
            "config": {
                "collector_cluster": "zipkin",
                "collector_endpoint": "/api/v1/spans"
            }
        }
    }
}

Network location and settings for Zipkin connection should be defined as a cluster member.

"clusters": [{
    "name": "zipkin",
    "connect_timeout_ms": 5000,
    "type": "strict_dns",
    "lb_type": "round_robin",
    "hosts": [
      {
        "url": "tcp://192.168.99.100:9411"
      }
    ]
}]

We should also add new section tracing in HTTP connection manager configuration (1). Field operation_name is required and sets a span name. Only ‘ingress’ and ‘egress’ values are supported.

"listeners": [{
	"filters": [{
        "name": "http_connection_manager",
        "config": {
			"tracing": { // (1)
				"operation_name": "ingress" // (2)
			}
			// ...
		}
	}]
}]

Zipkin server can be started using its Docker image.

docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

Summary

Here’s a list of running Docker containers for the test purpose. As you probably remember we have Zipkin, Envoy, custom discovery, two instances of person-service and one of product-service. You can add some person objects by calling POST /person and that display a list of all persons by calling GET /person. The requests should be load balanced between two instances basing on entries in the service discovery.

envoy-1

Information about every request is sent to Zipkin with a service name taken –service-cluster Envoy proxy running parameter.

envoy-2

Building Secure APIs with Vert.x and OAuth2

Preface

Today I would like to get back to the subject touched on in the one of my previous articles – Vert.x toolkit. In the post Asynchronous Microservices With Vert.x I described how to develop microservices using Vert.x modules for web application, service discovery, circuit breaker and distributed configuration. I did not mentioned there anything about security aspects, which are usually important when talking about open APIs. It is a time to take a closer look on some Vert.x modules for authentication and authorization. Following description available on vert.io site it provides some simple out of the box implementations for authentication in our applications. There are modules providing auth implementation backed by JDBC, MongoDB and also some supporting solutions like JSON web tokens (JWT), Apache Shiro and OAuth2. Like you probably know OAuth2 is the most common authentication method for APIs provided by Facebook, Twitter or LinkedIn. If you are interested in more details about that authentication method read my article Microservices security with Oauth2, where I described the basics and introduced the simple sample with Spring Security in conjunction with OAuth2 usage.

In the sample application which is available on GitHub under security branch I’m going to present how to provide Oauth2 security for Vertx application using Keycloak and Vert.x OAuth2 module.

Keycloak

For authentication and authorization management we use Keycloak. It is an open source identity and access management solution, which provides mechanisms supporting i.a. OAuth2. Keycloak has web admin console where administrators can manage all aspects of the server. We can easily run it using docker container.

docker run -d --name keycloak -p 38080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KEYCLOAK_LOGLEVEL=DEBUG jboss/keycloak

Management dashboard is available under http://192.168.99.100:38080/. Let’s begin from creating Client. Client will be used by our application (or rather service) for authenticate itself against Keycloak. In the first step we have to set Client ID and Root URL. Root URL is not needed while using OAuth2 Password Credentials Flow, but rather for Authorization Code Flow. I put there our sample application localhost address.

vertx-sec-1

We should enable options Direct Access Grants and Authorization in the Settings section of newly created client. Also Access Type should be set to confidential and Valid Redirect URIs to the callback address routed inside application (it is explained in the later section).

vertx-sec-2

The last information needed from Client section is a Secret available under Credentials tab.

vertx-sec-3

Now we can proceed to create user with credentials. In the sample I’ll present in the next section we use password credentials flow, so don’t forget to change password on newly created user.

vertx-sec-5

vertx-sec-7

Finally, we set authorities for our user. First, let’s create some roles in Roles section. For me it is view-account, modify-account. For these roles I also enabled Scope Param Required. It means that if client need to obtain that authority it has to send role name in the request scope.

vertx-sec-4

The last step is to assign the roles to our test user piotr.minkowski.

vertx-sec-6

Building application

Vert.x provides the module supporting OAuth2 authorization. We should include the following dependency into our pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-auth-oauth2</artifactId>
	<version>${vertx.version}</version>
</dependency>

We have to begin from defining Keycloak OAuth2Auth provider. We use default realm (1). In additional to the realm name we should set realm public key (2) which is available in the Realm Settings section under Keys tab. We should also set Keycloak Client ID (3) as resource and client secret as credentials (4).

JsonObject keycloakJson = new JsonObject()
	.put("realm", "master") // (1)
	.put("realm-public-key", "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1xVBifXfS1uVM8S14JlyLpXck+0+hBQX258IiL5Fm2rZpkQ5lN9N1tadQdXBKk8V/0SxdTyoX7cpYQkcOs0Rj0XXmX7Lnk56euZwel+3MKAZWA20ld8BCfmDtX4/+VP311USUqR/W8Fd2p/gugKWF6VDMkri92qob1DdrcUiRlD8XYC0pwHwSvyW/3JvE5HeTy3U4vxC+19wHcwzLGNlVOlYPk9mzJHXN+LhZr/Tc7HeAsvVxYDXwOOh+/UWweMkvKy+OSNKG3aWLb92Ni3HejFn9kd4TRHfaapwWg1m5Duf3uqz8WDHbS/LeS4g3gQS0SvcCYI0huSoG3NA/z4K7wIDAQAB") // (2)
	.put("auth-server-url", "http://192.168.99.100:38080/auth")
	.put("ssl-required", "external")
	.put("resource", "vertx-account") // (3)
	.put("credentials", new JsonObject().put("secret", "73b55e04-e562-41ea-b39c-263b7b36945d")); // (4)

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);

vertx-sec-8

I exposed API method for login which retrieves token from Keycloak using OAuth2FlowType.PASSWORD authentication method.

router.post("/login").produces("application/json").handler(rc -> {
	User u = Json.decodeValue(rc.getBodyAsString(), User.class);
	oauth2.getToken(u.toJson(), res -> {
		if (res.failed()) {
			LOGGER.error("Access token error: {}", res.cause().getMessage());
			rc.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
		} else {
			AccessToken token = res.result();
			LOGGER.info("Access Token: {}", KeycloakHelper.rawAccessToken(token.principal()));
			User user = new User(KeycloakHelper.rawAccessToken(token.principal()));
			rc.response().end(user.toString());
		}
	});
});

I sent the following message to POST /login service.

{"username":"piotr.minkowski", "password":"Piot_123", "scope":"modify-account view-account"}

That is an equivalent to the following Vert.x JsonObject passed as a parameter to OAuth2 getToken method.

new JsonObject().put("username", "piotr.minkowski").put("password", "Piot_123").put("scope", "modify-account view-account")

POST /login method return access token inside JSON object. That token should be passed as Authorization header parameter for every call of a protected resource. Here’s main class with API methods definitions. We begin from creating OAuth2AuthHandler object (1) which is responsible for token validation. It takes OAuth2Auth Keycloak object as a parameter. Then we should set OAuth2AuthHandler as a handler for all methods matching /account/* path (2). If token has been successfully validated we can proceed to authorization. We check if view-account role is assigned to user when calling GET method (3), and modify-account role when calling POST method (4). If using Keycloak for authorization we always have to set prefix to “realm” while invoking isAuthorised method. If the role is realm then the lookup happens in global roles list.

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);
OAuth2AuthHandler oauth2Handler = (OAuth2AuthHandler) OAuth2AuthHandler.create(oauth2, "http://localhost:2222"); // (1)
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route("/account/*").handler(oauth2Handler); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.route(HttpMethod.POST, "/login").handler(BodyHandler.create());
oauth2Handler.setupCallback(router.get("/callback"));
router.get("/account/:id").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:view-account", authRes -> { // (3)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			repository.findById(rc.request().getParam("id"), res -> {
				Account account = res.result();
				LOGGER.info("Found: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:modify-account", authRes -> { // (4)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
			repository.save(a, res -> {
				Account account = res.result();
				LOGGER.info("Created: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});

Testing

I created JUnit test case to check if OAuth2 authentication works fine. Vert.x provides library which can be used for testing. It is especially design to work well with asynchronous code. Include the following dependency to your pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-unit</artifactId>
	<version>${vertx.version}</version>
	<scope>test</scope>
</dependency>

Then annotate your JUnit test class with @RunWith(VertxUnitRunner.class). Before running our test method we should deploy verticles. Verticle with REST API is deployed on port 2222.

Vertx vertx;

@Before
public void before(TestContext context) throws IOException {
	vertx = Vertx.vertx();
	vertx.deployVerticle(MongoVerticle.class.getName(), context.asyncAssertSuccess());
	DeploymentOptions options = new DeploymentOptions().setConfig(new JsonObject().put("http.port", 2222));
	vertx.deployVerticle(AccountServer.class.getName(), options, context.asyncAssertSuccess());
}

Here’s JUnit test method. We use WebClient for calling HTTP methods and Vert.x-Unit Async for complete test case on asynchronous calls finishes (3). First, we are calling POST \login method te retrieve access token from Keycloak (1). Then we are calling one of API method and setting Authorization header with access token string retrieved from POST \login method (2). During test case execution verticle with MongoDB (MongoVerticle) and API definition (AccountServer) are deployed and started, but you need to start manually MongoDB database, Consul and Keycloak. I suggest running it with Docker.

@Test
public void testAuth(TestContext context) {
	Async async = context.async();
	WebClient client = WebClient.create(vertx);
	User u = new User("piotr.minkowski", "Piot_123", "modify-account view-account");
	client.post(2222, "localhost", "/login").sendJson(u, ar -> { // (1)
		LOGGER.info("Response code: {}", ar.result().statusCode());
		LOGGER.info("Response: {}", ar.result().bodyAsString());
		if (ar.result().statusCode() == 200) {
			User user = ar.result().bodyAsJson(User.class);
			client.get(2222, "localhost", "/account").putHeader("Authorization", "Bearer " + user.getAccessToken()).send(r -> { // (2)
				LOGGER.info("GET result: {}", r.result().bodyAsString());
				async.complete(); // (3)
			});
		} else {
			async.complete();
		}
	});
}

Final Thoughts

To be honest I have never dealt with Vert.x before the start of work on a series of articles published on my blog. From those couple of days spending on that toolkit recognition I’ll definitely recommend using it when working on REST APIs. Vert.x provides the smart implementation for security with OAuth2. Additionally you can use it in combination with the solution like Keycloak, which is used for identity and access management. As usual there are also some drawbacks. I had a problem with understanding how the authorities exactly work in Vert.x. When I created a role inside Keycloak client it didn’t work in my application. Only global realm role worked fine. However, those problems does not overshadow Vert.x advantages.

Spring Cloud Apps Memory Management

Today’s topic is about memory management in Java as well as about microservices architecture. The inspiration to write this post was the situation when our available memory on test environment for the Spring Cloud based applications was exhausted. Without going into the details what was the cause of such a situation, the problem related with memory consumption by monolith based architecture in comparison with microservices is obvious. For example, supposing we have quite a large monolithic application, often 1GB or 2 GB of RAM will be enough for it, especially if we are talking about not a production environment. If we divide this application into 20 or 30 independent microservices it is hard to expect that the RAM will still remain around 1GB or 2GB. Especially if we use Spring Cloud 🙂

When running sample microservices I will use an earlier example prepared for the purpose of the one of previous articles, which is available on GitHub. I’m going to launch three microservices. First, for service discovery which uses Netflix Eureka server and two simple microservices which provide REST API, communicate with each other and register themselves in discovery server. I will not limit in any way the memory usage by those applications.

Like you see in the figure below those three running microservices have occupied about 1.5GB RAM memory on my computer. This is not the best message considering that we are dealing with very simple applications which do not even have a data persistence layer. The lowest RAM usage is for discovery service and the biggest for customer service which initializes declarative feign client for invoking account service API. Before making that screen I send some test requests to every microservice and run Eureka web console.

micro-ram-1

A lot about memory usage is shown on the charts visible below made using JProfiler. As we see most of such a memory usage is affected by heap, in comparison to non-heap it does take up much space.

micro-ram-2

micro-ram-3

Of course, the first obvious question is whether we need as much space on the heap to run our microservice application. The answer is no, we do not. Now, let’s take a brief look at how the memory management process takes place in Java 8.

We can devide JVM memory into two different parts: Heap and Non-Heap. I have already mentioned a little about Heap. As you could see on the graphs above the heap commited size for our microservices was really big (~600MB). In turn, JVM Memory consists of Young Generation and Old Generation. All the newly created objects are located in the Young Generation. When young generation is filled, garbage collection (Minor GC) is performed. To be more precise, those objects are located in the part of Young Generation which is called Eden Space. Minor GC moves all still used objects from Eden Space into Survivor 0. The same process is performed for Survivor 0 and Survivor 1 spaces. All objects that survived many cycles of GC, are moved to the Old Generation memory space. For removing objects from there is Major GC process is responsible. So, these are the most important information about Java Heap. In order to better understand the figure below. The memory limits for Java Heap can be set with the following parameters during running java -jar command:

  • -Xms – initial heap size when JVM starts
  • -Xmx – maximum heap size
  • -Xmn – size of the Young Generation, rest of the space goes for Old Generation

jvm memory

The second part of the JVM Memory, looking at the graphs above slightly less important from our point of view, is Non-Heap. Non-Heap consists of the following parts:

  • Thread Stacks – space for all running threads. The maximum thread size can be set using -Xss parameter.
  • Metaspace – it replaced PermGem, which was in Java 7 the part of JVM Heap. In Metaspace there are located all classes and methods load by application. Looking at the number of libraries included for Spring Cloud we won’t save much memory here. Metaspace size can be managed by setting -XX:MetaspaceSize and -XX:MaxMetaspaceSize parameters.
  • Code Cache – this is the space for native code (like JNI) or Java methods that are compiled into native code by JIT (just-in-time) compiler. The maximum size is determined by setting -XX:ReservedCodeCacheSize parameter.
  • Compressed Class Space – the maximum memory reserved for compressed class space is set with -XX:CompressedClassSpaceSize
  • Direct NIO Buffers

To put it more simply, Heap is for objects and Non-Heap is for classes. As you can imagine we can end up with the situation when non-heap is larger than heap for our application. First, let’s run our service discovery with the parameters below. In my opinion these are the lowest values if you are starting Eureka with embedded Tomcat on Spring Boot.

-Xms16m -Xmx32m -XX:MaxMetaspaceSize=48m -XX:CompressedClassSpaceSize=8m -Xss256k -Xmn8m -XX:InitialCodeCacheSize=4m -XX:ReservedCodeCacheSize=8m -XX:MaxDirectMemorySize=16m

If we are running microservice with REST API and Eureka, Feign and Ribbon clients we need to increase values a little.

-Xms16m -Xmx48m -XX:MaxMetaspaceSize=64m -XX:CompressedClassSpaceSize=8m -Xss256k -Xmn8m -XX:InitialCodeCacheSize=4m -XX:ReservedCodeCacheSize=8m -XX:MaxDirectMemorySize=16m

Here are charts from JProfiler for the settings above and Customer service. The difference is in starting and requests processing time. The application is working slower in comparison with earlier settings (or rather lack of them :)). Well, I wouldn’t set such a parameters in production mode. Treat them rather as a minimum requirements for service discovery and microservice apps.

micro-ram-5

micro-ram-6

The current total memory usage is as follows. It is still the biggest for Customer service and the lowest for Discovery.

micro-ram-4

I have also tried to run Discovery application using different web containers. You can easily change web container by including in your pom.xml file the dependencies visible below.

For Jetty.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>

For Undertow.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>

The best result was for Undertow (116MB), second place for Tomcat (122MB) and third for Jetty (128MB). This tests were performed only for Eureka server without registering there any microservices.

Visualizing Jenkins Pipeline Results in Grafana

This time I describe a slightly lighter topic in comparison to the some previous posts. Personally, I think Grafana is a very cool tool for visualizing any timeline data. As it turns out it is quite easy to store and visualize Jenkins build results with InfluxDB plugin.

1. Starting docker containers

Let’s begin from starting needed docker containers with Grafana, InfluxDB and Jenkins.

docker run -d --name grafana -p 3000:3000 grafana/grafana
docker run -d --name influxdb -p 8086:8086 influxdb
docker run -d --name jenkins -p 38080:8080 -p 50000:50000 jenkins

Then you can run client container which links to InfluxDB container. Using this container you can create new database with command CREATE DATABASE grafana.

docker run --rm --link=influxdb -it influxdb influx -host influxdb

2. Configuring Jenkins

After starting Jenkins you need to download some plugins. For this sample it should be the following plugins:

If you are interested in more details about Jenkins configuration and Continuous Delivery take a look on my previous article about that topic How to setup Continuous Delivery environment.

In Manage Jenkins -> Configure System section add new InfluxDB target.

grafana-2

3. Building pipeline in Jenkins

With Jenkins Pipeline Plugin we are building pipelines using Groovy syntax. In the first step (1) we checkout project from GitHub, and then build it with Maven (2). Then we publish JUnit and JaCoCo reports (3) and finally send the whole report to InfluxDB (4).

node {
	def mvnHome
	try {
		stage('Checkout') { //(1)
			git 'https://github.com/piomin/sample-code-for-ci.git'
			mvnHome = tool 'maven3'
		}
		stage('Build') { //(2)
			dir('service-1') {
				sh "'${mvnHome}/bin/mvn' -Dmaven.test.failure.ignore clean package"
			}
		}
		stage('Tests') { //(3)
			junit '**/target/surefire-reports/TEST-*.xml'
			archive 'target/*.jar'
			step([$class: 'JacocoPublisher', execPattern: '**/target/jacoco.exec'])
		}
		stage('Report') { //(4)
			if (currentBuild.currentResult == 'UNSTABLE') {
				currentBuild.result = "UNSTABLE"
			} else {
				currentBuild.result = "SUCCESS"
			}
			step([$class: 'InfluxDbPublisher', customData: null, customDataMap: null, customPrefix: null, target: 'grafana'])
		}
	} catch (Exception e) {
		currentBuild.result = "FAILURE"
		step([$class: 'InfluxDbPublisher', customData: null, customDataMap: null, customPrefix: null, target: 'grafana'])
	}
}

I defined three pipelines for one per every module from the sample.

grafana-5

4. Building services

Add jacoco-maven-plugin Maven plugin to your pom.xml to enable code coverage reporting.

<plugin>
	<groupId>org.jacoco</groupId>
	<artifactId>jacoco-maven-plugin</artifactId>
	<version>0.7.9</version>
	<executions>
		<execution>
			<id>default-prepare-agent</id>
			<goals>
				<goal>prepare-agent</goal>
			</goals>
		</execution>
		<execution>
			<id>default-report</id>
			<phase>prepare-package</phase>
			<goals>
				<goal>report</goal>
			</goals>
		</execution>
	</executions>
</plugin>

Sample application source code is available on GitHub. It consists of three simple modules, which does not do anything important, but only has JUnit tests needed for build results visualization.

5. Configuring Grafana

First, configure Grafana data source as your InfluxDB Docker container instance.

grafana-1

With InfluxDB Plugin we can report metrics generated by JUnit, Cobertura, JaCoCo, Robot Framework and Performance Plugin. In the sample application I’ll show you the reports from JUnit and JaCoCo. Let’s configure our graphs in Grafana. As you can see on the picture below I defined the graph with pipeline Build Time data. The result are grouped by project name.

grafana-4

Here are two graphs. The first illustrating every pipeline build time data in milliseconds, and second percentage test code coverage. For test coverage we need to select from jacoco_data table instead of jenkins_data and then choose field jacoco_method_coverage_rate.

grafana-3

For more details about visualizing metrics with Grafana and InfluxDB you can refer to my previous article Custom metrics visualization with Grafana and InfluxDB.

Asynchronous Microservices with Vert.x

Preface

I must admit that as soon as I saw Vert.x documentation I liked this concept. This may have happened because I had previously use with very similar framework which I used to create simple and lightweight applications exposing REST APIs – Node.js. It is really fine framework, but has one big disadvantage for me – it is JavaScript runtime. What is worth mentioning Vert.x is polyglot, it supports all the most popular JVM based languages like Java, Scala, Groovy, Kotlin and even JavaScript. These are not all of its advantages. It’s lightweight, fast and modular. I was pleasantly surprised when I added the main Vert.x dependencies to my pom.xml and there was not downloaded many of other dependencies, as is often the case when using Spring Boot framework.
Well, I will not elaborate about the advantages and key concepts of this toolkit. I think you can read more about it in other articles. The most important thing for us is that using Vert.x we can can create high performance and asynchronous microservices based on Netty framework. In addition, we can use standardized microservices mechanisms such as service discovery, configuration server or circuit breaking.

Sample application source code is available on Github. It consists of two modules account-vertx-service and customer-vertx-service. Customer service retrieves data from Consul registry and invokes acccount service API. Architecture of the sample solution is visible on the figure below.

vertx

Building services

To be able to create HTTP service exposing REST API we need to include the following dependency into pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web</artifactId>
	<version>${vertx.version}</version>
</dependency>

Here’s the fragment from account service where I defined all API methods. The first step (1) was to declare Router which is one of the core concepts of Vert.x-Web. A router takes an HTTP request, finds the first matching route for that request, and passes the request to that route. The next step (2), (3) is to add some handlers, for example BodyHandler, which allows you to retrieve request bodies and has been added to POST method. Then we can begin to define API methods (4), (5), (6), (7), (8). And finally (9) we are starting HTTP server on the port retrieved from configuration.

Router router = Router.router(vertx); // (1)
router.route("/account/*").handler(ResponseContentTypeHandler.create()); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create()); // (3)
router.get("/account/:id").produces("application/json").handler(rc -> { // (4)
	repository.findById(rc.request().getParam("id"), res -> {
		Account account = res.result();
		LOGGER.info("Found: {}", account);
		rc.response().end(account.toString());
	});
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> { // (5)
	repository.findByCustomer(rc.request().getParam("customer"), res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.get("/account").produces("application/json").handler(rc -> { // (6)
	repository.findAll(res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found all: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.post("/account").produces("application/json").handler(rc -> { // (7)
	Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
	repository.save(a, res -> {
		Account account = res.result();
		LOGGER.info("Created: {}", account);
		rc.response().end(account.toString());
	});
});
router.delete("/account/:id").handler(rc -> { // (8)
	repository.remove(rc.request().getParam("id"), res -> {
		LOGGER.info("Removed: {}", rc.request().getParam("id"));
		rc.response().setStatusCode(200);
	});
});
...
vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port")); // (9)

All API methods uses repository object to communicate with datasource. In this case I decided to use Mongo. Vert.x has a module for interacting with that database, we need to include as new dependency.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-mongo-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

Mongo client, same as all other Vert.x modules, works asynchronously. That’s why we need to use AsyncResult Handler to pass result from repository object. To be able to pass custom object as AsyncResult we have to annotate it with @DataObject and add toJson method.

public AccountRepositoryImpl(final MongoClient client) {
	this.client = client;
}

@Override
public AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler) {
	JsonObject json = JsonObject.mapFrom(account);
	client.save(Account.DB_TABLE, json, res -> {
		if (res.succeeded()) {
			LOGGER.info("Account created: {}", res.result());
			account.setId(res.result());
			resultHandler.handle(Future.succeededFuture(account));
		} else {
			LOGGER.error("Account not created", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}

@Override
public AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler) {
	client.find(Account.DB_TABLE, new JsonObject(), res -> {
		if (res.succeeded()) {
			List<Account> accounts = res.result().stream().map(it -> new Account(it.getString("_id"), it.getString("number"), it.getInteger("balance"), it.getString("customerId"))).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		} else {
			LOGGER.error("Account not found", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}

Here’s Account model class.

@DataObject
public class Account {

	public static final String DB_TABLE = "account";

	private String id;
	private String number;
	private int balance;
	private String customerId;

	public Account() {

	}

	public Account(String id, String number, int balance, String customerId) {
		this.id = id;
		this.number = number;
		this.balance = balance;
		this.customerId = customerId;
	}

	public Account(JsonObject json) {
		this.id = json.getString("id");
		this.number = json.getString("number");
		this.balance = json.getInteger("balance");
		this.customerId = json.getString("customerId");
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getNumber() {
		return number;
	}

	public void setNumber(String number) {
		this.number = number;
	}

	public int getBalance() {
		return balance;
	}

	public void setBalance(int balance) {
		this.balance = balance;
	}

	public String getCustomerId() {
		return customerId;
	}

	public void setCustomerId(String customerId) {
		this.customerId = customerId;
	}

	public JsonObject toJson() {
		return JsonObject.mapFrom(this);
	}

	@Override
	public String toString() {
		return Json.encodePrettily(this);
	}

}

Verticles

It is worth to mention a few words about running an application written in Vert.x. It is based on verticles. Verticles are chunks of code that get deployed and run by Vert.x. A Vert.x instance maintains N event loop threads by default. When creating a verticle we have to extend abstract class AbstractVerticle.

public class AccountServer extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		...
	}
}

I created two verticles per microservice. First for HTTP server and second for communication with Mongo. Here’s main application method where I’m deploying verticles.

public static void main(String[] args) throws Exception {
	Vertx vertx = Vertx.vertx();
	vertx.deployVerticle(new MongoVerticle());
	vertx.deployVerticle(new AccountServer());
}

Well, now we should obtain the reference inside AccountServer verticle to the service running on MongoVerticle. To achieve it we have to generate proxy classes using vertx-codegen module.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-proxy</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-codegen</artifactId>
	<version>${vertx.version}</version>
	<scope>provided</scope>
</dependency>

First, annotate repository interface with @ProxyGen ad all public methods with @Fluent.

@ProxyGen
public interface AccountRepository {

	@Fluent
	AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository findById(String id, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findByCustomer(String customerId, Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository remove(String id, Handler<AsyncResult<Void>> resultHandler);

	static AccountRepository createProxy(Vertx vertx, String address) {
		return new AccountRepositoryVertxEBProxy(vertx, address);
	}

	static AccountRepository create(MongoClient client) {
		return new AccountRepositoryImpl(client);
	}

}

Generator needs additional configuration inside pom.xml file. After running command mvn clean install on the parent project all generated classes should be available under src/main/generated directory for every microservice module.

<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-compiler-plugin</artifactId>
	<version>3.6.2</version>
	<configuration>
		<encoding>${project.build.sourceEncoding}</encoding>
		<source>${java.version}</source>
		<target>${java.version}</target>
		<useIncrementalCompilation>false</useIncrementalCompilation>
		<annotationProcessors>		<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
		</annotationProcessors>
		<generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
		<compilerArgs>
			<arg>-AoutputDirectory=${project.basedir}/src/main</arg>
		</compilerArgs>
	</configuration>
</plugin>

Now we are able to obtain AccountRepository reference by calling createProxy with account-service name.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");

Service Discovery

To use the Vert.x service discovery, we have to add the following dependencies into pom.xml. In the first of them there are mechanisms for built-in Vert.x discovery, which is rather not usable if we would like to invoke microservices running on different hosts. Fortunately, there are also available some additional bridges, for example Consul bridge.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery-bridge-consul</artifactId>
	<version>${vertx.version}</version>
</dependency>

Great, we only have to declare service discovery and register service importer. Now, we can retrieve configuration from Consul, but I assume we also would like to register our service. Unfortunately, problems start here… Like the toolkit authors say It (Vert.x) does not export to Consul and does not support service modification. Maybe somebody will explain me why this library can not also export data to Consul – I just do not understand it. I had the same problem with Apache Camel some months ago and I will use the same solution I developed that time. Fortunately, Consul has simple API for service registration and deregistration. To use it in our appplication we need to include Vert.x HTTP client to our dependencies.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

Then using declared WebClient while starting application we can register service by invoking Consul PUT method.

WebClient client = WebClient.create(vertx);
...
JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new 		JsonArray().add("http-endpoint"));
client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
	LOGGER.info("Consul registration status: {}", res.result().statusCode());
});

Once the account-service have registered itself on discovery server we can invoke it from another microservice – in this case from customer-service. We only have to create ServiceDiscovery object and register Consul service importer.

ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
...
discovery.registerServiceImporter(new ConsulServiceImporter(), new JsonObject().put("host", discoveryConfig.getString("host")).put("port", discoveryConfig.getInteger("port")).put("scan-period", 2000));

Here’s AccountClient fragment, which is responsile for invoking GET /account/customer/{customerId} from account-service. It obtains service reference from discovery object and cast it to WebClient instance. I don’t know if you have noticed that apart from the standard fields such as ID, Name or Port, I also set the Tags field to the value of the type of service that we register. In this case it will be an http-endpoint. Whenever Vert.x reads data from Consul, it will be able to automatically assign a service reference to WebClient object.

public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List<Account>>> resultHandler) {
	discovery.getRecord(r -> r.getName().equals("account-service"), res -> {
		LOGGER.info("Result: {}", res.result().getType());
		ServiceReference ref = discovery.getReference(res.result());
		WebClient client = ref.getAs(WebClient.class);
		client.get("/account/customer/" + customerId).send(res2 -> {
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List<Account> accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
	});
	return this;
}

Config

For configuration management within the application Vert.x Config module is responsible.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-config</artifactId>
	<version>${vertx.version}</version>
</dependency>

There are many configuration stores, which can be used as configuration data location:

  • File
  • Environment Variables
  • HTTP
  • Event Bus
  • Git
  • Redis
  • Consul
  • Kubernetes
  • Spring Cloud Config Server

I selected the simplest one – file. But it can be easily changed only by defining another type on ConfigStoreOptions object. For loading configuration data from the store ConfigRetriever is responsible. It reads configuration as JsonObject.

ConfigStoreOptions file = new ConfigStoreOptions().setType("file").setConfig(new JsonObject().put("path", "application.json"));
ConfigRetriever retriever = ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(file));
retriever.getConfig(conf -> {
	JsonObject discoveryConfig = conf.result().getJsonObject("discovery");
	vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port"));
	JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
	client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
		LOGGER.info("Consul registration status: {}", res.result().statusCode());
	});
});

Configuration file application.json is available under src/main/resources and it contains application port, service discovery and datasource adresses.

{
	"port" : 2222,
	"discovery" : {
		"host" : "192.168.99.100",
		"port" : 8500
	},
	"datasource" : {
		"host" : "192.168.99.100",
		"port" : 27017,
		"db_name" : "test"
	}
}

Final thoughts

Vert.x authors wouldn’t like to define their solution as a framework, but as a tool-kit. They don’t tell you what is a correct way to write an application, but only give you a lot of useful bricks helping to create your app. With Vert.x you can create fast and lightweight APIs basing on non-blocking, asynchronous I/O. It gives you a lot of possibilities, as you can see on the Config module example, where you can even use Spring Cloud Config Server as a configuration store. But it is also not free from drawbacks, as I showed on the service registration with Consul example. Vert.x also allows to create reactive microservices with RxJava, what seems to be interesting option, I hope to describe in the future.

Hazelcast Hot Cache with Striim

I previously introduced some articles about Hazelcast – an open source in memory data grid solution. In the first of them JPA caching with Hazelcast, Hibernate and Spring Boot I described how to set up 2nd level JPA cache with Hazelcast and MySQL. In the second In memory data grid with Hazelcast I showed more advanced sample how to use Hazelcast distributed queries to enable faster data access for Spring Boot application. Using Hazelcast as a cache level between your application and relational database is generally a very good solution under one condition – all changes are going across your application. If a data source is modified by other application which does not use your caching solution it causes problem with outdated data for your application. Did you have encountered this problem in your organization? In my organization we still use relational databases in almost all our systems. Sometimes it causes performance problems, even optimized queries are too slow for real time applications. Relational database is still required, so solutions like Hazelcast can help us.

Let’s return to the topic of outdated cache. That’s why we need Striim, a real-time data integration and streaming analytics software platform. The architecture of presented solution is visible on the figure below. We have two applications. The first one employee-service uses Hazelcast as a cache, the second one employee-app performs changes directly to the database. Without such a solution like Striim data changed by employee-app is not visible for employee-service. Striim enables real-time data integration without modifying or slowing down data source. It uses CDC (Change Data Capture) mechanisms for detecting changes performed on data source, by analizing binary logs. It has a support for the most popular transactional databases like Oracle, Microsoft SQL Server and MySQL. Striim has many interesting features, but also one serious drawback – it is not an open source. An alternative for the presented solution, especially when using Oracle database, can be Oracle In-Memory Data Grid with Golden Gate Hot Cache.

striim-figure-1

I prepared sample application for that article purpose, which is as usual available on GitHub under striim branch. The application employee-service is based on Spring Boot and has embedded Hazelcast client which connects to the cluster and Hazelcast Management Center. If data is not available in the cache the application connects to MySQL database.

1. Starting MySQL and enabling binary log

Let’s start MySQL database using docker.

docker run -d --name mysql -e MYSQL_DATABASE=hz -e MYSQL_USER=hz -e MYSQL_PASSWORD=hz123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

Binary log is disabled by default. We have to enable it by including the following lines into mysqld.cnf. The configuration file is available on docker container under /etc/mysql/mysql.conf.d/mysqld.cnf.

log_bin			 = /var/log/mysql/binary.log
expire-logs-days = 14
max-binlog-size  = 500M
server-id        = 1

If you are running MySQL on Docker you should restart your container using docker restart mysql.

2. Starting Hazelcast Dashboard and Striim

Same as for MySQL, I also used Docker.

docker run -d --name striim -p 39080:9080 striim/evalversion
docker run -d --name hazelcast-mgmt -p 38080:8080 hazelcast/management-center:3.7.7

I selected 3.7.7 version of Hazelcast Management Center, because this version is included by default into the Spring Boot release I used in the sample application. Now, you should be able to login into Hazelcast Dashboard available under http://192.168.99.100:38080/mancenter/ and to the Striim Dashboard which is available under http://192.168.99.100:39080/ (admin/admin).

3. Starting sample application

Build sample application with mvn clean install and start using java -jar employee-service-1.0-SNAPSHOT.jar. You can test it by calling one of endpoint:
/employees/person/{id}
/employees/company/{company}
/employees/{id}

Before testing create table employee in MySQL and insert some test data (you can run my test class pl.piomin.services.datagrid.employee.data.AddEmployeeRepositoryTest).

4. Configure entity mapping in Striim

Before creating our first application in Striim we have to provide mapping configuration. The first step is to copy your entity ORM mapping file into docker container filesystem. You can perform it using Striim dashboard or with docker cp command. Here’s my orm.xml file – it is used by Striim HazelcastWriter while putting data into cache.

<entity-mappings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/orm" 	version="2.4">
	<entity name="employee" class="pl.piomin.services.datagrid.employee.model.Employee">
<table name="hz.employee" />
		<attributes>
			<id name="id" attribute-type="Integer">
				<column nullable="false" name="id" />
				<generated-value strategy="AUTO" />
			</id>
			<basic name="personId" attribute-type="Integer">
				<column nullable="false" name="person_id" />
			</basic>
			<basic name="company" attribute-type="String">
				<column name="company" />
			</basic>
		</attributes>
	</entity>
</entity-mappings>

We also have to provide jar with entity class. It should be placed under /opt/Striim/lib directory on Striim docker container. What is important, the fields are public – do not make them private with setters, because it won’t work for HazelcastWriter. After all changes restart your container and proceed to the next steps. For the sample application just build employee-model module and upload to Striim.

public class Employee implements Serializable {

	private static final long serialVersionUID = 3214253910554454648L;
	public Integer id;
	public Integer personId;
	public String company;

	public Employee() {

	}

	@Override
	public String toString() {
		return "Employee [id=" + id + ", personId=" + personId + ", company=" + company + "]";
	}

}

5. Configuring MySQL CDC connection on Striim

If all the previous steps are completed we can finally begin to create our application in Striim. When creating a new app select Start with Template, and then MySQL CDC to Hazelcast. Put your MySQL connection data, security credentials and proceed. In addition to connection validation Striim also checks if binary log is enabled.

Then select tables for synchronization with cache.

striim-3

6. Configuring Hazelcast on Striim

After starting employee-service application you should see the following fragment in the file logs.

Members [1] {
	Member [192.168.8.205]:5701 - d568a38a-7531-453a-b7f8-db2be4715132 this
}

This address should be provided as a Hazelcast Cluster URL. We should also put ORM mapping file location and cluster credentials (by default these are dev/dev-pass).

striim-5

In the next screen you will see ORM mapping visualization and input selection. Your input is MySQL server you defined in the fifth step.

striim-7

7. Deploy application on Striim

After finishing previous steps you see the flow diagram. I suggest you create log file where all input events will be stored as a JSON. My diagram is visible in the figure below. If your configuration is finished deploy and start application.  At this stage I had some problems. For example, if I deploy application after Striim restart I always have to change something and save, otherwise exception during deploy occurs. However, after a long struggle with Striim, I finally succeeded in running the application! So we can start testing.

striim-8

8. Checking out

I created JUnit test to illustrate cache refresh performed by Striim. Inside this test I invoke employees/company/{company} REST API method and collect entities. Then I modified entities with EmployeeRepository which commits changes directly to the database bypassing Hazelcast cache. I invoke REST API again and compare results with entities collected with previous invoke. Field personId should not be equal with value for previously invoked entity. You also can test it manually by calling REST API endpoint and change something in the database using the client like MySQL Workbench.

@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class CacheRefreshEmployeeTest {

	protected Logger logger = Logger.getLogger(CacheRefreshEmployeeTest.class.getName());

	@Autowired
	EmployeeRepository repository;

	TestRestTemplate template = new TestRestTemplate();

	@Test
	public void testModifyAndRefresh() {
		Employee[] e = template.getForObject("http://localhost:3333/employees/company/{company}", Employee[].class, "Test001");
		for (int i = 0; i < e.length; i++) {
			Employee eMod = repository.findOne(e[i].getId());
			eMod.setPersonId(eMod.getPersonId()+1);
			repository.save(eMod);
		}

		Employee[] e2 = template.getForObject("http://localhost:3333/employees/company/{company}", Employee[].class, "Test001");
		for (int i = 0; i < e2.length; i++) {
			Assert.assertNotEquals(e[i].getPersonId(), e2[i].getPersonId());
		}

	}
}

Here’s the picture with Striim dashboard monitor. We can check out how many events were processed, what is actual memory and CPU usage etc.

striim-1

Final Thoughts

I have no definite opinion about Striim. On the one hand it is an interesting solution with many integration possibilities and a nice dashboard for configuration and monitoring. But on the other hand it is not free from errors and bugs. My application crashed when an exception was thrown for the lack of a matching serializer for the entity in Hazelcast’s cache. This stopped processing any further events. It may be a deliberate action, but in my opinion subsequent events should be processed as they may affect other tables. The application management with web dashboard is not very comfortable at all. Every time I restarted the container, I had to change something in the configuration, because the application threw not intuitive exception on startup. From this type of application I would expect first of all reliability if the application would require updating of the data on the Hazelcast. However, despite some drawbacks, it is worth a closer look at Striim.