Partitioning with Apache Kafka and Vert.x

Apache Kafka is a distributed streaming platform that can also act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message is processed by only one consumer belonging to a given group. This is especially useful when you run more than one instance of a service that listens for messages arriving on a topic. Within the same group, your consumers behave like queuing clients.

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced it in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service that processes HTTP requests asynchronously in accordance with the asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you the features of Apache Kafka that are useful when creating applications that consume messages. The choice of Java client library is not the key point here. However, in my opinion Vert.x, an asynchronous, high-performance framework, is a perfect match for Apache Kafka. It provides the Vert.x Kafka client, which allows you to read messages from and send messages to a Kafka cluster. Before we proceed to the sample, let’s first dive into the core abstraction of Kafka.

Kafka topic

I assume you know very well what a topic is and what its main role is. Every message sent to a topic goes to every subscriber (in Kafka terms, to every subscribing consumer group). What is the main difference between Kafka and the standard topic provided by other message brokers? A Kafka topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within its partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

A consumer may subscribe to the whole topic or only to a selected partition. It can also control the offset from which it starts processing data. For example, it can reset the offset in order to reprocess data from the past, or skip ahead to the most recent record and consume only messages sent to the topic from now on. The figure below illustrates the structure of a single partition, with a producer and consumers listening for incoming data.

Kafka-1
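To make offsets and seeking more concrete, here is a minimal sketch in plain Java. It is not Kafka code: the PartitionReader class and its methods are hypothetical names that only model one consumer moving its position over an in-memory list that stands in for a partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one consumer reading one partition; not part of the Kafka API.
final class PartitionReader {
    private final List<String> partition; // index in the list plays the role of the offset
    private int position;                 // offset of the next record to read

    PartitionReader(List<String> partition) {
        this.partition = partition;
    }

    void seek(int offset) { position = offset; }
    void seekToBeginning() { position = 0; }
    void seekToEnd() { position = partition.size(); }

    // Returns every record from the current position onwards and advances past them.
    List<String> poll() {
        List<String> batch = new ArrayList<>(partition.subList(position, partition.size()));
        position = partition.size();
        return batch;
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<>(Arrays.asList("r0", "r1", "r2"));
        PartitionReader reader = new PartitionReader(partition);
        reader.seekToEnd();                // skip everything published so far
        partition.add("r3");               // a producer appends a new record
        System.out.println(reader.poll()); // only the new record
        reader.seek(1);                    // go back and reprocess from offset 1
        System.out.println(reader.poll());
    }
}
```

The records themselves are never modified or removed by reading; only the reader's position changes, which is why the same data can be consumed again later.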

Sample architecture

Let me say a few words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). Since a picture is worth a thousand words, the diagram below illustrates the architecture of our system. We have one topic created on Kafka, which consists of two partitions. There is one client application that exposes a REST API for sending orders into the system and then forwards them to the topic. The target partition is calculated from the type of order. We may create orders with the types SINGLE and MULTIPLE. There are also some applications that consume data from the topic. The first of them, single-order-processor, reads data from partition 0; the second, multiple-order-processor, reads from partition 1; and the last, all-order-processor, does not choose any partition.

kafka-2

Running Kafka

To run Apache Kafka on a local machine we may use its Docker image. The image shared by Spotify also starts a ZooKeeper server, which is used by Kafka. If you run Docker on Windows, the default address of its virtual machine is 192.168.99.100.

docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics are created automatically during application startup. I had some problems with it when creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately but provides the Kafka client interface. The commands below assume that a Docker network named kafka-net already exists.

docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run the ches/kafka container in client mode and create the topic orders-out with two partitions.

docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building producer application

First, we need to include the Maven dependencies that enable the Vert.x framework for the application. If the application exposes a RESTful HTTP API you should include vertx-web. The vertx-kafka-client library has to be included in all the sample modules.

To start Vert.x as a Java application we have to create a verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using the Vertx object. For more details about Vert.x and the verticle concept you may refer to one of my previous articles mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {

	public static void main(String[] args) {
		Vertx vertx = Vertx.vertx();
		vertx.deployVerticle(new OrderVerticle());
	}

}

The next step is to define the producer using the KafkaProducer interface. We have to provide connection settings and a serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires a JsonObject as its input.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, JsonObject> producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending the message to the topic. The message is created using the KafkaProducerRecord interface. It takes the topic’s name, a key (null here), the request object and the partition number as parameters. As you can see in the code fragment below, the partition number is calculated from the order type (o.getType().ordinal()).

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
	Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
	KafkaProducerRecord<String, JsonObject> record = KafkaProducerRecord.create("orders-out", null, rc.getBodyAsJson(), o.getType().ordinal());
	producer.write(record, done -> {
		if (done.succeeded()) {
			RecordMetadata recordMetadata = done.result();
			LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
			o.setId(recordMetadata.getOffset());
			o.setStatus(OrderStatus.PROCESSING);
		} else {
			Throwable t = done.cause();
			LOGGER.error("Error sent to topic: {}", t.getMessage());
			o.setStatus(OrderStatus.REJECTED);
		}
		rc.response().end(Json.encodePrettily(o));
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);
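The partition choice relies on the declaration order of the enum constants. The sketch below isolates that rule in plain Java. OrderPartitioner and partitionFor are hypothetical names, and the OrderType enum is only assumed to mirror the one in the sample; the bounds check is an addition that guards against an enum growing beyond the number of partitions.

```java
// Hypothetical stand-in for the sample's partition routing; not taken from the repository.
final class OrderPartitioner {
    // Assumed to mirror the order types used in the article.
    enum OrderType { SINGLE, MULTIPLE }

    // Maps an order type to a partition by its position in the enum declaration.
    static int partitionFor(OrderType type, int partitionCount) {
        int partition = type.ordinal();
        if (partition >= partitionCount) {
            throw new IllegalArgumentException("No partition " + partition + " for " + type);
        }
        return partition;
    }

    public static void main(String[] args) {
        for (OrderType type : OrderType.values()) {
            System.out.println(type + " -> partition " + partitionFor(type, 2));
        }
    }
}
```

Because the mapping depends on ordinal(), reordering the enum constants would silently change which partition each order type lands in.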

Building consumer applications

The consumer configuration is very similar to that of the producer. We also have to set the connection settings and the class used for deserialization. There is one interesting setting defined for the consumer in the code fragment below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset for the consumer when no committed offset exists for it yet. If you would like to read all records from the beginning of the stream, use the value earliest. If you would like to process only the newest records (received after application startup), set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
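The effect of auto.offset.reset can be summarised as a small decision rule. The following plain-Java sketch is only an illustration of that rule, not Kafka client code: StartOffset and resolve are hypothetical names, and the beginning and end offsets are passed in as plain numbers.

```java
import java.util.OptionalLong;

// Hypothetical helper that mirrors how a consumer picks its starting offset.
final class StartOffset {
    static long resolve(OptionalLong committed, String autoOffsetReset, long beginning, long end) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset takes precedence over the reset policy
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginning;         // replay everything still retained
            case "latest":
                return end;               // only records published from now on
            default:
                // corresponds to the "none" policy: fail when there is nothing to resume from
                throw new IllegalStateException("No committed offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.empty(), "latest", 0L, 7L));
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0L, 7L));
        System.out.println(resolve(OptionalLong.of(3L), "latest", 0L, 7L));
    }
}
```

With auto-commit disabled and no manual commits, as in the configuration above, the reset policy is what decides the starting point on every startup.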

As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented in the module all-order-processor, consumes all the events sent to the topic. This implementation is the simplest. We only need to invoke the subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by the handler method.

consumer.subscribe("orders-out", ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	order.setStatus(OrderStatus.DONE);
	LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});

The implementation of the consuming method for the other applications is a little more complicated. Besides defining the target topic, every consumer can ask for a specific partition. The application multiple-order-processor subscribes to partition 1, while single-order-processor subscribes to partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
		consumer.assignment(done1 -> {
			if (done1.succeeded()) {
				for (TopicPartition topicPartition : done1.result()) {
					LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
				}
			} else {
				LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
			}
		});
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

The implementation of the handler method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty relatedOrderId field, it tries to find the related order among the historical records stored in the topic. This can be achieved by calling the seek method on KafkaConsumer.

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	if (ordersWaiting.containsKey(record.offset())) {
		LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
		LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
		consumer.seekToEnd(tp);
	}

	if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
		ordersWaiting.put(order.getRelatedOrderId(), order);
		consumer.seek(tp, order.getRelatedOrderId());
	}
});

Testing

Now it is time to launch our applications. You may run the main classes from your IDE, or build the whole project using the mvn clean install command and then run it with java -jar. Also run two instances of all-order-processor to check how the consumer group mechanism works in practice.

Let’s send some test requests to the order-service in the following sequence.

curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here’s the log from the producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here’s the log from single-order-processor. It has processed only messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here’s the log from multiple-order-processor. It has processed only messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here’s the log from the first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here’s the log from the second instance of all-order-processor. It may be a little surprising, but if you run two instances of a consumer that listens to the whole topic within the same consumer group, each instance processes messages from a single partition.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1
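The split seen in the two logs above follows from how partitions are divided among the members of a consumer group. The sketch below models that with a simple round-robin rule in plain Java. This is an assumption made for illustration: Kafka's real assignment strategies are pluggable and more involved, and GroupAssignment and the member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin model of sharing partitions within one consumer group.
final class GroupAssignment {
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return result; // nobody to assign partitions to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // each partition is owned by exactly one member of the group
            result.get(members.get(partition % members.size())).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("instance-1"), 2));
        System.out.println(assign(Arrays.asList("instance-1", "instance-2"), 2));
        System.out.println(assign(Arrays.asList("instance-1", "instance-2", "instance-3"), 2));
    }
}
```

In this model a third instance would receive no partition at all, which is why adding consumers beyond the partition count does not increase throughput.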

Summary

In this article I have tried to give you a taste of messaging with Apache Kafka. Concepts such as consumer groups and partitioning are what make it different from traditional messaging solutions. Kafka is a widely adopted product that can act as storage, a messaging system or a stream processor. Together with the popular JVM-based toolkit Vert.x, it can be a really powerful, fast and lightweight solution for applications that exchange messages with each other. The key concepts introduced by Kafka have been adopted by Spring Cloud Stream, which makes it an opinionated choice for creating messaging microservices.

Building Secure APIs with Vert.x and OAuth2

Preface

Today I would like to get back to a subject touched on in one of my previous articles: the Vert.x toolkit. In the post Asynchronous Microservices With Vert.x I described how to develop microservices using Vert.x modules for web applications, service discovery, circuit breaking and distributed configuration. I did not mention anything there about security, which is usually important when talking about open APIs. It is time to take a closer look at some Vert.x modules for authentication and authorization. According to the description on the vertx.io site, Vert.x provides some simple out-of-the-box implementations for authentication in our applications. There are modules providing auth implementations backed by JDBC and MongoDB, and also some supporting solutions like JSON Web Tokens (JWT), Apache Shiro and OAuth2. As you probably know, OAuth2 is the most common authentication method for APIs provided by Facebook, Twitter or LinkedIn. If you are interested in more details about it, read my article Microservices security with Oauth2, where I described the basics and introduced a simple sample using Spring Security with OAuth2.

In the sample application, which is available on GitHub under the security branch, I’m going to show how to secure a Vert.x application with OAuth2 using Keycloak and the Vert.x OAuth2 module.

Keycloak

For authentication and authorization management we use Keycloak. It is an open source identity and access management solution that supports, among others, OAuth2. Keycloak has a web admin console where administrators can manage all aspects of the server. We can easily run it in a Docker container.

docker run -d --name keycloak -p 38080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KEYCLOAK_LOGLEVEL=DEBUG jboss/keycloak

The management dashboard is available at http://192.168.99.100:38080/. Let’s begin by creating a Client. The Client will be used by our application (or rather service) to authenticate itself against Keycloak. In the first step we have to set the Client ID and Root URL. The Root URL is not needed for the OAuth2 Password Credentials Flow, only for the Authorization Code Flow. I put our sample application’s localhost address there.

vertx-sec-1

We should enable the Direct Access Grants and Authorization options in the Settings section of the newly created client. Also, Access Type should be set to confidential and Valid Redirect URIs to the callback address routed inside the application (explained in a later section).

vertx-sec-2

The last piece of information needed from the Client section is the Secret, available under the Credentials tab.

vertx-sec-3

Now we can proceed to creating a user with credentials. The sample presented in the next section uses the password credentials flow, so don’t forget to change the password of the newly created user.

vertx-sec-5

vertx-sec-7

Finally, we set authorities for our user. First, let’s create some roles in the Roles section. In my case these are view-account and modify-account. For these roles I also enabled Scope Param Required. It means that if a client needs to obtain that authority, it has to send the role name in the request scope.

vertx-sec-4

The last step is to assign the roles to our test user piotr.minkowski.

vertx-sec-6

Building application

Vert.x provides a module supporting OAuth2 authorization. We should include the following dependency in our pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-auth-oauth2</artifactId>
	<version>${vertx.version}</version>
</dependency>

We begin by defining the Keycloak OAuth2Auth provider. We use the default realm (1). In addition to the realm name we should set the realm public key (2), which is available in the Realm Settings section under the Keys tab. We should also set the Keycloak Client ID (3) as resource and the client secret as credentials (4).

JsonObject keycloakJson = new JsonObject()
	.put("realm", "master") // (1)
	.put("realm-public-key", "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1xVBifXfS1uVM8S14JlyLpXck+0+hBQX258IiL5Fm2rZpkQ5lN9N1tadQdXBKk8V/0SxdTyoX7cpYQkcOs0Rj0XXmX7Lnk56euZwel+3MKAZWA20ld8BCfmDtX4/+VP311USUqR/W8Fd2p/gugKWF6VDMkri92qob1DdrcUiRlD8XYC0pwHwSvyW/3JvE5HeTy3U4vxC+19wHcwzLGNlVOlYPk9mzJHXN+LhZr/Tc7HeAsvVxYDXwOOh+/UWweMkvKy+OSNKG3aWLb92Ni3HejFn9kd4TRHfaapwWg1m5Duf3uqz8WDHbS/LeS4g3gQS0SvcCYI0huSoG3NA/z4K7wIDAQAB") // (2)
	.put("auth-server-url", "http://192.168.99.100:38080/auth")
	.put("ssl-required", "external")
	.put("resource", "vertx-account") // (3)
	.put("credentials", new JsonObject().put("secret", "73b55e04-e562-41ea-b39c-263b7b36945d")); // (4)

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);

vertx-sec-8

I exposed an API method for login, which retrieves a token from Keycloak using the OAuth2FlowType.PASSWORD authentication method.

router.post("/login").produces("application/json").handler(rc -> {
	User u = Json.decodeValue(rc.getBodyAsString(), User.class);
	oauth2.getToken(u.toJson(), res -> {
		if (res.failed()) {
			LOGGER.error("Access token error: {}", res.cause().getMessage());
			rc.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
		} else {
			AccessToken token = res.result();
			LOGGER.info("Access Token: {}", KeycloakHelper.rawAccessToken(token.principal()));
			User user = new User(KeycloakHelper.rawAccessToken(token.principal()));
			rc.response().end(user.toString());
		}
	});
});

I sent the following message to the POST /login service.

{"username":"piotr.minkowski", "password":"Piot_123", "scope":"modify-account view-account"}

It is equivalent to the following Vert.x JsonObject passed as a parameter to the OAuth2 getToken method.

new JsonObject().put("username", "piotr.minkowski").put("password", "Piot_123").put("scope", "modify-account view-account")

The POST /login method returns the access token inside a JSON object. That token should be passed in the Authorization header for every call to a protected resource. Here’s the main class with the API method definitions. We begin by creating an OAuth2AuthHandler object (1), which is responsible for token validation. It takes the OAuth2Auth Keycloak object as a parameter. Then we set OAuth2AuthHandler as a handler for all methods matching the /account/* path (2). If the token has been successfully validated we can proceed to authorization. We check whether the view-account role is assigned to the user when calling the GET method (3), and the modify-account role when calling the POST method (4). When using Keycloak for authorization, we have to prefix the role name with realm when invoking the isAuthorised method. With the realm prefix, the lookup happens in the global roles list.

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);
OAuth2AuthHandler oauth2Handler = (OAuth2AuthHandler) OAuth2AuthHandler.create(oauth2, "http://localhost:2222"); // (1)
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route("/account/*").handler(oauth2Handler); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.route(HttpMethod.POST, "/login").handler(BodyHandler.create());
oauth2Handler.setupCallback(router.get("/callback"));
router.get("/account/:id").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:view-account", authRes -> { // (3)
		LOGGER.info("Auth: {}", authRes.result());
		if (Boolean.TRUE.equals(authRes.result())) {
			repository.findById(rc.request().getParam("id"), res -> {
				Account account = res.result();
				LOGGER.info("Found: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:modify-account", authRes -> { // (4)
		LOGGER.info("Auth: {}", authRes.result());
		if (Boolean.TRUE.equals(authRes.result())) {
			Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
			repository.save(a, res -> {
				Account account = res.result();
				LOGGER.info("Created: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});
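The role of the realm prefix can be pictured with a small plain-Java model. This is not the Vert.x implementation: RoleLookup is a hypothetical class, and the assumption that an unprefixed role name is looked up among the roles of the default client is mine, made only to contrast it with the realm-level lookup described above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of role lookup by authority string; it is not the Vert.x code.
final class RoleLookup {
    private final Set<String> realmRoles;
    private final Map<String, Set<String>> clientRoles;
    private final String defaultClient;

    RoleLookup(Set<String> realmRoles, Map<String, Set<String>> clientRoles, String defaultClient) {
        this.realmRoles = realmRoles;
        this.clientRoles = clientRoles;
        this.defaultClient = defaultClient;
    }

    // "realm:role" checks global roles; "client:role" or plain "role" checks client roles (assumed).
    boolean isAuthorised(String authority) {
        int separator = authority.indexOf(':');
        String scope = separator < 0 ? defaultClient : authority.substring(0, separator);
        String role = authority.substring(separator + 1);
        if ("realm".equals(scope)) {
            return realmRoles.contains(role);
        }
        return clientRoles.getOrDefault(scope, Collections.<String>emptySet()).contains(role);
    }

    public static void main(String[] args) {
        Set<String> realm = new HashSet<>(Arrays.asList("view-account", "modify-account"));
        Map<String, Set<String>> clients = new HashMap<>();
        RoleLookup lookup = new RoleLookup(realm, clients, "vertx-account");
        System.out.println(lookup.isAuthorised("realm:view-account"));
        System.out.println(lookup.isAuthorised("view-account"));
    }
}
```

Under these assumptions a role created at realm level is only found when the authority carries the realm prefix, which matches the behaviour observed in the sample.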

Testing

I created a JUnit test case to check that OAuth2 authentication works. Vert.x provides a library that can be used for testing. It is especially designed to work well with asynchronous code. Add the following dependency to your pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-unit</artifactId>
	<version>${vertx.version}</version>
	<scope>test</scope>
</dependency>

Then annotate your JUnit test class with @RunWith(VertxUnitRunner.class). Before running our test method we should deploy the verticles. The verticle with the REST API is deployed on port 2222.

Vertx vertx;

@Before
public void before(TestContext context) throws IOException {
	vertx = Vertx.vertx();
	vertx.deployVerticle(MongoVerticle.class.getName(), context.asyncAssertSuccess());
	DeploymentOptions options = new DeploymentOptions().setConfig(new JsonObject().put("http.port", 2222));
	vertx.deployVerticle(AccountServer.class.getName(), options, context.asyncAssertSuccess());
}

Here’s the JUnit test method. We use WebClient for calling HTTP methods and the Vert.x-Unit Async object to complete the test case once the asynchronous calls finish (3). First, we call the POST /login method to retrieve an access token from Keycloak (1). Then we call one of the API methods, setting the Authorization header to the access token string retrieved from POST /login (2). During test execution the verticles with MongoDB (MongoVerticle) and the API definition (AccountServer) are deployed and started, but you need to start the MongoDB database, Consul and Keycloak manually. I suggest running them with Docker.

@Test
public void testAuth(TestContext context) {
	Async async = context.async();
	WebClient client = WebClient.create(vertx);
	User u = new User("piotr.minkowski", "Piot_123", "modify-account view-account");
	client.post(2222, "localhost", "/login").sendJson(u, ar -> { // (1)
		LOGGER.info("Response code: {}", ar.result().statusCode());
		LOGGER.info("Response: {}", ar.result().bodyAsString());
		if (ar.result().statusCode() == 200) {
			User user = ar.result().bodyAsJson(User.class);
			client.get(2222, "localhost", "/account").putHeader("Authorization", "Bearer " + user.getAccessToken()).send(r -> { // (2)
				LOGGER.info("GET result: {}", r.result().bodyAsString());
				async.complete(); // (3)
			});
		} else {
			async.complete();
		}
	});
}
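The idea behind Async, keeping the test alive until a callback on another thread reports completion, can be shown without Vert.x. The sketch below is a plain-Java analogue built on CountDownLatch. AsyncCompletion and awaitCallback are hypothetical names, and the empty callback stands in for an HTTP response handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of waiting for an asynchronous callback in a test.
final class AsyncCompletion {
    // Runs the callback on another thread and waits until it has returned or the timeout expires.
    static boolean awaitCallback(Runnable callback, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        CompletableFuture.runAsync(() -> {
            callback.run();
            done.countDown(); // plays the role of async.complete(); reached only if the callback returns normally
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean completed = awaitCallback(() -> System.out.println("response handled"), 2000);
        System.out.println("completed=" + completed);
    }
}
```

Without such a signal the test method would return immediately and the runner would report success before any response had been checked.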

Final Thoughts

To be honest, I had never dealt with Vert.x before I started working on the series of articles published on my blog. After a couple of days spent getting to know the toolkit, I definitely recommend using it when working on REST APIs. Vert.x provides a smart implementation of security with OAuth2. Additionally, you can use it in combination with a solution like Keycloak, which is used for identity and access management. As usual, there are also some drawbacks. I had a problem understanding exactly how authorities work in Vert.x. When I created a role inside the Keycloak client it didn’t work in my application. Only a global realm role worked fine. However, those problems do not overshadow the advantages of Vert.x.

Asynchronous Microservices with Vert.x

Preface

I must admit that as soon as I saw the Vert.x documentation I liked the concept. This may be because I had previously worked with a very similar framework, Node.js, which I used to create simple and lightweight applications exposing REST APIs. It is a really fine framework, but it has one big disadvantage for me: it is a JavaScript runtime. It is worth mentioning that Vert.x is polyglot. It supports the most popular JVM-based languages such as Java, Scala, Groovy and Kotlin, and even JavaScript. These are not all of its advantages. It’s lightweight, fast and modular. I was pleasantly surprised that when I added the main Vert.x dependencies to my pom.xml, not many other dependencies were downloaded, as is often the case with the Spring Boot framework.
Well, I will not elaborate on the advantages and key concepts of this toolkit. I think you can read more about them in other articles. The most important thing for us is that with Vert.x we can create high-performance, asynchronous microservices based on the Netty framework. In addition, we can use standardized microservices mechanisms such as service discovery, a configuration server or circuit breaking.

The sample application source code is available on GitHub. It consists of two modules, account-vertx-service and customer-vertx-service. The customer service retrieves data from the Consul registry and invokes the account service API. The architecture of the sample solution is shown in the figure below.

vertx

Building services

To create an HTTP service exposing a REST API we need to include the following dependency in pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web</artifactId>
	<version>${vertx.version}</version>
</dependency>

Here’s the fragment from the account service where I defined all the API methods. The first step (1) was to declare a Router, which is one of the core concepts of Vert.x-Web. A router takes an HTTP request, finds the first matching route for that request, and passes the request to that route. The next steps (2), (3) add some handlers, for example BodyHandler, which allows you to retrieve request bodies and has been added to the POST method. Then we can define the API methods (4), (5), (6), (7), (8). Finally (9), we start the HTTP server on the port retrieved from configuration.

Router router = Router.router(vertx); // (1)
router.route("/account/*").handler(ResponseContentTypeHandler.create()); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create()); // (3)
router.get("/account/:id").produces("application/json").handler(rc -> { // (4)
	repository.findById(rc.request().getParam("id"), res -> {
		Account account = res.result();
		LOGGER.info("Found: {}", account);
		rc.response().end(account.toString());
	});
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> { // (5)
	repository.findByCustomer(rc.request().getParam("customer"), res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.get("/account").produces("application/json").handler(rc -> { // (6)
	repository.findAll(res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found all: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.post("/account").produces("application/json").handler(rc -> { // (7)
	Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
	repository.save(a, res -> {
		Account account = res.result();
		LOGGER.info("Created: {}", account);
		rc.response().end(account.toString());
	});
});
router.delete("/account/:id").handler(rc -> { // (8)
	repository.remove(rc.request().getParam("id"), res -> {
		LOGGER.info("Removed: {}", rc.request().getParam("id"));
		rc.response().setStatusCode(200).end();
	});
});
...
vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port")); // (9)
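The "first matching route" rule can be illustrated with a much smaller model than Vert.x-Web itself. The plain-Java sketch below is a simplification under my own assumptions: TinyRouter is a hypothetical class that supports only an HTTP method, literal path segments and :name parameters, and it leaves out wildcards, content negotiation and handler chaining.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of first-match routing; not Vert.x-Web code.
final class TinyRouter {
    private static final class Route {
        final String method;
        final String[] segments;
        final String name;

        Route(String method, String pattern, String name) {
            this.method = method;
            this.segments = pattern.split("/");
            this.name = name;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    TinyRouter add(String method, String pattern, String name) {
        routes.add(new Route(method, pattern, name));
        return this;
    }

    // Returns the name of the first route that matches, filling params with captured values, or null.
    String match(String method, String path, Map<String, String> params) {
        String[] parts = path.split("/");
        for (Route route : routes) {
            if (!route.method.equals(method) || route.segments.length != parts.length) {
                continue;
            }
            Map<String, String> captured = new LinkedHashMap<>();
            boolean matches = true;
            for (int i = 0; i < parts.length && matches; i++) {
                if (route.segments[i].startsWith(":")) {
                    captured.put(route.segments[i].substring(1), parts[i]); // path parameter
                } else {
                    matches = route.segments[i].equals(parts[i]);           // literal segment
                }
            }
            if (matches) {
                params.putAll(captured);
                return route.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        TinyRouter router = new TinyRouter()
                .add("GET", "/account/:id", "findById")
                .add("GET", "/account/customer/:customer", "findByCustomer")
                .add("GET", "/account", "findAll");
        Map<String, String> params = new LinkedHashMap<>();
        System.out.println(router.match("GET", "/account/42", params) + " " + params);
    }
}
```

Routes are tried in registration order, so when two patterns could both match a request, the one added first wins.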

All API methods use the repository object to communicate with the data source. In this case I decided to use Mongo. Vert.x has a module for interacting with that database, which we need to include as a new dependency.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-mongo-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

The Mongo client, like all other Vert.x modules, works asynchronously. That’s why we need to use an AsyncResult Handler to pass the result from the repository object. To be able to pass a custom object as AsyncResult we have to annotate it with @DataObject and add a toJson method.

public AccountRepositoryImpl(final MongoClient client) {
	this.client = client;
}

@Override
public AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler) {
	JsonObject json = JsonObject.mapFrom(account);
	client.save(Account.DB_TABLE, json, res -> {
		if (res.succeeded()) {
			LOGGER.info("Account created: {}", res.result());
			account.setId(res.result());
			resultHandler.handle(Future.succeededFuture(account));
		} else {
			LOGGER.error("Account not created", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}

@Override
public AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler) {
	client.find(Account.DB_TABLE, new JsonObject(), res -> {
		if (res.succeeded()) {
			List<Account> accounts = res.result().stream()
				.map(it -> new Account(it.getString("_id"), it.getString("number"), it.getInteger("balance"), it.getString("customerId")))
				.collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		} else {
			LOGGER.error("Account not found", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}

Here’s the Account model class.

@DataObject
public class Account {

	public static final String DB_TABLE = "account";

	private String id;
	private String number;
	private int balance;
	private String customerId;

	public Account() {

	}

	public Account(String id, String number, int balance, String customerId) {
		this.id = id;
		this.number = number;
		this.balance = balance;
		this.customerId = customerId;
	}

	public Account(JsonObject json) {
		this.id = json.getString("id");
		this.number = json.getString("number");
		this.balance = json.getInteger("balance");
		this.customerId = json.getString("customerId");
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getNumber() {
		return number;
	}

	public void setNumber(String number) {
		this.number = number;
	}

	public int getBalance() {
		return balance;
	}

	public void setBalance(int balance) {
		this.balance = balance;
	}

	public String getCustomerId() {
		return customerId;
	}

	public void setCustomerId(String customerId) {
		this.customerId = customerId;
	}

	public JsonObject toJson() {
		return JsonObject.mapFrom(this);
	}

	@Override
	public String toString() {
		return Json.encodePrettily(this);
	}

}

Verticles

It is worth saying a few words about running an application written in Vert.x. It is based on verticles. Verticles are chunks of code that get deployed and run by Vert.x. A Vert.x instance maintains N event loop threads, where N defaults to twice the number of available cores. When creating a verticle, we have to extend the abstract class AbstractVerticle.

public class AccountServer extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		...
	}
}

I created two verticles per microservice: the first for the HTTP server and the second for communication with Mongo. Here’s the main application method, where I deploy the verticles.

public static void main(String[] args) throws Exception {
	Vertx vertx = Vertx.vertx();
	vertx.deployVerticle(new MongoVerticle());
	vertx.deployVerticle(new AccountServer());
}

Now we need to obtain, inside the AccountServer verticle, a reference to the service running on MongoVerticle. To achieve this, we have to generate proxy classes using the vertx-codegen module.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-proxy</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-codegen</artifactId>
	<version>${vertx.version}</version>
	<scope>provided</scope>
</dependency>

First, annotate the repository interface with @ProxyGen and all public methods with @Fluent.

@ProxyGen
public interface AccountRepository {

	@Fluent
	AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository findById(String id, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findByCustomer(String customerId, Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository remove(String id, Handler<AsyncResult<Void>> resultHandler);

	static AccountRepository createProxy(Vertx vertx, String address) {
		return new AccountRepositoryVertxEBProxy(vertx, address);
	}

	static AccountRepository create(MongoClient client) {
		return new AccountRepositoryImpl(client);
	}

}

The generator needs additional configuration inside the pom.xml file. After running the command mvn clean install on the parent project, all generated classes should be available under the src/main/generated directory of every microservice module.

<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-compiler-plugin</artifactId>
	<version>3.6.2</version>
	<configuration>
		<encoding>${project.build.sourceEncoding}</encoding>
		<source>${java.version}</source>
		<target>${java.version}</target>
		<useIncrementalCompilation>false</useIncrementalCompilation>
		<annotationProcessors>
			<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
		</annotationProcessors>
		<generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
		<compilerArgs>
			<arg>-AoutputDirectory=${project.basedir}/src/main</arg>
		</compilerArgs>
	</configuration>
</plugin>

Now we can obtain an AccountRepository reference by calling createProxy with the account-service address.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");
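
To give an intuition for what such a proxy does, here is a deliberately simplified, synchronous sketch that uses only the JDK. It is not the code produced by vertx-codegen: MiniBus, AccountLookup and ProxySketch are hypothetical names invented for this illustration, and the real proxy sends a message over the Vert.x event bus and delivers the reply asynchronously to the Handler. The only idea it tries to show is that a call on the proxy is turned into a message (method name plus arguments) sent to an address where the real service is listening.

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

class ProxySketch {

    // Returns an object implementing the given interface; every method call
    // is forwarded to the bus as (address, method name, arguments).
    @SuppressWarnings("unchecked")
    static <T> T createProxy(MiniBus bus, String address, Class<T> type) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> bus.request(address, method.getName(), args));
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        // "Service side": stands in for the verticle that registers the real implementation.
        bus.consumer("account-service", (action, callArgs) -> action + ":" + callArgs[0]);
        // "Client side": stands in for the verticle that only holds a proxy.
        AccountLookup lookup = createProxy(bus, "account-service", AccountLookup.class);
        System.out.println(lookup.findNumber("1"));
    }
}

// Hypothetical stand-in for the event bus: maps an address to a handler
// that receives the action (method name) and the call arguments.
class MiniBus {
    private final Map<String, BiFunction<String, Object[], Object>> consumers = new ConcurrentHashMap<>();

    void consumer(String address, BiFunction<String, Object[], Object> handler) {
        consumers.put(address, handler);
    }

    Object request(String address, String action, Object[] args) {
        BiFunction<String, Object[], Object> handler = consumers.get(address);
        if (handler == null) {
            throw new IllegalStateException("No consumer registered at " + address);
        }
        return handler.apply(action, args);
    }
}

// Hypothetical service interface, much smaller than AccountRepository.
interface AccountLookup {
    String findNumber(String id);
}
```

The caller depends only on the interface and the address, which is why AccountServer does not need a direct reference to the object created inside MongoVerticle.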

Service Discovery

To use Vert.x service discovery, we have to add the following dependencies to pom.xml. The first one provides the built-in Vert.x discovery mechanisms, which are not very useful if we want to invoke microservices running on different hosts. Fortunately, some additional bridges are also available, for example the Consul bridge.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery-bridge-consul</artifactId>
	<version>${vertx.version}</version>
</dependency>

Great, we only have to declare the service discovery and register a service importer. Now we can retrieve service data from Consul, but I assume we would also like to register our service. Unfortunately, this is where the problems start. As the toolkit authors say, “It (Vert.x) does not export to Consul and does not support service modification.” Maybe somebody can explain to me why this library cannot also export data to Consul; I just do not understand it. I had the same problem with Apache Camel some months ago, and I will use the same solution I developed then. Fortunately, Consul has a simple API for service registration and deregistration. To use it in our application, we need to add the Vert.x web client to our dependencies.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

Then, using the declared WebClient, we can register the service while starting the application by calling Consul’s registration endpoint with the PUT method.

WebClient client = WebClient.create(vertx);
...
JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
	if (res.succeeded()) {
		LOGGER.info("Consul registration status: {}", res.result().statusCode());
	} else {
		// res.result() is null on failure, so log the cause instead
		LOGGER.error("Consul registration failed", res.cause());
	}
});
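
Consul’s agent API also offers a deregistration call, which was mentioned earlier but is not shown in the sample. As a rough sketch only, assuming the agent’s /v1/agent/service/deregister/{service-id} endpoint, the request could be built as follows. It uses the JDK’s java.net.http client (Java 11 or later) rather than the Vert.x WebClient so that it runs without extra dependencies. The ConsulDeregistration class and its method names are hypothetical, and the host, port and service ID are simply the values used elsewhere in this article.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ConsulDeregistration {

    // Builds the PUT request for the agent's deregistration endpoint.
    static HttpRequest deregisterRequest(String host, int port, String serviceId) {
        URI uri = URI.create("http://" + host + ":" + port
                + "/v1/agent/service/deregister/" + serviceId);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request and returns the HTTP status code reported by the agent.
    static int deregister(String host, int port, String serviceId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<Void> response = client.send(
                deregisterRequest(host, port, serviceId),
                HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) {
        // Only builds and prints the request; call deregister(...) against a running agent.
        HttpRequest request = deregisterRequest("192.168.99.100", 8500, "account-service-1");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

In the Vert.x application the same call would more naturally go through the WebClient that performs the registration, for example when the verticle is stopped.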

Once account-service has registered itself with the discovery server, we can invoke it from another microservice, in this case from customer-service. We only have to create a ServiceDiscovery object and register the Consul service importer.

ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
...
discovery.registerServiceImporter(new ConsulServiceImporter(), new JsonObject().put("host", discoveryConfig.getString("host")).put("port", discoveryConfig.getInteger("port")).put("scan-period", 2000));

Here’s the AccountClient fragment responsible for invoking GET /account/customer/{customerId} on account-service. It obtains a service reference from the discovery object and gets it as a WebClient instance. You may have noticed that, apart from standard fields such as ID, Name or Port, I also set the Tags field to the type of the service being registered, in this case http-endpoint. Thanks to that, whenever Vert.x reads data from Consul, it can automatically map the service reference to a WebClient object.

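public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List<Account>>> resultHandler) {
	discovery.getRecord(r -> r.getName().equals("account-service"), res -> {
		if (res.failed() || res.result() == null) { // lookup failed or no matching record
			resultHandler.handle(Future.failedFuture(res.failed() ? res.cause() : new IllegalStateException("account-service not found")));
			return;
		}
		LOGGER.info("Result: {}", res.result().getType());
		ServiceReference ref = discovery.getReference(res.result());
		WebClient client = ref.getAs(WebClient.class);
		client.get("/account/customer/" + customerId).send(res2 -> {
			ref.release(); // release the service reference once the call has completed
			if (res2.failed()) {
				resultHandler.handle(Future.failedFuture(res2.cause()));
				return;
			}
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List<Account> accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
	});
	return this;
}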

Config

The Vert.x Config module is responsible for configuration management within the application.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-config</artifactId>
	<version>${vertx.version}</version>
</dependency>

There are many configuration stores that can be used as the location of configuration data:

  • File
  • Environment Variables
  • HTTP
  • Event Bus
  • Git
  • Redis
  • Consul
  • Kubernetes
  • Spring Cloud Config Server

I selected the simplest one, a file. It can easily be changed just by setting another type on the ConfigStoreOptions object. ConfigRetriever is responsible for loading configuration data from the store. It reads the configuration as a JsonObject.

ConfigStoreOptions file = new ConfigStoreOptions().setType("file").setConfig(new JsonObject().put("path", "application.json"));
ConfigRetriever retriever = ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(file));
retriever.getConfig(conf -> {
	JsonObject discoveryConfig = conf.result().getJsonObject("discovery");
	vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port"));
	JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
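	client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
		if (res.succeeded()) {
			LOGGER.info("Consul registration status: {}", res.result().statusCode());
		} else {
			// res.result() is null on failure, so log the cause instead
			LOGGER.error("Consul registration failed", res.cause());
		}
	});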
});

The configuration file application.json is available under src/main/resources and contains the application port and the service discovery and data source addresses.

{
	"port" : 2222,
	"discovery" : {
		"host" : "192.168.99.100",
		"port" : 8500
	},
	"datasource" : {
		"host" : "192.168.99.100",
		"port" : 27017,
		"db_name" : "test"
	}
}

Final thoughts

The Vert.x authors prefer to describe their solution as a toolkit rather than a framework. They don’t tell you what the correct way to write an application is; they only give you a lot of useful building blocks that help you create your app. With Vert.x you can create fast and lightweight APIs based on non-blocking, asynchronous I/O. It gives you a lot of possibilities, as the Config module example shows, where you can even use Spring Cloud Config Server as a configuration store. But it is not free from drawbacks, as I showed in the example of service registration with Consul. Vert.x also allows you to create reactive microservices with RxJava, which seems to be an interesting option that I hope to describe in the future.