Versioning REST API with Spring Boot and Swagger

One thing's for sure: if you don't have to version your API, don't do it. However, sometimes you have to. Many of the most popular services, like Twitter, Facebook, Netflix or PayPal, version their REST APIs. The advantages and disadvantages of that approach are obvious. On the one hand, you don't have to worry about making changes to your API even if many external clients and applications consume it. On the other hand, you have to maintain different versions of the API implementation in your code, which can sometimes be troublesome.

In this article I'm going to show you how to maintain several versions of a REST API in your application in the most comfortable way. We will work with a sample application written on top of the Spring Boot framework that exposes API documentation using the Swagger2 and SpringFox libraries.

Spring Boot does not provide any dedicated solution for versioning APIs. The situation is different for the SpringFox Swagger2 library, which has provided a grouping mechanism since version 2.8.0 that is perfect for generating documentation of a versioned REST API.

I have already introduced Swagger2 together with a Spring Boot application in one of my previous posts. In the article Microservices API Documentation with Swagger2 you can read how to use Swagger2 to generate API documentation for all independent microservices and publish it in one place – on the API gateway.

Different approaches to API versioning

There are several different ways to provide API versioning in your application. The most popular of them are:

  1. Through the URI path – you include the version number in the URL path of the endpoint, for example /api/v1/persons
  2. Through query parameters – you pass the version number as a query parameter with a specified name, for example /api/persons?version=1
  3. Through custom HTTP headers – you define a new header that contains the version number in the request
  4. Through content negotiation – the version number is included in the “Accept” header together with the accepted content type. A request with cURL would look like the following: curl -H "Accept: application/vnd.piomin.v1+json" http://localhost:8080/api/persons
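
Approaches 2 and 3 are not implemented in the sample application described later in this article, but as a hypothetical sketch (the controller name and the X-API-VERSION header below are made up for illustration), Spring MVC can express them with the params and headers attributes of its mapping annotations.

@RestController
@RequestMapping("/api/persons")
class PersonVersionedController {

	// Approach 2: version passed as a query parameter, e.g. GET /api/persons?version=1
	@GetMapping(params = "version=1")
	public List<PersonOld> findAllV1() {
		return Collections.emptyList(); // lookup omitted in this sketch
	}

	// Approach 3: version passed in a custom HTTP header, e.g. "X-API-VERSION: 1"
	@GetMapping(headers = "X-API-VERSION=1")
	public List<PersonOld> findAllV1ByHeader() {
		return Collections.emptyList(); // lookup omitted in this sketch
	}

}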

The decision which of these approaches to implement in your application is up to you. We could discuss the advantages and disadvantages of every single approach, but that is not the main purpose of this article. The main purpose is to show you how to implement versioning in a Spring Boot application and then publish the API documentation automatically using Swagger2. The sample application source code is available on GitHub (https://github.com/piomin/sample-api-versioning.git). I have implemented two of the approaches described above – points 1 and 4.

Enabling Swagger for Spring Boot

Swagger2 can be enabled in a Spring Boot application by including the SpringFox library. In fact, this is a suite of Java libraries used for automating the generation of machine- and human-readable specifications for JSON APIs written using the Spring Framework. It supports formats such as Swagger, RAML and jsonapi. To enable it for your application, include the following Maven dependencies in the project: io.springfox:springfox-swagger-ui, io.springfox:springfox-swagger2, io.springfox:springfox-spring-web. Then you have to annotate the main class with @EnableSwagger2 and define a Docket object. Docket is SpringFox's primary configuration mechanism for Swagger 2.0. We will discuss the details in the next section, along with a sample for each way of versioning the API.
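
As a reference point, a minimal, non-versioned SpringFox configuration could look like the sketch below (the class name is arbitrary, and @EnableSwagger2 may equally well be placed on the main class). The versioned Docket beans actually used in this article are shown in the following sections.

@Configuration
@EnableSwagger2
public class SwaggerConfig {

	@Bean
	public Docket api() {
		return new Docket(DocumentationType.SWAGGER_2)
			.select()
				.apis(RequestHandlerSelectors.any())
				.paths(PathSelectors.any())
			.build()
			.apiInfo(new ApiInfoBuilder().title("Person API").version("1.0").build());
	}

}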

Sample API

Our sample API is very simple. It exposes basic CRUD methods for the Person entity. There are three versions of the API available for external clients: 1.0, 1.1 and 1.2. In version 1.1 I have changed the method for updating the Person entity. In version 1.0 it was available under the /person path, while now it is available under /person/{id}. This is the only difference between versions 1.0 and 1.1. There is also only one difference in the API between versions 1.1 and 1.2: instead of the field birthDate, it returns age as an integer. This change affects all endpoints except DELETE /person/{id}. Now, let's proceed to the implementation.
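
To visualize that difference, here's a simplified sketch of the two model classes. Apart from the birthDate/age distinction, the listed fields are illustrative assumptions, not an exact copy of the repository code.

// PersonOld is returned by API versions 1.0 and 1.1
class PersonOld {
	private Long id;
	private String firstName;
	private String lastName;
	private Date birthDate;
	// getters and setters omitted
}

// PersonCurrent is returned by API version 1.2 - it exposes age instead of birthDate
class PersonCurrent {
	private Long id;
	private String firstName;
	private String lastName;
	private int age;
	// getters and setters omitted
}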

Versioning using URI path

Here’s the full implementation of URI path versioning inside Spring @RestController.

@RestController
@RequestMapping("/person")
public class PersonController {

	@Autowired
	PersonMapper mapper;
	@Autowired
	PersonRepository repository;

	@PostMapping({"/v1.0", "/v1.1"})
	public PersonOld add(@RequestBody PersonOld person) {
		return (PersonOld) repository.add(person);
	}

	@PostMapping("/v1.2")
	public PersonCurrent add(@RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.add(person));
	}

	@PutMapping("/v1.0")
	@Deprecated
	public PersonOld update(@RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping("/v1.1/{id}")
	public PersonOld update(@PathVariable("id") Long id, @RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping("/v1.2/{id}")
	public PersonCurrent update(@PathVariable("id") Long id, @RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.update(person));
	}

	@GetMapping({"/v1.0/{id}", "/v1.1/{id}"})
	public PersonOld findByIdOld(@PathVariable("id") Long id) {
		return (PersonOld) repository.findById(id);
	}

	@GetMapping("/v1.2/{id}")
	public PersonCurrent findById(@PathVariable("id") Long id) {
		return mapper.map((PersonOld) repository.findById(id));
	}

	@DeleteMapping({"/v1.0/{id}", "/v1.1/{id}", "/v1.2/{id}"})
	public void delete(@PathVariable("id") Long id) {
		repository.delete(id);
	}

}

If you would like to have three different versions available in a single generated API specification you should declare three Docket @Beans – one per version. In this case the Swagger group concept, which has already been introduced by SpringFox, is helpful for us. The reason this concept has been introduced is the need to support applications which require more than one Swagger resource listing. Usually you need more than one resource listing in order to provide different versions of the same API. We can assign a group to every Docket just by invoking the groupName DSL method on it. Because different versions of the API methods are implemented within the same controller, we have to distinguish them by declaring a path regex matching the selected version. All other settings are standard.

@Bean
public Docket swaggerPersonApi10() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.0")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.0.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.0").title("Person API").description("Documentation Person API v1.0").build());
}

@Bean
public Docket swaggerPersonApi11() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.1")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.1.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.1").title("Person API").description("Documentation Person API v1.1").build());
}

@Bean
public Docket swaggerPersonApi12() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.2")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.2.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.2").title("Person API").description("Documentation Person API v1.2").build());
}

Now, we may display the Swagger UI for our API just by calling the /swagger-ui.html path in a web browser. You can switch between all available versions of the API, as you can see in the picture below.

api-1
Switching between available versions of API

The specification is generated separately for each version of the API. Here's the documentation for version 1.0. Because the method PUT /person is annotated with @Deprecated, it is crossed out on the generated HTML documentation page.

api-2
Person API 1.0 specification

If you switch to the group person-api-1.1 you will see all the methods that contain v1.1 in the path. Among them you may recognize the current version of the PUT method with the {id} field in the path.

api-3
Person API 1.1 specification

When using the documentation generated by Swagger you may easily call every method after expanding it. Here's a sample of calling the method PUT /person/{id} implemented for version 1.2.

api-5
Updating Person entity by calling method PUT from 1.2 version

Versioning using ‘Accept’ header

To access the implementation of versioning with the ‘Accept’ header you should switch to the branch header (https://github.com/piomin/sample-api-versioning/tree/header). Here's the full implementation of content negotiation using ‘Accept’ header versioning inside a Spring @RestController.

@RestController
@RequestMapping("/person")
public class PersonController {

	@Autowired
	PersonMapper mapper;
	@Autowired
	PersonRepository repository;

	@PostMapping(produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json"})
	public PersonOld add(@RequestBody PersonOld person) {
		return (PersonOld) repository.add(person);
	}

	@PostMapping(produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent add(@RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.add(person));
	}

	@PutMapping(produces = "application/vnd.piomin.app-v1.0+json")
	@Deprecated
	public PersonOld update(@RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping(value = "/{id}", produces = "application/vnd.piomin.app-v1.1+json")
	public PersonOld update(@PathVariable("id") Long id, @RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping(value = "/{id}", produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent update(@PathVariable("id") Long id, @RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.update(person));
	}

	@GetMapping(name = "findByIdOld", value = "/{idOld}", produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json"})
	@Deprecated
	public PersonOld findByIdOld(@PathVariable("idOld") Long id) {
		return (PersonOld) repository.findById(id);
	}

	@GetMapping(name = "findById", value = "/{id}", produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent findById(@PathVariable("id") Long id) {
		return mapper.map((PersonOld) repository.findById(id));
	}

	@DeleteMapping(value = "/{id}", produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json", "application/vnd.piomin.app-v1.2+json"})
	public void delete(@PathVariable("id") Long id) {
		repository.delete(id);
	}

}

We still have to define three Docket @Beans, but the filtering criteria are slightly different. Simple filtering by path is not an option here. We have to create a Predicate for the RequestHandler object and pass it to the apis DSL method. The predicate implementation should filter every method in order to find only those whose produces field contains the required version number. Here's a sample Docket implementation for version 1.2.

@Bean
public Docket swaggerPersonApi12() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.2")
		.select()
			.apis(p -> {
				if (p.produces() != null) {
					for (MediaType mt : p.produces()) {
						if (mt.toString().equals("application/vnd.piomin.app-v1.2+json")) {
							return true;
						}
					}
				}
				return false;
			})
		.build()
		.produces(Collections.singleton("application/vnd.piomin.app-v1.2+json"))
		.apiInfo(new ApiInfoBuilder().version("1.2").title("Person API").description("Documentation Person API v1.2").build());
}

As you can see in the picture below, the generated methods do not have the version number in the path.

api-6
Person API 1.2 specification for a content negotiation approach

When calling a method for the selected version of the API, the only difference is in the response's required content type.

api-7
Updating person and setting response content type

Summary

Versioning is one of the most important concepts in HTTP API design. No matter which approach to versioning you choose, you should do everything to describe your API well. This seems to be especially important in the era of microservices, where your interface may be called by many other independent applications. In this case creating documentation in isolation from the source code could be troublesome. Swagger solves all of the described problems. It can be easily integrated with your application and supports versioning. Thanks to the SpringFox project it can also be easily customized in your Spring Boot application to meet more advanced demands.

Partitioning with Apache Kafka and Vert.x

Apache Kafka is a distributed streaming platform. It can also act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message is processed by only one consumer that belongs to the given group. It is especially useful when you have more than one instance of your service listening for messages incoming to the topic. That feature makes your consumers behave as queuing clients within the same group.
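
To illustrate this with the Vert.x Kafka client used later in this article (the snippet below is only a sketch run inside a verticle, not part of the sample project, and the group name order-processors is just an example): a consumer joins a group simply by setting the group.id property, and every instance started with the same group name receives a disjoint subset of the topic's partitions.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors"); // consumers sharing this id form one group
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

// Each record published to the topic is delivered to exactly one consumer within the group.
consumer.subscribe("orders-out");
consumer.handler(record -> System.out.println("Received: " + record.value()));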

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced that solution in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service which processes HTTP requests asynchronously, in accordance with the asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you the features of Apache Kafka that may be useful when creating applications that consume messages. The choice of Java client library is not a key point here. However, in my opinion Vert.x, being an asynchronous, high-performance framework, is a perfect match for Apache Kafka. It provides the Vert.x Kafka client, which allows you to read and send messages from/to a Kafka cluster. Before we proceed to the sample, let's first dive into the core abstraction of Kafka.

Kafka topic

I'm assuming you know very well what a topic is and what its main role is. Every message incoming to the topic goes to every subscriber. What is the main difference between Kafka and a standard topic provided by other message brokers? A Kafka topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

A consumer may subscribe to the whole topic or only to a selected partition. It can also control the offset from which it starts processing data. For example, it is able to reset the offset in order to reprocess data from the past, or skip ahead to the most recent record to consume only messages currently being sent to the topic. Here's a figure that illustrates a single partition structure with a producer and consumers listening for the incoming data.

Kafka-1

Sample architecture

Let me say a few words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). In accordance with the principle that one picture speaks more than a thousand words, the diagram illustrating the architecture of our system is visible below. We have one topic created on the Kafka platform that consists of two partitions. There is one client application that exposes a REST API allowing orders to be sent into the system and then forwards them to the topic. The target partition is calculated based on the type of order. We may create orders with the types SINGLE and MULTIPLE. There are also several applications that consume data from the topic. The first of them, single-order-processor, reads data from partition 0, the second, multiple-order-processor, from partition 1, and the last, all-order-processor, does not choose any particular partition.

kafka-2

Running Kafka

To run Apache Kafka on the local machine we may use its Docker image. The image shared by Spotify also starts the ZooKeeper server, which is used by Kafka. If you run Docker on Windows the default address of its virtual machine is 192.168.99.100.

docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics will be created automatically during application startup. I've had some problems with it while creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately, but provides a Kafka client interface.

docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run the ches/kafka container in client mode and then create the topic orders-out with two partitions.

docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building producer application

First, we need to include the Maven dependencies to enable the Vert.x framework for the application. If the application exposes a RESTful HTTP API you should include vertx-web. The vertx-kafka-client library has to be included in all the sample modules.

To start Vert.x as a Java application we have to create a verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using the Vertx object. For more details about Vert.x and the verticle concept you may refer to one of my previous articles mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {

	public static void main(String[] args) {
		Vertx vertx = Vertx.vertx();
		vertx.deployVerticle(new OrderVerticle());
	}

}

The next step is to define the producer using the KafkaProducer interface. We have to provide connection settings and a serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires a JsonObject as an input parameter.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending the message to the topic. The message is created using the KafkaProducerRecord interface. It takes the topic's name, the request object and the partition number as parameters. As you may see in the fragment of code below, the partition number is calculated on the basis of the order type (o.getType().ordinal()).

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
	Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
	KafkaProducerRecord record = KafkaProducerRecord.create("orders", null, rc.getBodyAsJson(), o.getType().ordinal());
	producer.write(record, done -> {
		if (done.succeeded()) {
			RecordMetadata recordMetadata = done.result();
			LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
			o.setId(recordMetadata.getOffset());
			o.setStatus(OrderStatus.PROCESSING);
		} else {
			Throwable t = done.cause();
			LOGGER.error("Error sent to topic: {}", t.getMessage());
			o.setStatus(OrderStatus.REJECTED);
		}
		rc.response().end(Json.encodePrettily(o));
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);

Building consumer applications

The consumer configuration is very similar to that of the producer. We also have to set the connection settings and the class used for deserialization. There is one interesting setting defined for the consumer in the fragment of code visible below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset in Kafka for the consumer during initialization. If you would like to read all records from the beginning of the stream use the value earliest. If you would like to process only the newest records (received after application startup) set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = KafkaConsumer.create(vertx, config);

As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented in the module all-order-processor, consumes all the events incoming to the topic. This implementation is the simplest. We only need to invoke the subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by the handler method.

consumer.subscribe("orders-out", ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	order.setStatus(OrderStatus.DONE);
	LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});

The implementation of the consuming method in the other applications is a little more complicated. Besides defining the target topic, every consumer can ask for a specific partition. The application multiple-order-processor subscribes to partition 1, while single-order-processor subscribes to partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
		consumer.assignment(done1 -> {
			if (done1.succeeded()) {
				for (TopicPartition topicPartition : done1.result()) {
					LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
				}
			} else {
				LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
			}
		});
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

The implementation of the handler method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty field relatedOrderId it tries to find it in the historical records stored in the topic. This may be achieved by calling the seek method on KafkaConsumer.

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	if (ordersWaiting.containsKey(record.offset())) {
		LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
		LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
		consumer.seekToEnd(tp);
	}

	if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
		ordersWaiting.put(order.getRelatedOrderId(), order);
		consumer.seek(tp, order.getRelatedOrderId());
	}
});

Testing

Now it is time to launch our applications. You may run the main classes from your IDE or build the whole project using the mvn clean install command and then run it with java -jar. Also run two instances of all-order-processor in order to check how the consumer group mechanism works in practice.

Let’s send some test requests to the order-service in the following sequence.

curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here's the log from the producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here's the log from single-order-processor. It has processed only messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from multiple-order-processor. It has processed only messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here's the log from the first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from the second instance of all-order-processor. It may be a little bit surprising for you. But if you run two instances of a consumer that listens to the whole topic, each instance will process messages from a single partition only.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Summary

In this article I tried to give you a brief introduction to messaging with Apache Kafka. Concepts like consumer groups and partitioning are what make it different from traditional messaging solutions. Kafka is a widely adopted product which can act as a storage, messaging system or stream processor. Together with the popular JVM-based toolkit Vert.x it may be a really powerful, fast and lightweight solution for applications that exchange messages with each other. The key concepts introduced by Kafka have been adopted by Spring Cloud Stream, which makes it an opinionated choice for creating messaging microservices.

Performance Testing with Gatling

How many of you have ever created automated performance tests before running an application on production? Usually, developers attach importance to functional testing and try to provide at least some unit and integration tests. However, sometimes a performance leak may turn out to be more serious than an undetected business error, because it can affect the whole system, not only a single business process.

Personally, I had been implementing performance tests for my applications, but I had never run them as a part of the Continuous Integration process. Of course, that was some years ago, when my knowledge and experience were much smaller… Anyway, recently I have become interested in topics related to performance testing, partly because of performance issues with an application in my organisation. As it happens, the key is to find the right tool. Probably many of you have heard about JMeter. Today I'm going to present a competing solution – Gatling. I've read that it generates rich and colorful reports with all the metrics collected during the test case. That feature seems to be better than in JMeter.

Before starting the discussion about Gatling let me say a few words about theory. We can distinguish between two types of performance testing: load and stress testing. Load testing verifies how the system functions under a large number of concurrent clients sending requests over a certain period of time. The main goal of that type of test is to simulate standard traffic, similar to what may arise in production. Stress testing takes load testing further and pushes your app to the limits to see how it handles an extremely heavy load.

What is Gatling?

Gatling is a powerful tool for load testing, written in Scala. It has full support for HTTP protocols and can also be used for testing JDBC connections and JMS. When using Gatling you have to define a test scenario as Scala DSL code. It is worth mentioning that it provides comprehensive, informative HTML load reports and has plugins for integration with Gradle, Maven and Jenkins.

Building sample application

Before we run any tests we need to have something to test. Our sample application is really simple. Its source code is available, as usual, on GitHub. It exposes a RESTful HTTP API with CRUD operations for adding and searching entities in the database. I use Postgres as a backend store for the application repository. The application is built on top of the Spring Boot framework. It also uses the Spring Data project as the persistence layer implementation.

plugins {
    id 'org.springframework.boot' version '1.5.9.RELEASE'
}
dependencies {
	compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
	compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa'
	compile group: 'org.postgresql', name: 'postgresql', version: '42.1.4'
	testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}

There is one entity Person which is mapped to the table person.

@Entity
@SequenceGenerator(name = "seq_person", initialValue = 1, allocationSize = 1)
public class Person {
	@Id
	@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "seq_person")
	private Long id;
	@Column(name = "first_name")
	private String firstName;
	@Column(name = "last_name")
	private String lastName;
	@Column(name = "birth_date")
	private Date birthDate;
	@Embedded
	private Address address;
	// ...
}

Database connection settings and Hibernate properties are configured in the application.yml file.

spring:
  application:
    name: gatling-service
  datasource:
    url: jdbc:postgresql://192.168.99.100:5432/gatling
    username: gatling
    password: gatling123
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: update

server:
  port: 8090

As I have already mentioned, the application exposes API methods for adding and searching for persons in the database. Here's our Spring REST controller implementation.

@RestController
@RequestMapping("/persons")
public class PersonsController {

	private static final Logger LOGGER = LoggerFactory.getLogger(PersonsController.class);

	@Autowired
	PersonsRepository repository;

	@GetMapping
	public List<Person> findAll() {
		return (List<Person>) repository.findAll();
	}

	@PostMapping
	public Person add(@RequestBody Person person) {
		Person p = repository.save(person);
		LOGGER.info("add: {}", p.toString());
		return p;
	}

	@GetMapping("/{id}")
	public Person findById(@PathVariable("id") Long id) {
		LOGGER.info("findById: id={}", id);
		return repository.findOne(id);
	}

}

Running database

The next step after developing the sample application is to run the database. The most suitable way of running it for test purposes is by using a Docker image. Here's a Docker command that starts a Postgres container and initializes the gatling user and database.

docker run -d --name postgres -e POSTGRES_DB=gatling -e POSTGRES_USER=gatling -e POSTGRES_PASSWORD=gatling123 -p 5432:5432 postgres

Providing test scenario

Every Gatling test suite should extend the Simulation class. Inside it you may declare a list of scenarios using the Gatling Scala DSL. Our goal is to run 30 clients which simultaneously send requests 1000 times each. First, the clients add a new person into the database by calling the POST /persons method. Then they try to search for a person by id by calling the GET /persons/{id} method. So, in total 60k requests will be sent to the application: 30k to the POST endpoint and 30k to the GET method. As you can see in the code below, the test scenario is quite simple. ApiGatlingSimulationTest is available under the directory src/test/scala.

class ApiGatlingSimulationTest extends Simulation {

  val scn = scenario("AddAndFindPersons").repeat(1000, "n") {
        exec(
          http("AddPerson-API")
            .post("http://localhost:8090/persons")
            .header("Content-Type", "application/json")
            .body(StringBody("""{"firstName":"John${n}","lastName":"Smith${n}","birthDate":"1980-01-01", "address": {"country":"pl","city":"Warsaw","street":"Test${n}","postalCode":"02-200","houseNo":${n}}}"""))
            .check(status.is(200))
        ).pause(Duration.apply(5, TimeUnit.MILLISECONDS))
  }.repeat(1000, "n") {
        exec(
          http("GetPerson-API")
            .get("http://localhost:8090/persons/${n}")
            .check(status.is(200))
        )
  }

  setUp(scn.inject(atOnceUsers(30))).maxDuration(FiniteDuration.apply(10, "minutes"))

}

To enable the Gatling framework for the project we should also define the following dependency in the Gradle build file.

testCompile group: 'io.gatling.highcharts', name: 'gatling-charts-highcharts', version: '2.3.0'

Running tests

There are some Gradle plugins available which provide support for running the tests during the project build. However, we may also define a simple Gradle task that just runs the tests using the io.gatling.app.Gatling class.

task loadTest(type: JavaExec) {
   dependsOn testClasses
   description = "Load Test With Gatling"
   group = "Load Test"
   classpath = sourceSets.test.runtimeClasspath
   jvmArgs = [
        "-Dgatling.core.directory.binaries=${sourceSets.test.output.classesDir.toString()}"
   ]
   main = "io.gatling.app.Gatling"
   args = [
           "--simulation", "pl.piomin.services.gatling.ApiGatlingSimulationTest",
           "--results-folder", "${buildDir}/gatling-results",
           "--binaries-folder", sourceSets.test.output.classesDir.toString(),
           "--bodies-folder", sourceSets.test.resources.srcDirs.toList().first().toString() + "/gatling/bodies",
   ]
}

The Gradle task defined above may be run with the command gradle loadTest. Of course, before running the tests you should launch the application. You may do it from your IDE by starting the main class pl.piomin.services.gatling.ApiApplication or by running the command java -jar build/libs/sample-load-test-gatling.jar.

Test reports

After test execution the report is printed in a text format.

================================================================================
---- Global Information --------------------------------------------------------
> request count                                      60000 (OK=60000  KO=0     )
> min response time                                      2 (OK=2      KO=-     )
> max response time                                   1338 (OK=1338   KO=-     )
> mean response time                                    80 (OK=80     KO=-     )
> std deviation                                        106 (OK=106    KO=-     )
> response time 50th percentile                         50 (OK=50     KO=-     )
> response time 75th percentile                         93 (OK=93     KO=-     )
> response time 95th percentile                        253 (OK=253    KO=-     )
> response time 99th percentile                        564 (OK=564    KO=-     )
> mean requests/sec                                319.149 (OK=319.149 KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                         59818 (100%)
> 800 ms < t < 1200 ms                                 166 (  0%)
> t > 1200 ms                                           16 (  0%)
> failed                                                 0 (  0%)
================================================================================

But what is really cool about Gatling is its ability to generate reports in graphical form. HTML reports are available under the directory build/gatling-results. The first report shows global information with the total number of requests and the maximum response time by percentiles. For example, you may see that the maximum response time for 95% of responses for GetPerson-API is 206 ms.

gatling-1

We may check out such a report for all requests or filter it to see only those generated by a selected API. In the picture below there is a visualization only for GetPerson-API.

gatling-2

Here's the graph with the percentage of requests grouped by average response time.

gatling-3

Here's the graph which illustrates the timeline with average response times. Additionally, that timeline also shows the statistics by percentiles.

gatling-4

Here's the graph with the number of requests successfully processed by the application per second.

gatling-5

Part 2: Microservices security with OAuth2

I have written about security with OAuth2 in some articles before. This article is a continuation of the samples previously described in my earlier posts on that topic.

Today I'm going to show you a more advanced sample than before, where all authentication and OAuth2 data is stored in a database. We will also find out how to secure microservices, especially considering inter-service communication through a Feign client. I hope this article will provide guidance and help you with designing and implementing secure solutions with Spring Cloud. Let's begin.

There are four services running inside our sample system, as visualized in the figure below. There is nothing unusual here. We have a discovery server where our sample microservices account-service and customer-service are registered. Those microservices are both protected with OAuth2 authorization. Authorization is managed by auth-server. It stores not only OAuth2 tokens, but also user authentication data. The whole process is implemented using the Spring Security and Spring Cloud libraries.

oauth2-1

1. Start database

All the authentication credentials and tokens are stored in a MySQL database. So, the first step is to start MySQL. The most comfortable way to achieve that is through a Docker container. The command visible below, in addition to starting the database, also creates the oauth2 schema and user.

docker run -d --name mysql -e MYSQL_DATABASE=oauth2 -e MYSQL_USER=oauth2 -e MYSQL_PASSWORD=oauth2 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

2. Configure data source in application

MySQL is now available on host 192.168.99.100 (if you run Docker on Windows) and port 33306. The datasource properties should be set in the application.yml of auth-server. Spring Boot is also able to run some SQL scripts on the selected datasource after application startup. That's good news for us, because we have to create some tables in the schema dedicated to the OAuth2 process.

spring:
  application:
    name: auth-server
  datasource:
    url: jdbc:mysql://192.168.99.100:33306/oauth2?useSSL=false
    username: oauth2
    password: oauth2
    driver-class-name: com.mysql.jdbc.Driver
    schema: classpath:/script/schema.sql
    data: classpath:/script/data.sql

3. Create schema in MySQL

Despite appearances, it is not so simple to find the SQL script with the tables that need to be created when using Spring Security for OAuth2. Here's that script, which is available under /src/main/resources/script/schema.sql in the auth-server module. We have to create six tables:

  • oauth_client_details
  • oauth_client_token
  • oauth_access_token
  • oauth_refresh_token
  • oauth_code
  • oauth_approvals

drop table if exists oauth_client_details;
create table oauth_client_details (
  client_id VARCHAR(255) PRIMARY KEY,
  resource_ids VARCHAR(255),
  client_secret VARCHAR(255),
  scope VARCHAR(255),
  authorized_grant_types VARCHAR(255),
  web_server_redirect_uri VARCHAR(255),
  authorities VARCHAR(255),
  access_token_validity INTEGER,
  refresh_token_validity INTEGER,
  additional_information VARCHAR(4096),
  autoapprove VARCHAR(255)
);
drop table if exists oauth_client_token;
create table oauth_client_token (
  token_id VARCHAR(255),
  token LONG VARBINARY,
  authentication_id VARCHAR(255) PRIMARY KEY,
  user_name VARCHAR(255),
  client_id VARCHAR(255)
);

drop table if exists oauth_access_token;
CREATE TABLE oauth_access_token (
  token_id VARCHAR(256) DEFAULT NULL,
  token BLOB,
  authentication_id VARCHAR(256) DEFAULT NULL,
  user_name VARCHAR(256) DEFAULT NULL,
  client_id VARCHAR(256) DEFAULT NULL,
  authentication BLOB,
  refresh_token VARCHAR(256) DEFAULT NULL
);

drop table if exists oauth_refresh_token;
CREATE TABLE oauth_refresh_token (
  token_id VARCHAR(256) DEFAULT NULL,
  token BLOB,
  authentication BLOB
);

drop table if exists oauth_code;
create table oauth_code (
  code VARCHAR(255), authentication LONG VARBINARY
);
drop table if exists oauth_approvals;
create table oauth_approvals (
    userId VARCHAR(255),
    clientId VARCHAR(255),
    scope VARCHAR(255),
    status VARCHAR(10),
    expiresAt DATETIME,
    lastModifiedAt DATETIME
);

4. Add some test data to database

There is also a second SQL script, /src/main/resources/script/data.sql, with some insert commands for testing purposes. The most important thing is to add some client id/client secret pairs.

INSERT INTO `oauth_client_details` (`client_id`, `client_secret`, `scope`, `authorized_grant_types`, `access_token_validity`, `additional_information`) VALUES ('account-service', 'secret', 'read', 'authorization_code,password,refresh_token,implicit', '900', '{}');
INSERT INTO `oauth_client_details` (`client_id`, `client_secret`, `scope`, `authorized_grant_types`, `access_token_validity`, `additional_information`) VALUES ('customer-service', 'secret', 'read', 'authorization_code,password,refresh_token,implicit', '900', '{}');
INSERT INTO `oauth_client_details` (`client_id`, `client_secret`, `scope`, `authorized_grant_types`, `access_token_validity`, `additional_information`) VALUES ('customer-service-write', 'secret', 'write', 'authorization_code,password,refresh_token,implicit', '900', '{}');

5. Building the Authorization Server

Now, the most important thing in this article – the authorization server configuration. The configuration class should be annotated with @EnableAuthorizationServer. Then we need to override some methods of the extended AuthorizationServerConfigurerAdapter class. The first important thing here is to set the default token storage to a database by providing a JdbcTokenStore bean with the default data source as a parameter. Although all tokens are now stored in a database, we still want to generate them in JWT format. That's why the second bean, JwtAccessTokenConverter, has to be provided in that class. By overriding different configure methods inherited from the base class we can set the default storage for OAuth2 client details and require the authorization server to always verify the API key submitted in the HTTP headers.

@Configuration
@EnableAuthorizationServer
public class OAuth2Config extends AuthorizationServerConfigurerAdapter {

   @Autowired
   private DataSource dataSource;
   @Autowired
   private AuthenticationManager authenticationManager;

   @Override
   public void configure(AuthorizationServerEndpointsConfigurer endpoints) throws Exception {
	  endpoints.authenticationManager(this.authenticationManager).tokenStore(tokenStore())
		   .accessTokenConverter(accessTokenConverter());
   }

   @Override
   public void configure(AuthorizationServerSecurityConfigurer oauthServer) throws Exception {
	  oauthServer.checkTokenAccess("permitAll()");
   }

   @Bean
   public JwtAccessTokenConverter accessTokenConverter() {
	  return new JwtAccessTokenConverter();
   }

   @Override
   public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
	  clients.jdbc(dataSource);
   }

   @Bean
   public JdbcTokenStore tokenStore() {
	  return new JdbcTokenStore(dataSource);
   }

}

The main OAuth2 grant type used in the current sample is the resource owner password credentials grant. In that type of grant the client application sends the user's login and password to authenticate against the OAuth2 server. A POST request sent by the client contains the following parameters:

  • grant_type – with the value ‘password’
  • client_id – with the client’s ID
  • client_secret – with the client’s secret
  • scope – with a space-delimited list of requested scope permissions
  • username – with the user’s username
  • password – with the user’s password

The authorization server will respond with a JSON object containing the following parameters:

  • token_type – with the value ‘Bearer’
  • expires_in – with an integer representing the TTL of the access token
  • access_token – the access token itself
  • refresh_token – a refresh token that can be used to acquire a new access token when the original expires
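
As an illustration (this class is not part of the sample project), the token could be obtained with a plain RestTemplate as sketched below. The endpoint address, client and user credentials match the configuration and test data used later in this article; depending on the authorization server configuration, the client credentials may alternatively have to be sent using HTTP Basic authentication instead of form parameters.

import java.util.Map;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class PasswordGrantExample {

	public static void main(String[] args) {
		MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
		form.add("grant_type", "password");
		form.add("client_id", "customer-service");   // defined in data.sql
		form.add("client_secret", "secret");
		form.add("scope", "read");
		form.add("username", "piomin");              // sample user credentials
		form.add("password", "piot123");

		HttpHeaders headers = new HttpHeaders();
		headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

		// auth-server listens on port 9999 in this sample
		RestTemplate restTemplate = new RestTemplate();
		Map<?, ?> token = restTemplate.postForObject("http://localhost:9999/oauth/token",
				new HttpEntity<>(form, headers), Map.class);

		System.out.println("access_token: " + token.get("access_token"));
		System.out.println("token_type: " + token.get("token_type"));
		System.out.println("expires_in: " + token.get("expires_in"));
	}

}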

A Spring application can provide a custom authentication mechanism by implementing the UserDetailsService interface and overriding its loadUserByUsername method. In our sample application user credentials and authorities are also stored in the database, so we inject the UserRepository bean into the custom UserDetailsService class.

@Component("userDetailsService")
public class UserDetailsServiceImpl implements UserDetailsService {

    private final Logger log = LoggerFactory.getLogger(UserDetailsServiceImpl.class);

    @Autowired
    private UserRepository userRepository;

    @Override
    @Transactional
    public UserDetails loadUserByUsername(final String login) {

        log.debug("Authenticating {}", login);
        String lowercaseLogin = login.toLowerCase();

        User userFromDatabase;
        if(lowercaseLogin.contains("@")) {
            userFromDatabase = userRepository.findByEmail(lowercaseLogin);
        } else {
            userFromDatabase = userRepository.findByUsernameCaseInsensitive(lowercaseLogin);
        }

        if (userFromDatabase == null) {
            throw new UsernameNotFoundException("User " + lowercaseLogin + " was not found in the database");
        } else if (!userFromDatabase.isActivated()) {
            throw new UserNotActivatedException("User " + lowercaseLogin + " is not activated");
        }

        Collection<GrantedAuthority> grantedAuthorities = new ArrayList<>();
        for (Authority authority : userFromDatabase.getAuthorities()) {
            GrantedAuthority grantedAuthority = new SimpleGrantedAuthority(authority.getName());
            grantedAuthorities.add(grantedAuthority);
        }

        return new org.springframework.security.core.userdetails.User(userFromDatabase.getUsername(), userFromDatabase.getPassword(), grantedAuthorities);
    }

}

That's practically all that needs to be said about the auth-server module. Let's move on to the client microservices.

6. Building the microservices

The REST API is very simple. It does nothing more than return some data. However, there is one interesting thing in this implementation: pre-authorization based on the OAuth2 token scope, which is enabled on the API methods with @PreAuthorize("#oauth2.hasScope('read')").

@RestController
public class AccountController {

   @GetMapping("/{id}")
   @PreAuthorize("#oauth2.hasScope('read')")
   public Account findAccount(@PathVariable("id") Integer id) {
	  return new Account(id, 1, "123456789", 1234);
   }

   @GetMapping("/")
   @PreAuthorize("#oauth2.hasScope('read')")
   public List<Account> findAccounts() {
	  return Arrays.asList(new Account(1, 1, "123456789", 1234), new Account(2, 1, "123456780", 2500),
		new Account(3, 1, "123456781", 10000));
   }

}

Pre-authorization is disabled by default. To enable it for API methods we should use the @EnableGlobalMethodSecurity annotation. We should also declare that such pre-authorization is based on the OAuth2 token scope.

@Configuration
@EnableResourceServer
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class OAuth2ResourceServerConfig extends GlobalMethodSecurityConfiguration {

    @Override
    protected MethodSecurityExpressionHandler createExpressionHandler() {
        return new OAuth2MethodSecurityExpressionHandler();
    }

}

7. Feign client with OAuth2

The API method findAccounts implemented in AccountController is invoked by customer-service through a Feign client.

@FeignClient(name = "account-service", configuration = AccountClientConfiguration.class)
public interface AccountClient {

   @GetMapping("/")
   List<Account> findAccounts();

}

If you call the account-service endpoint via the Feign client you get the following exception.

feign.FeignException: status 401 reading AccountClient#findAccounts(); content:{"error":"unauthorized","error_description":"Full authentication is required to access this resource"}

Why? Of course, account-service is protected with OAuth2 token authorization, but the Feign client does not send an authorization token in the request header. That behavior may be customized by defining a custom configuration class for the Feign client. It allows us to declare a request interceptor. In this case we can use the implementation for OAuth2 provided by OAuth2FeignRequestInterceptor from the Spring Cloud OAuth2 library. As you can see in the resource details below, the password grant type is used here.

public class AccountClientConfiguration {

   @Value("${security.oauth2.client.access-token-uri}")
   private String accessTokenUri;
   @Value("${security.oauth2.client.client-id}")
   private String clientId;
   @Value("${security.oauth2.client.client-secret}")
   private String clientSecret;
   @Value("${security.oauth2.client.scope}")
   private String scope;

   @Bean
   RequestInterceptor oauth2FeignRequestInterceptor() {
	  return new OAuth2FeignRequestInterceptor(new DefaultOAuth2ClientContext(), resource());
   }

   @Bean
   Logger.Level feignLoggerLevel() {
	  return Logger.Level.FULL;
   }

   private OAuth2ProtectedResourceDetails resource() {
	  ResourceOwnerPasswordResourceDetails resourceDetails = new ResourceOwnerPasswordResourceDetails();
	  resourceDetails.setUsername("piomin");
	  resourceDetails.setPassword("piot123");
	  resourceDetails.setAccessTokenUri(accessTokenUri);
	  resourceDetails.setClientId(clientId);
	  resourceDetails.setClientSecret(clientSecret);
	  resourceDetails.setGrantType("password");
	  resourceDetails.setScope(Arrays.asList(scope));
	  return resourceDetails;
   }

}

8. Testing

Finally, we may perform some tests. Let's build the sample project using the mvn clean install command. If you run all the services with the default settings they will be available under the default addresses configured in the sample project.

The test method is visible below. We use OAuth2RestTemplate with ResourceOwnerPasswordResourceDetails to perform the resource owner password credentials grant operation and call the GET /{id} API method of customer-service with the OAuth2 token sent in the request header.

	@Test
	public void testClient() {
        ResourceOwnerPasswordResourceDetails resourceDetails = new ResourceOwnerPasswordResourceDetails();
        resourceDetails.setUsername("piomin");
        resourceDetails.setPassword("piot123");
        resourceDetails.setAccessTokenUri("http://localhost:9999/oauth/token");
        resourceDetails.setClientId("customer-service");
        resourceDetails.setClientSecret("secret");
        resourceDetails.setGrantType("password");
        resourceDetails.setScope(Arrays.asList("read"));
        DefaultOAuth2ClientContext clientContext = new DefaultOAuth2ClientContext();
        OAuth2RestTemplate restTemplate = new OAuth2RestTemplate(resourceDetails, clientContext);
        restTemplate.setMessageConverters(Arrays.asList(new MappingJackson2HttpMessageConverter()));
        final Customer customer = restTemplate.getForObject("http://localhost:8083/{id}", Customer.class, 1);
        System.out.println(customer);
	}

 

In-memory data grid with Apache Ignite

Apache Ignite is a relatively new solution, but one that is quickly increasing in popularity. It is hard to assign it to a single category of database engines, because it has characteristics typical of several of them. The primary purpose of this solution is an in-memory data grid and a key-value store. It also has some common RDBMS features like support for SQL queries and ACID transactions. But that's not to say it is a full SQL and transactional database: it does not support foreign key constraints, and transactions are available only at the key-value level. Despite that, Apache Ignite seems to be a very interesting solution.

Apache Ignite may be easily started as a node embedded in a Spring Boot application. The simplest way to achieve that is by using the Spring Data Ignite library. Apache Ignite implements the Spring Data CrudRepository interface, which supports basic CRUD operations and also provides access to the Apache Ignite SQL Grid using the unified Spring Data interfaces. Although it has support for distributed, ACID and SQL-compliant disk store persistence, we will design a solution which stores in-memory cache objects in a MySQL database. The architecture of the presented solution is visible in the figure below, and as you can see it is very simple. The application puts data into the in-memory cache on Apache Ignite. Apache Ignite automatically synchronizes these changes with the database in an asynchronous, background task. The way the application reads data should not surprise you either: if an entity is not cached it is read from the database and put into the cache for future use.

ignite
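
The repository layer itself can be built with Spring Data Ignite. As a rough sketch (the query method below is an illustrative assumption, not copied from the sample project), a repository bound to the PersonCache configured later in this article might look like this:

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

	// Derived query executed against the Ignite SQL grid
	List<Person> findByFirstNameAndLastName(String firstName, String lastName);

}

Such repositories are activated by annotating a configuration class with @EnableIgniteRepositories.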

I'm going to guide you through the process of developing the sample application. The result of this development is available on GitHub. I have found a few examples on the web, but they covered only the basics. I'll show you how to configure Apache Ignite to write objects from the cache to the database and how to create some more complex cross-cache join queries. Let's begin by running the database.

1. Setup MySQL database

The best way to start a MySQL database locally is, of course, by using a Docker container. With Docker on Windows, the MySQL database will be available on 192.168.99.100:33306.

docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

The next step is to create the tables used by the application entities to store data: PERSON and CONTACT. Those two tables are in a 1…N relation, where the CONTACT table holds a foreign key referencing the PERSON id.

CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `country` varchar(10) DEFAULT NULL,
  `city` varchar(20) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  PRIMARY KEY (`id`)
);

CREATE TABLE `contact` (
  `id` int(11) NOT NULL,
  `location` varchar(45) DEFAULT NULL,
  `contact_type` varchar(10) DEFAULT NULL,
  `person_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);

ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Maven configuration

The easiest way to start working with Apache Ignite’s Spring Data repositories is by adding the following Maven dependencies to the application’s pom.xml file. All the other Ignite dependencies will be included automatically. We also need the MySQL JDBC driver and the Spring JDBC dependencies to configure the connection to the database. They are required because we embed Apache Ignite in the application, and it has to establish a connection with MySQL in order to synchronize the cache with the database tables.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.apache.ignite</groupId>
   <artifactId>ignite-spring-data</artifactId>
   <version>${ignite.version}</version>
</dependency>

3. Configure Ignite node

Using the IgniteConfiguration class we are able to configure all available settings of the Ignite node. The most important thing here is the cache configuration (1). We should add the primary key and entity classes as indexed types (2). Then we have to enable exporting cache updates to the database (3) and reading data not found in the cache from the database (4). The interaction between the Ignite node and MySQL may be configured using the CacheJdbcPojoStoreFactory class (5). We should pass it the DataSource @Bean (6), the dialect (7) and a mapping between object fields and table columns (8).

@Bean
public Ignite igniteInstance() {
   IgniteConfiguration cfg = new IgniteConfiguration();
   cfg.setIgniteInstanceName("ignite-1");
   cfg.setPeerClassLoadingEnabled(true);

   CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
   ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
   ccfg2.setWriteBehindEnabled(true);
   ccfg2.setWriteThrough(true); // (3)
   ccfg2.setReadThrough(true); // (4)
   CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
   f2.setDataSource(datasource); // (6)
   f2.setDialect(new MySQLDialect()); // (7)
   JdbcType jdbcContactType = new JdbcType(); // (8)
   jdbcContactType.setCacheName("ContactCache");
   jdbcContactType.setKeyType(Long.class);
   jdbcContactType.setValueType(Contact.class);
   jdbcContactType.setDatabaseTable("contact");
   jdbcContactType.setDatabaseSchema("ignite");
   jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
   f2.setTypes(jdbcContactType);
   ccfg2.setCacheStoreFactory(f2);

   CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
   ccfg.setIndexedTypes(Long.class, Person.class);
   ccfg.setWriteBehindEnabled(true);
   ccfg.setReadThrough(true);
   ccfg.setWriteThrough(true);
   CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
   f.setDataSource(datasource);
   f.setDialect(new MySQLDialect());
   JdbcType jdbcType = new JdbcType();
   jdbcType.setCacheName("PersonCache");
   jdbcType.setKeyType(Long.class);
   jdbcType.setValueType(Person.class);
   jdbcType.setDatabaseTable("person");
   jdbcType.setDatabaseSchema("ignite");
   jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
   f.setTypes(jdbcType);
   ccfg.setCacheStoreFactory(f);

   cfg.setCacheConfiguration(ccfg, ccfg2);
   return Ignition.start(cfg);
}

Here’s the Spring datasource configuration for MySQL running as a Docker container.

spring:
  datasource:
    name: mysqlds
    url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
    username: ignite
    password: ignite123

On this occasion it should be mentioned that Apache Ignite still has some deficiencies. For example, it maps an Enum to an integer using its ordinal value, even though VARCHAR is configured as the JDBC type. When such a row is read back from the database, it is not mapped properly to the Enum in the object, and you end up with null in that field of the response.

new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")

4. Model objects

As I mentioned before, we have two tables in the database schema. There are also two model classes and two cache configurations, one per model class. Here’s the Person model class implementation. One of the more interesting things here is ID generation with an AtomicLong acting as a simple sequence generator. We can also see the @QuerySqlField annotation, which marks a field as available for use as a query parameter in SQL.

@QueryGroupIndex.List(
   @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {

   private static final long serialVersionUID = -1271194616130404625L;
   private static final AtomicLong ID_GEN = new AtomicLong();

   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 0)
   private String firstName;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 1)
   private String lastName;
   private Gender gender;
   private Date birthDate;
   private String country;
   private String city;
   private String address;
   private List<Contact> contacts = new ArrayList<>();

   public void init() {
	  this.id = ID_GEN.incrementAndGet();
   }

   public Long getId() {
	  return id;
   }

   public void setId(Long id) {
	  this.id = id;
   }

   public String getFirstName() {
	  return firstName;
   }

   public void setFirstName(String firstName) {
	  this.firstName = firstName;
   }

   public String getLastName() {
	  return lastName;
   }

   public void setLastName(String lastName) {
	  this.lastName = lastName;
   }

   public Gender getGender() {
	  return gender;
   }

   public void setGender(Gender gender) {
	  this.gender = gender;
   }

   public Date getBirthDate() {
	  return birthDate;
   }

   public void setBirthDate(Date birthDate) {
	  this.birthDate = birthDate;
   }

   public String getCountry() {
	  return country;
   }

   public void setCountry(String country) {
	  this.country = country;
   }

   public String getCity() {
	  return city;
   }

   public void setCity(String city) {
	  this.city = city;
   }

   public String getAddress() {
	  return address;
   }

   public void setAddress(String address) {
	  this.address = address;
   }

   public List<Contact> getContacts() {
	  return contacts;
   }

   public void setContacts(List<Contact> contacts) {
	  this.contacts = contacts;
   }
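
   // convenience method used by PersonController.mapPerson() further below;
   // assumed to simply append the contact to the list
   public void addContact(Contact contact) {
	  this.contacts.add(contact);
   }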

}
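
The second model class is not shown in the article. Here’s a minimal sketch of what Contact could look like, with field names inferred from the JDBC mapping configured earlier (contact_type as type, location, person_id as personId) and imports omitted as in the Person class above; the actual class in the repository may differ.

public class Contact implements Serializable {

   private static final long serialVersionUID = 1L;
   private static final AtomicLong ID_GEN = new AtomicLong();

   @QuerySqlField(index = true)
   private Long id;
   // affected by the enum mapping issue described earlier; one possible workaround
   // is to persist this field as a String and convert it in the accessors
   @QuerySqlField
   private ContactType type;
   @QuerySqlField
   private String location;
   @QuerySqlField(index = true)
   private Long personId;

   public void init() {
	  this.id = ID_GEN.incrementAndGet();
   }

   public Long getId() {
	  return id;
   }

   public void setId(Long id) {
	  this.id = id;
   }

   public ContactType getType() {
	  return type;
   }

   public void setType(ContactType type) {
	  this.type = type;
   }

   public String getLocation() {
	  return location;
   }

   public void setLocation(String location) {
	  this.location = location;
   }

   public Long getPersonId() {
	  return personId;
   }

   public void setPersonId(Long personId) {
	  this.personId = personId;
   }

}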

5. Ignite repositories

I assume that you are familiar with the Spring Data JPA concept of creating repositories. Repository handling should be enabled on the main or @Configuration class with the @EnableIgniteRepositories annotation.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication {

   @Autowired
   DataSource datasource;

   public static void main(String[] args) {
	SpringApplication.run(IgniteRestApplication.class, args);
   }

   // ...
}

Then we have to extend our repository interface with Ignite’s IgniteRepository base interface, which in turn extends Spring Data’s CrudRepository and provides the inherited CRUD methods operating on the cache key. In the PersonRepository fragment visible below I defined some find methods using the Spring Data naming convention as well as Ignite’s SQL queries. In those samples you can see that we can return either full objects or only selected fields as a query result, according to our needs.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

	List<Person> findByFirstNameAndLastName(String firstName, String lastName);

	@Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<Contact> selectContacts(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<List<?>> selectContacts2(String firstName, String lastName);
}

6. API and testing

Finally, we can inject the repository beans into the REST controller classes. The API exposes methods for adding a new object to the cache, updating or removing existing objects, and a few for searching using the primary key or the other, more complex indices.

@RestController
@RequestMapping("/person")
public class PersonController {

	private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

	@Autowired
	PersonRepository repository;

	@PostMapping
	public Person add(@RequestBody Person person) {
		person.init();
		return repository.save(person.getId(), person);
	}

	@PutMapping
	public Person update(@RequestBody Person person) {
		return repository.save(person.getId(), person);
	}

	@DeleteMapping("/{id}")
	public void delete(@PathVariable("id") Long id) {
		repository.delete(id);
	}

	@GetMapping("/{id}")
	public Person findById(@PathVariable("id") Long id) {
		return repository.findOne(id);
	}

	@GetMapping("/{firstName}/{lastName}")
	public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		return repository.findByFirstNameAndLastName(firstName, lastName);
	}

	@GetMapping("/contacts/{firstName}/{lastName}")
	public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
		List<Contact> contacts = repository.selectContacts(firstName, lastName);
		persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList())));
		LOGGER.info("PersonController.findByIdWithContacts: {}", contacts);
		return persons;
	}

	@GetMapping("/contacts2/{firstName}/{lastName}")
	public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
		List<List<?>> result = repository.selectContacts2(firstName, lastName);
		List<Person> persons = new ArrayList<>();
		for (List<?> l : result) {
			persons.add(mapPerson(l));
		}
		LOGGER.info("PersonController.findByIdWithContacts: {}", result);
		return persons;
	}

	private Person mapPerson(List<?> l) {
		Person p = new Person();
		Contact c = new Contact();
		p.setId((Long) l.get(0));
		p.setFirstName((String) l.get(1));
		p.setLastName((String) l.get(2));
		c.setId((Long) l.get(3));
		c.setType((ContactType) l.get(4));
		c.setLocation((String) l.get(5));
		p.addContact(c);
		return p;
	}

}
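
The load test shown in the next fragment also posts Contact objects to a /contact endpoint and, judging by the metrics output later on, queries them by location. Neither the contact repository nor the contact controller is shown in the article, so here’s only a minimal sketch of how they could look, assuming they mirror the Person variants above; the actual implementation in the sample repository may differ.

@RepositoryConfig(cacheName = "ContactCache")
public interface ContactRepository extends IgniteRepository<Contact, Long> {

	List<Contact> findByLocation(String location);

}

@RestController
@RequestMapping("/contact")
public class ContactController {

	@Autowired
	ContactRepository repository;

	@PostMapping
	public Contact add(@RequestBody Contact contact) {
		contact.init();
		return repository.save(contact.getId(), contact);
	}

	@GetMapping("/location/{location}")
	public List<Contact> findByLocation(@PathVariable("location") String location) {
		return repository.findByLocation(location);
	}

}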

It is certainly important to test the performance of the implemented solution, especially when an in-memory data grid and a database are involved. For that purpose I created some JUnit tests which put a large number of objects into the cache and then invoke some find methods using random input data to test query performance. Here’s the test method which generates many Person and Contact objects and puts them into the cache using the API endpoints.

@Test
public void testAddPerson() throws InterruptedException {
	ExecutorService es = Executors.newCachedThreadPool();
	for (int j = 0; j < 10; j++) {
		es.execute(() -> {
			TestRestTemplate restTemplateLocal = new TestRestTemplate();
			Random r = new Random();
			for (int i = 0; i < 1000000; i++) {
				Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
				int x = r.nextInt(6);
				for (int k = 0; k < x; k++) {
					restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
				}
			}
		});
	}
	es.shutdown();
	es.awaitTermination(60, TimeUnit.MINUTES);
}

Spring Boot provides a way to capture basic metrics of API response times. To enable that feature we have to include Spring Boot Actuator in the dependencies. The metrics endpoint is available at http://localhost:8090/metrics. In addition to the processing time of each API method, it also prints statistics such as the number of running threads or the amount of free memory.

7. Running application

Let’s run our sample application with an embedded Apache Ignite node. Following some performance suggestions available in the Ignite docs, I defined the JVM configuration visible below.

java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Now we can run the JUnit test class IgniteRestControllerTest. It puts some data into the cache and then calls the find methods. The metrics for the tests with 1M Person objects and 2.5M Contact objects in the cache are visible below. All find methods took about 1 ms on average.

{
"mem": 624886,
"mem.free": 389701,
"processors": 4,
"instance.uptime": 2446038,
"uptime": 2466661,
"systemload.average": -1,
"heap.committed": 524288,
"heap.init": 524288,
"heap.used": 133756,
"heap": 1048576,
"threads.peak": 107,
"threads.daemon": 25,
"threads.totalStarted": 565,
"threads": 80,
...
"gauge.response.person.contacts.firstName.lastName": 1,
"gauge.response.contact": 1,
"gauge.response.person.firstName.lastName": 1,
"gauge.response.contact.location.location": 1,
"gauge.response.person.id": 1,
"gauge.response.person": 0,
"counter.status.200.person.id": 1000,
"counter.status.200.person.contacts.firstName.lastName": 1000,
"counter.status.200.person.firstName.lastName": 1000,
"counter.status.200.contact": 2500806,
"counter.status.200.person": 1000000,
"counter.status.200.contact.location.location": 1000
}

Envoy Proxy with Microservices

Introduction

I came across Envoy proxy for the first time a couple of weeks ago, when one of my blog readers suggested that I write an article about it. I had never heard about it before and my first thought was that it was outside my area of experience. In fact, this tool is not as popular as competitors like NGINX or HAProxy, but it provides some interesting features, among them out-of-the-box support for MongoDB and Amazon RDS, flexibility around discovery and load balancing, and a lot of useful traffic statistics. OK, we know a little about its advantages, but what exactly is Envoy proxy? ‘Envoy is an open source edge and service proxy, designed for cloud-native applications’. It was originally developed by Lyft as a high-performance C++ distributed proxy designed for standalone services and applications, as well as for large microservices service meshes. It sounds really good, so I decided to take a closer look at it and prepare a sample of service discovery and distributed tracing realized with Envoy and Spring Boot based microservices.

Envoy Configuration

In most of the previous samples based on Spring Cloud we have used Zuul as the edge proxy. Zuul is a popular Netflix OSS tool acting as an API gateway in your microservices architecture. As it turns out, it can be successfully replaced by Envoy proxy. One of the things I really like about Envoy is the way its configuration is created. The default format is JSON and it is validated against a JSON schema. The JSON properties and schema are well documented and can be easily understood. Just as you would expect from a modern solution, the recommended way to get started is by using the pre-built Docker images. So, in the beginning we have to create a Dockerfile for building a Docker image with Envoy and provide the configuration file in JSON format. Here’s my Dockerfile. The parameters service-cluster and service-node are optional and have to do with the configuration provided for service discovery, which I’ll say more about in a minute.

FROM lyft/envoy:latest
RUN apt-get update
COPY envoy.json /etc/envoy.json
CMD /usr/local/bin/envoy -c /etc/envoy.json --service-cluster samplecluster --service-node sample1

I assume you have basic knowledge of Docker and its commands, which is mandatory at this point. After providing the envoy.json configuration file we can proceed with building the Docker image.

docker build -t envoy:v1 .

Then just run it using the docker run command. The useful ports should be exposed outside the container.

docker run -d --name envoy -p 9901:9901 -p 10000:10000 envoy:v1

The first pretty helpful feature is the local HTTP administration server. It can be configured in the JSON file inside the admin property. For this example I selected port 9901 and, as you probably noticed, I also exposed that port outside the Envoy Docker container. The admin console is now available at http://192.168.99.100:9901/. If you invoke that address, it prints all available commands. The most helpful for me were stats, which prints all the important statistics related to the proxy, and logging, which lets you change the logging level dynamically for selected categories. So, if you have any problems with Envoy, first try changing the logging level by calling /logging?name=level and watch the output of the docker logs envoy command.

"admin": {
    "access_log_path": "/tmp/admin_access.log",
    "address": "tcp://0.0.0.0:9901"
}

The next required configuration property is listeners. There we define the routing settings and the address on which Envoy will listen for incoming TCP connections. The notation tcp://0.0.0.0:10000 is a wildcard match for any IPv4 address with port 10000. This port is also exposed outside the Envoy Docker container, so in this case it will be our API gateway, available at http://192.168.99.100:10000/. We will come back to the proxy configuration details at a later stage; now let’s take a closer look at the architecture of the presented example.

"listeners": [{
    "address": "tcp://0.0.0.0:10000",
    ...
}]

Architecture

The architecture of the described solution is visible in the figure below. We have Envoy proxy as the API gateway, which is the entry point to our system. Envoy integrates with Zipkin and sends it tracing messages with information about incoming HTTP requests and the responses sent back. Two sample microservices, Person and Product, register themselves in the service discovery on startup and deregister on shutdown. They are hidden from external clients behind the API gateway. Envoy has to fetch the current configuration with the addresses of the registered services and route incoming HTTP requests properly. If there are multiple instances of a service available, it should also perform load balancing.

[Figure: architecture of the example, with Envoy proxy in front of the Person and Product microservices, a custom service discovery and Zipkin]

As it turns out, Envoy does not integrate with well-known discovery servers like Consul or ZooKeeper, but defines its own generic, REST-based API which needs to be implemented to enable fetching of cluster members. The main method of this API is GET /v1/registration/:service, used for fetching the list of currently registered instances of a service. Lyft provides a default implementation in Python, but for this example we develop our own solution using Java and Spring Boot. The sample application source code is available on GitHub. In addition to the service discovery implementation, you will also find there the two sample microservices.

Service Discovery

Our custom discovery implementation does nothing more than expose a REST-based API with methods for registration, unregistration and fetching a service’s instances. The GET method needs to return a specific JSON structure which matches the following schema.

{
    "hosts": [{
        "ip_address": "...",
        "port": "...",
        ...
    }]
}

Here’s the REST controller class with the discovery API implementation.

@RestController
public class EnvoyDiscoveryController {

    private static final Logger LOGGER = LoggerFactory.getLogger(EnvoyDiscoveryController.class);

    private Map<String, List<DiscoveryHost>> hosts = new HashMap<>();

    @GetMapping(value = "/v1/registration/{serviceName}")
    public DiscoveryHosts getHostsByServiceName(@PathVariable("serviceName") String serviceName) {
        LOGGER.info("getHostsByServiceName: service={}", serviceName);
        DiscoveryHosts hostsList = new DiscoveryHosts();
        hostsList.setHosts(hosts.get(serviceName));
        LOGGER.info("getHostsByServiceName: hosts={}", hostsList);
        return hostsList;
    }

    @PostMapping("/v1/registration/{serviceName}")
    public void addHost(@PathVariable("serviceName") String serviceName, @RequestBody DiscoveryHost host) {
        LOGGER.info("addHost: service={}, body={}", serviceName, host);
        List<DiscoveryHost> tmp = hosts.get(serviceName);
        if (tmp == null)
            tmp = new ArrayList<>();
        tmp.add(host);
        hosts.put(serviceName, tmp);
    }

    @DeleteMapping("/v1/registration/{serviceName}/{ipAddress}")
    public void deleteHost(@PathVariable("serviceName") String serviceName, @PathVariable("ipAddress") String ipAddress) {
        LOGGER.info("deleteHost: service={}, ip={}", serviceName, ipAddress);
        List<DiscoveryHost> tmp = hosts.get(serviceName);
        if (tmp != null) {
            Optional<DiscoveryHost> optHost = tmp.stream().filter(it -> it.getIpAddress().equals(ipAddress)).findFirst();
            if (optHost.isPresent())
                tmp.remove(optHost.get());
            hosts.put(serviceName, tmp);
        }
    }

}
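
The DiscoveryHost and DiscoveryHosts classes used above are simple DTOs matching the JSON schema expected by Envoy. Here’s a minimal sketch of how they could look, with field names inferred from that schema and a Jackson annotation added so the serialized ip_address property matches it; the actual classes in the repository may differ (imports omitted for brevity).

public class DiscoveryHost {

    // serialized as "ip_address" to match the schema expected by Envoy
    @JsonProperty("ip_address")
    private String ipAddress;
    private int port;

    public String getIpAddress() {
        return ipAddress;
    }

    public void setIpAddress(String ipAddress) {
        this.ipAddress = ipAddress;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

}

public class DiscoveryHosts {

    private List<DiscoveryHost> hosts = new ArrayList<>();

    public List<DiscoveryHost> getHosts() {
        return hosts;
    }

    public void setHosts(List<DiscoveryHost> hosts) {
        this.hosts = hosts;
    }

}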

Let’s get back to the Envoy configuration settings. Assuming we have built an image from the Dockerfile visible below and then run the container on its default port, we can invoke the discovery service at http://192.168.99.100:9200. That address should be placed in the envoy.json configuration file. The service discovery connection settings should be provided inside the Cluster Manager section.

FROM openjdk:alpine
MAINTAINER Piotr Minkowski <piotr.minkowski@gmail.com>
ADD target/envoy-discovery.jar envoy-discovery.jar
ENTRYPOINT ["java", "-jar", "/envoy-discovery.jar"]
EXPOSE 9200

Here’s a fragment of the envoy.json file. The cluster for service discovery should be defined as a global SDS configuration, which must be specified inside the sds property (1). The most important thing is to provide a correct URL (2); based on that, Envoy automatically tries to call the GET /v1/registration/{service_name} endpoint. The last interesting field in that section is refresh_delay_ms (3), which sets the delay between fetches of the list of services registered in the discovery server. That’s not all. We also have to define the cluster members. They are identified by name (4). Their type is sds (5), which means that this cluster uses the service discovery server to locate the network addresses of the microservice with the name defined in the service_name property (6).

"cluster_manager": {
    "clusters": [{
        "name": "service1", (4)
        "type": "sds", // (5)
	"connect_timeout_ms": 5000,
	"lb_type": "round_robin",
	"service_name": "person-service" // (6)
    }, {
        "name": "service2",
        "type": "sds",
        "connect_timeout_ms": 5000,
        "lb_type": "round_robin",
        "service_name": "product-service"
    }],
    "sds": { // (1)
	"cluster": {
		"name": "service_discovery",
		"type": "strict_dns",
		"connect_timeout_ms": 5000,
		"lb_type": "round_robin",
		"hosts": [{
			"url": "tcp://192.168.99.100:9200" // (2)
		}]
	},
	"refresh_delay_ms": 3000 // (3)
    }
}

The routing configuration is defined for every listener inside the route_config property (1). The first route is configured for person-service, which is processed by the service1 cluster (2), the second for product-service, processed by the service2 cluster (3). So, our services are available at the http://192.168.99.100:10000/person and http://192.168.99.100:10000/product addresses.

{
    "name": "http_connection_manager",
    "config": {
        "codec_type": "auto",
        "stat_prefix": "ingress_http",
        "route_config": { // (1)
            "virtual_hosts": [{
		"name": "service",
		"domains": ["*"],
		"routes": [{
			"prefix": "/person", // (2)
			"cluster": "service1"
		}, {
			"prefix": "/product", // (3)
			"cluster": "service2"
		}]
            }]
        },
	"filters": [{
		"name": "router",
		"config": {}
        }]
    }
}

Building Microservices

The routing on the Envoy proxy has already been configured. We still don’t have the running microservices. Their implementation is based on the Spring Boot framework; they do nothing more than expose a REST API providing simple operations on a list of objects and register/unregister the service in the discovery server. Here’s the @Service bean responsible for that registration. The onApplicationEvent method is fired after application startup and the destroy method just before graceful shutdown.

@Service
public class PersonRegister implements ApplicationListener<ApplicationReadyEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonRegister.class);

    private String ip;
    @Value("${server.port}")
    private int port;
    @Value("${spring.application.name}")
    private String appName;
    @Value("${envoy.discovery.url}")
    private String discoveryUrl;

    @Autowired
    RestTemplate template;

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		LOGGER.info("PersonRegistration.register");
		try {
			ip = InetAddress.getLocalHost().getHostAddress();
			DiscoveryHost host = new DiscoveryHost();
			host.setPort(port);
			host.setIpAddress(ip);
			template.postForObject(discoveryUrl + "/v1/registration/{service}", host, DiscoveryHosts.class, appName);
		} catch (Exception e) {
			LOGGER.error("Error during registration", e);
		}
	}

	@PreDestroy
	public void destroy() {
		try {
			template.delete(discoveryUrl + "/v1/registration/{service}/{ip}/", appName, ip);
			LOGGER.info("PersonRegister.unregistered: service={}, ip={}", appName, ip);
		} catch (Exception e) {
			LOGGER.error("Error during unregistration", e);
		}
	}

}
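
The PersonRegister bean above autowires a RestTemplate, so one has to be declared somewhere in the application context. Here’s a minimal sketch of such a definition; the class name PersonApplication is hypothetical and the actual configuration in the repository may differ.

@SpringBootApplication
public class PersonApplication {

	public static void main(String[] args) {
		SpringApplication.run(PersonApplication.class, args);
	}

	// plain RestTemplate used by PersonRegister to call the discovery service
	@Bean
	RestTemplate restTemplate() {
		return new RestTemplate();
	}

}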

The best way to shut down a Spring Boot application gracefully is through its Actuator endpoint. To enable such endpoints for the service, include spring-boot-starter-actuator in your project dependencies. Shutdown is disabled by default, so we should add the following properties to application.yml to enable it and additionally disable the default security (endpoints.shutdown.sensitive=false). Now, just by calling POST /shutdown, we can stop our Spring Boot application and test the unregister method.

endpoints:
  shutdown:
    enabled: true
    sensitive: false

As before, we also build Docker images for the microservices. Here’s the person-service Dockerfile, which allows overriding the default service discovery address and exposes the service port.

FROM openjdk:alpine
MAINTAINER Piotr Minkowski <piotr.minkowski@gmail.com>
ADD target/person-service.jar person-service.jar
ENV DISCOVERY_URL http://192.168.99.100:9200
ENTRYPOINT ["java", "-jar", "/person-service.jar"]
EXPOSE 9300

To build the image and run a container of the service with a custom listen port, type the following Docker commands.

docker build -t piomin/person-service .
docker run -d --name person-service -p 9301:9300 piomin/person-service

Distributed Tracing

It is time for the last piece of the puzzle: Zipkin tracing. Tracing data related to all incoming requests should be sent there. The first part of the Envoy proxy configuration is inside the tracing property, which specifies global settings for the HTTP tracer.

"tracing": {
    "http": {
        "driver": {
            "type": "zipkin",
            "config": {
                "collector_cluster": "zipkin",
                "collector_endpoint": "/api/v1/spans"
            }
        }
    }
}

The network location and settings for the Zipkin connection should be defined as a cluster member.

"clusters": [{
    "name": "zipkin",
    "connect_timeout_ms": 5000,
    "type": "strict_dns",
    "lb_type": "round_robin",
    "hosts": [
      {
        "url": "tcp://192.168.99.100:9411"
      }
    ]
}]

We should also add a new tracing section to the HTTP connection manager configuration (1). The operation_name field (2) is required and sets the span name; only the ‘ingress’ and ‘egress’ values are supported.

"listeners": [{
	"filters": [{
        "name": "http_connection_manager",
        "config": {
			"tracing": { // (1)
				"operation_name": "ingress" // (2)
			}
			// ...
		}
	}]
}]

Zipkin server can be started using its Docker image.

docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

Summary

Here’s a list of the Docker containers running for the tests. As you probably remember, we have Zipkin, Envoy, the custom discovery service, two instances of person-service and one of product-service. You can add some person objects by calling POST /person and then display a list of all persons by calling GET /person. The requests should be load balanced between the two instances based on the entries in the service discovery.

[Screenshot: docker ps output listing the Zipkin, Envoy, discovery, person-service and product-service containers]

Information about every request is sent to Zipkin with the service name taken from the --service-cluster Envoy startup parameter.

[Screenshot: request traces visible in the Zipkin UI]

Building Secure APIs with Vert.x and OAuth2

Preface

Today I would like to get back to the subject touched on in one of my previous articles: the Vert.x toolkit. In the post Asynchronous Microservices With Vert.x I described how to develop microservices using Vert.x modules for web applications, service discovery, circuit breaking and distributed configuration. I did not mention anything there about security aspects, which are usually important when talking about open APIs. It is time to take a closer look at some Vert.x modules for authentication and authorization. Following the description available on the vertx.io site, it provides some simple out-of-the-box implementations for authentication in our applications. There are modules providing an auth implementation backed by JDBC or MongoDB, and also some supporting solutions like JSON Web Tokens (JWT), Apache Shiro and OAuth2. As you probably know, OAuth2 is the most common authentication method for APIs provided by Facebook, Twitter or LinkedIn. If you are interested in more details about that authentication method, read my article Microservices security with OAuth2, where I described the basics and introduced a simple sample using Spring Security in conjunction with OAuth2.

In the sample application, which is available on GitHub in the security branch, I’m going to present how to provide OAuth2 security for a Vert.x application using Keycloak and the Vert.x OAuth2 module.

Keycloak

For authentication and authorization management we use Keycloak. It is an open source identity and access management solution which provides mechanisms supporting, among others, OAuth2. Keycloak has a web admin console where administrators can manage all aspects of the server. We can easily run it using a Docker container.

docker run -d --name keycloak -p 38080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KEYCLOAK_LOGLEVEL=DEBUG jboss/keycloak

The management dashboard is available at http://192.168.99.100:38080/. Let’s begin by creating a client. The client will be used by our application (or rather service) to authenticate itself against Keycloak. In the first step we have to set the Client ID and Root URL. The Root URL is not needed for the OAuth2 Password Credentials flow, but rather for the Authorization Code flow. I put our sample application’s localhost address there.

[Screenshot: Keycloak, adding a new client with Client ID and Root URL]

We should enable the Direct Access Grants and Authorization options in the Settings section of the newly created client. Access Type should also be set to confidential, and Valid Redirect URIs to the callback address routed inside the application (this is explained in a later section).

[Screenshot: Keycloak client settings]

The last piece of information needed from the Client section is the Secret, available under the Credentials tab.

[Screenshot: Keycloak client credentials with the generated secret]

Now we can proceed to creating a user with credentials. In the sample I’ll present in the next section we use the password credentials flow, so don’t forget to set a password for the newly created user.

[Screenshot: Keycloak, creating a new user]

[Screenshot: Keycloak, setting the user’s password]

Finally, we set the authorities for our user. First, let’s create some roles in the Roles section; in my case these are view-account and modify-account. For these roles I also enabled Scope Param Required, which means that if a client needs to obtain that authority, it has to send the role name in the request scope.

[Screenshot: Keycloak roles view-account and modify-account]

The last step is to assign the roles to our test user piotr.minkowski.

[Screenshot: Keycloak, assigning roles to the test user]

Building application

Vert.x provides a module supporting OAuth2 authorization. We should include the following dependency in our pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-auth-oauth2</artifactId>
	<version>${vertx.version}</version>
</dependency>

We have to begin by defining the Keycloak OAuth2Auth provider. We use the default master realm (1). In addition to the realm name, we should set the realm public key (2), which is available in the Realm Settings section under the Keys tab. We should also set the Keycloak Client ID as the resource (3) and the client secret as the credentials (4).

JsonObject keycloakJson = new JsonObject()
	.put("realm", "master") // (1)
	.put("realm-public-key", "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1xVBifXfS1uVM8S14JlyLpXck+0+hBQX258IiL5Fm2rZpkQ5lN9N1tadQdXBKk8V/0SxdTyoX7cpYQkcOs0Rj0XXmX7Lnk56euZwel+3MKAZWA20ld8BCfmDtX4/+VP311USUqR/W8Fd2p/gugKWF6VDMkri92qob1DdrcUiRlD8XYC0pwHwSvyW/3JvE5HeTy3U4vxC+19wHcwzLGNlVOlYPk9mzJHXN+LhZr/Tc7HeAsvVxYDXwOOh+/UWweMkvKy+OSNKG3aWLb92Ni3HejFn9kd4TRHfaapwWg1m5Duf3uqz8WDHbS/LeS4g3gQS0SvcCYI0huSoG3NA/z4K7wIDAQAB") // (2)
	.put("auth-server-url", "http://192.168.99.100:38080/auth")
	.put("ssl-required", "external")
	.put("resource", "vertx-account") // (3)
	.put("credentials", new JsonObject().put("secret", "73b55e04-e562-41ea-b39c-263b7b36945d")); // (4)

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);

[Screenshot: Keycloak realm keys with the public key used in the configuration]

I exposed an API method for login which retrieves a token from Keycloak using the OAuth2FlowType.PASSWORD authentication method.

router.post("/login").produces("application/json").handler(rc -> {
	User u = Json.decodeValue(rc.getBodyAsString(), User.class);
	oauth2.getToken(u.toJson(), res -> {
		if (res.failed()) {
			LOGGER.error("Access token error: {}", res.cause().getMessage());
			rc.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
		} else {
			AccessToken token = res.result();
			LOGGER.info("Access Token: {}", KeycloakHelper.rawAccessToken(token.principal()));
			User user = new User(KeycloakHelper.rawAccessToken(token.principal()));
			rc.response().end(user.toString());
		}
	});
});

I sent the following message to the POST /login endpoint.

{"username":"piotr.minkowski", "password":"Piot_123", "scope":"modify-account view-account"}

That is equivalent to the following Vert.x JsonObject passed as a parameter to the OAuth2 getToken method.

new JsonObject().put("username", "piotr.minkowski").put("password", "Piot_123").put("scope", "modify-account view-account")
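
For reference, here is a minimal sketch of the User class assumed by the /login handler and the test shown later; the field names are inferred from the JSON message above and the way the object is used, so the actual class in the repository may differ (remaining getters and setters omitted for brevity).

public class User {

	private String username;
	private String password;
	private String scope;
	private String accessToken;

	public User() {
	}

	public User(String accessToken) {
		this.accessToken = accessToken;
	}

	public User(String username, String password, String scope) {
		this.username = username;
		this.password = password;
		this.scope = scope;
	}

	// builds the JsonObject passed to OAuth2Auth.getToken(...)
	public JsonObject toJson() {
		return new JsonObject()
				.put("username", username)
				.put("password", password)
				.put("scope", scope);
	}

	public String getAccessToken() {
		return accessToken;
	}

	public void setAccessToken(String accessToken) {
		this.accessToken = accessToken;
	}

	@Override
	public String toString() {
		return new JsonObject().put("accessToken", accessToken).encode();
	}

}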

The POST /login method returns the access token inside a JSON object. That token should be passed in the Authorization header for every call to a protected resource. Here’s the main class with the API method definitions. We begin by creating an OAuth2AuthHandler object (1), which is responsible for token validation. It takes the Keycloak OAuth2Auth object as a parameter. Then we set OAuth2AuthHandler as the handler for all methods matching the /account/* path (2). If the token has been successfully validated, we can proceed to authorization. We check if the view-account role is assigned to the user when calling a GET method (3), and the modify-account role when calling a POST method (4). When using Keycloak for authorization we always have to set the prefix to “realm” while invoking the isAuthorised method. If the role is a realm role, the lookup happens in the global roles list.

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);
OAuth2AuthHandler oauth2Handler = (OAuth2AuthHandler) OAuth2AuthHandler.create(oauth2, "http://localhost:2222"); // (1)
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route("/account/*").handler(oauth2Handler); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.route(HttpMethod.POST, "/login").handler(BodyHandler.create());
oauth2Handler.setupCallback(router.get("/callback"));
router.get("/account/:id").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:view-account", authRes -> { // (3)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			repository.findById(rc.request().getParam("id"), res -> {
				Account account = res.result();
				LOGGER.info("Found: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:modify-account", authRes -> { // (4)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
			repository.save(a, res -> {
				Account account = res.result();
				LOGGER.info("Created: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});

Testing

I created a JUnit test case to check whether OAuth2 authentication works correctly. Vert.x provides a library which can be used for testing; it is specifically designed to work well with asynchronous code. Include the following dependency in your pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-unit</artifactId>
	<version>${vertx.version}</version>
	<scope>test</scope>
</dependency>

Then annotate your JUnit test class with @RunWith(VertxUnitRunner.class). Before running our test method we should deploy the verticles. The verticle with the REST API is deployed on port 2222.

Vertx vertx;

@Before
public void before(TestContext context) throws IOException {
	vertx = Vertx.vertx();
	vertx.deployVerticle(MongoVerticle.class.getName(), context.asyncAssertSuccess());
	DeploymentOptions options = new DeploymentOptions().setConfig(new JsonObject().put("http.port", 2222));
	vertx.deployVerticle(AccountServer.class.getName(), options, context.asyncAssertSuccess());
}

Here’s the JUnit test method. We use WebClient for calling HTTP methods and the Vert.x Unit Async object to complete the test case once the asynchronous calls finish (3). First, we call the POST /login method to retrieve an access token from Keycloak (1). Then we call one of the API methods, setting the Authorization header with the access token string retrieved from POST /login (2). During test case execution the verticles with MongoDB access (MongoVerticle) and the API definition (AccountServer) are deployed and started, but you need to start the MongoDB database, Consul and Keycloak manually. I suggest running them with Docker.

@Test
public void testAuth(TestContext context) {
	Async async = context.async();
	WebClient client = WebClient.create(vertx);
	User u = new User("piotr.minkowski", "Piot_123", "modify-account view-account");
	client.post(2222, "localhost", "/login").sendJson(u, ar -> { // (1)
		LOGGER.info("Response code: {}", ar.result().statusCode());
		LOGGER.info("Response: {}", ar.result().bodyAsString());
		if (ar.result().statusCode() == 200) {
			User user = ar.result().bodyAsJson(User.class);
			client.get(2222, "localhost", "/account").putHeader("Authorization", "Bearer " + user.getAccessToken()).send(r -> { // (2)
				LOGGER.info("GET result: {}", r.result().bodyAsString());
				async.complete(); // (3)
			});
		} else {
			async.complete();
		}
	});
}

Final Thoughts

To be honest, I had never dealt with Vert.x before starting work on this series of articles published on my blog. After the couple of days spent getting to know the toolkit, I will definitely recommend it when working on REST APIs. Vert.x provides a smart implementation for securing APIs with OAuth2. Additionally, you can use it in combination with a solution like Keycloak for identity and access management. As usual, there are also some drawbacks. I had a problem understanding exactly how authorities work in Vert.x: when I created a role inside the Keycloak client, it didn’t work in my application, and only a global realm role worked fine. However, those problems do not overshadow Vert.x’s advantages.