Deploying Spring Cloud Microservices on HashiCorp's Nomad

Nomad is a less popular HashiCorp product than Consul, Terraform, or Vault. It is also not as popular as competing software like Kubernetes and Docker Swarm. However, it has its advantages. While Kubernetes is specifically focused on Docker, Nomad is more general purpose: it supports containerized Docker applications as well as simple applications delivered as executable JAR files. Besides that, Nomad is architecturally much simpler. It is a single binary, both for clients and servers, and does not require any external services for coordination or storage.

In this article I'm going to show you how to install, configure, and use Nomad in order to run microservices created with the Spring Boot and Spring Cloud frameworks. Let's move on.

Step 1. Installing and running Nomad

HashiCorp's Nomad can be easily started on Windows. You just have to download it from https://www.nomadproject.io/downloads.html and then add the nomad.exe file to your PATH. Now you are able to run Nomad commands from your command line. Let's begin by starting the Nomad agent. For simplicity, we will run it in development mode (-dev). With this option it acts as both a client and a server. Here's the command that starts the Nomad agent on my local machine.

nomad agent -dev -network-interface="WiFi" -consul-address=192.168.99.100:8500

Sometimes you may be required to pass the selected network interface as a parameter. We also need to integrate the agent node with Consul discovery for the purpose of inter-service communication, discussed in the next part of this article. The most suitable way to run Consul on your local machine is through a Docker container. Here's the command that launches a single-node Consul discovery server and exposes it on port 8500. If you run Docker on Windows, it is probably available under the address 192.168.99.100.

docker run -d --name consul -p 8500:8500 consul

Step 2. Creating a job

Nomad is a tool for managing a cluster of machines and running applications on them. To run an application there, we should first create a job. A job is the primary configuration unit that users interact with when using Nomad: it is a specification of tasks that should be run by Nomad. A job consists of multiple groups, and each group may have multiple tasks.

There are some properties that have to be provided, for example datacenters. You should also set the type parameter, which indicates the scheduler type. I set the type to service, which is designed for scheduling long-lived services that should never go down, like an application exposing an HTTP API.

Let's take a look at Nomad's job descriptor file. The most important elements of that configuration have been marked with sequence numbers:

  1. The count property specifies the number of instances of the task group that should be running. In practice it scales up the number of instances of the service started by the task. Here, it has been set to 2.
  2. The driver property specifies the driver that should be used by Nomad clients to run the task. The driver name corresponds to the technology used for running the application. For example, we can set docker or rkt for containerization solutions, or java for executing Java applications packaged into a JAR file. Here, the property has been set to java.
  3. After setting the driver we should provide some configuration for it in the job spec. There are several options available for the java driver; I decided to set the absolute path to the JAR file and some JVM options related to the memory limits.
  4. We may set some requirements for the task, including memory, network, CPU, and more. Our task requires a maximum of 300 MB of RAM and enables dynamic port allocation for the port labeled "http".
  5. Now it is worth pointing out a very important thing. When the task is started, it is passed an additional environment variable named NOMAD_HOST_PORT_http, which indicates the host port that the HTTP service is bound to. The http suffix relates to the label set for the port.
  6. The service block inside the task specifies the integration with Consul for service discovery. Nomad automatically registers a task with the provided name when it is started and deregisters it when the task dies. As you probably remember, the port number is generated automatically by Nomad; however, I passed the label http to force Nomad to register the service in Consul with the automatically generated port.

job "caller-service" {
	datacenters = ["dc1"]
	type = "service"
	group "caller" {
		count = 2 # (1)
		task "api" {
			driver = "java" # (2)
			config { # (3)
				jar_path    = "C:\\Users\\minkowp\\git\\sample-nomad-java-services\\caller-service\\target\\caller-service-1.0.0-SNAPSHOT.jar"
				jvm_options = ["-Xmx256m", "-Xms128m"]
			}
			resources { # (4)
				cpu    = 500
				memory = 300
				network {
					port "http" {} # (5)
				}
			}
			service { # (6)
				name = "caller-service"
				port = "http"
			}
		}
		restart {
			attempts = 1
		}
	}
}

Once we have saved the content visible above as a job.nomad file, we may apply it to the Nomad node by executing the following command.

nomad job run job.nomad
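
You may then verify that both planned task instances have been allocated by checking the job status from the command line:

nomad status caller-service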

Step 3. Building sample microservices

The source code of the sample applications is available on GitHub in my repository sample-nomad-java-services. There are two simple microservices: callme-service and caller-service. I have already used these samples in previous articles to illustrate inter-service communication mechanisms. Microservice callme-service does nothing more than expose the GET /callme/ping endpoint, which displays the service's name and version.

@RestController
@RequestMapping("/callme")
public class CallmeController {

	private static final Logger LOGGER = LoggerFactory.getLogger(CallmeController.class);

	@Autowired
	BuildProperties buildProperties;

	@GetMapping("/ping")
	public String ping() {
		LOGGER.info("Ping: name={}, version={}", buildProperties.getName(), buildProperties.getVersion());
		return buildProperties.getName() + ":" + buildProperties.getVersion();
	}

}

The implementation of the caller-service endpoint is a little bit more complicated. First we have to connect our service to Consul in order to fetch the list of registered instances of callme-service. Because we use Spring Boot for creating the sample microservices, the most suitable way to enable a Consul client is through the Spring Cloud Consul library.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>

We should override the auto-configured connection settings in application.yml. In addition to the host and port, we have also set the spring.cloud.consul.discovery.register property to false. We don't want the discovery client to register the application in Consul after startup, because that has already been performed by Nomad.

spring:
  application:
    name: caller-service
  cloud:
    consul:
      host: 192.168.99.100
      port: 8500
      discovery:
        register: false

Then we should enable the Spring Cloud discovery client and the RestTemplate load balancer in the main class of the application.

@SpringBootApplication
@EnableDiscoveryClient
public class CallerApplication {

	public static void main(String[] args) {
		SpringApplication.run(CallerApplication.class, args);
	}

	@Bean
	@LoadBalanced
	RestTemplate restTemplate() {
		return new RestTemplate();
	}

}

Finally, we can implement the GET /caller/ping method, which calls the endpoint exposed by callme-service.

@RestController
@RequestMapping("/caller")
public class CallerController {

	private static final Logger LOGGER = LoggerFactory.getLogger(CallerController.class);

	@Autowired
	BuildProperties buildProperties;
	@Autowired
	RestTemplate restTemplate;

	@GetMapping("/ping")
	public String ping() {
		LOGGER.info("Ping: name={}, version={}", buildProperties.getName(), buildProperties.getVersion());
		String response = restTemplate.getForObject("http://callme-service/callme/ping", String.class);
		LOGGER.info("Calling: response={}", response);
		return buildProperties.getName() + ":" + buildProperties.getVersion() + ". Calling... " + response;
	}

}

As you probably remember, the application's port is automatically generated by Nomad during task execution. Nomad passes an additional environment variable named NOMAD_HOST_PORT_http to the application. Now, this environment variable should be configured inside the application.yml file as the value of the server.port property.

server:
  port: ${NOMAD_HOST_PORT_http:8090}

The last step is to build the whole sample-nomad-java-services project with the mvn clean install command.

Step 4. Using Nomad web console

During the two previous steps we have created, built, and deployed our sample applications on Nomad. Now we should verify the installation. We can do it using the CLI or by visiting the web console provided by Nomad, which is available under the address http://localhost:4646.

On the main page of the web console we may see the summary of existing jobs. If everything goes fine, the Status field is equal to RUNNING and the Summary bar is green.

nomad-1

We can display the details of every job in the list. The next screen shows the history of the job, reserved resources and number of running instances (tasks).

nomad-2

If you would like to check out the details related to a single task, you should navigate to the Task Group details.

nomad-3

We may also display the details related to the client node.

nomad-4

To display the details of an allocation, select its row in the table. You will be redirected to the following page, where you may check out the IP address of the application instance.

nomad-5

Step 5. Testing a sample system

Assuming you have successfully deployed the applications on Nomad, you should see the following services registered in Consul.

nomad-6

Now, if you call one of the two available instances of caller-service, you should see the following response. The address of the callme-service instance has been successfully fetched from Consul through the Spring Cloud Consul client.

nomad-7
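
The same test may be performed from the command line with curl. The node address and the dynamically assigned port below are placeholders; take the real values from Consul or from the Nomad web console. The response format follows from the controller code shown earlier.

$ curl http://<node-ip>:<dynamic-port>/caller/ping
caller-service:1.0.0-SNAPSHOT. Calling... callme-service:1.0.0-SNAPSHOT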

Service Mesh with Istio on Kubernetes in 5 steps

In this article I'm going to show you some basic and more advanced samples that illustrate how to use the Istio platform to provide communication between microservices deployed on Kubernetes. Following the description on the Istio website, it is:

An open platform to connect, manage, and secure microservices. Istio provides an easy way to create a network of deployed services with load balancing, service-to-service authentication, monitoring, and more, without requiring any changes in service code.

Istio provides mechanisms for traffic management like request routing, discovery, load balancing, failure handling, and fault injection. Additionally, you may enable istio-auth, which provides RBAC (Role-Based Access Control) and mutual TLS authentication. In this article we will discuss only the traffic management mechanisms.

Step 1. Installing Istio on Minikube platform

The most comfortable way to test Istio locally on Kubernetes is through Minikube. I have already described how to configure Minikube on your local machine in this article: Microservices with Kubernetes and Docker. When installing Istio on Minikube, we should first pass some extra configuration options during startup.

minikube start --extra-config=controller-manager.ClusterSigningCertFile="/var/lib/localkube/certs/ca.crt" --extra-config=controller-manager.ClusterSigningKeyFile="/var/lib/localkube/certs/ca.key" --extra-config=apiserver.Admission.PluginNames=NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota

Istio is installed in a dedicated namespace called istio-system, but it is able to manage services from all other namespaces. First, you should go to the release page and download the installation file corresponding to your OS. For me it is Windows, and all the next steps will be described with the assumption that we are using exactly this OS. After running Minikube, it is useful to point your Docker client at the Docker daemon on Minikube's VM. Thanks to that, you will be able to execute docker commands against it.

@FOR /f "tokens=* delims=^L" %i IN ('minikube docker-env') DO @call %i

Now, extract the Istio files to your local filesystem. The istioctl.exe file, which is available in the ${ISTIO_HOME}/bin directory, should be added to your PATH. Istio contains some installation files for the Kubernetes platform in ${ISTIO_HOME}/install/kubernetes. To install Istio's core components on Minikube, just apply the following YAML definition file.

kubectl apply -f install/kubernetes/istio.yaml

Now, you have Istio’s core components deployed on your Minikube instance. These components are:

Envoy – it is an open-source edge and service proxy, designed for cloud-native applications. Istio uses an extended version of the Envoy proxy. If you are interested in some details about Envoy and microservices, read my article Envoy Proxy with Microservices, which describes how to integrate an Envoy gateway with service discovery.

Mixer – it is a platform-independent component responsible for enforcing access control and usage policies across the service mesh.

Pilot – it provides service discovery for the Envoy sidecars, traffic management capabilities for intelligent routing and resiliency.

The configuration provided inside the istio.yaml definition file deploys some pods and services related to the components mentioned above. You can verify the installation using the kubectl command, or just by visiting the web dashboard available after executing the minikube dashboard command.
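
For example, a quick verification from the command line may look like this; the exact list of pods depends on the Istio version you have installed.

kubectl get pods -n istio-system
kubectl get svc -n istio-system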

istio-2

Step 2. Building sample applications based on Spring Boot

Before we start configuring any traffic rules with Istio, we need to create sample applications that will communicate with each other. These are really simple services. The source code of these applications is available on my GitHub account inside the repository sample-istio-services. There are two services: caller-service and callme-service. Both of them expose a ping endpoint, which prints the application's name and version. Both of these values are taken from the Spring Boot build-info file, which is generated during the application build. Here's the implementation of the GET /callme/ping endpoint.

@RestController
@RequestMapping("/callme")
public class CallmeController {

	private static final Logger LOGGER = LoggerFactory.getLogger(CallmeController.class);

	@Autowired
	BuildProperties buildProperties;

	@GetMapping("/ping")
	public String ping() {
		LOGGER.info("Ping: name={}, version={}", buildProperties.getName(), buildProperties.getVersion());
		return buildProperties.getName() + ":" + buildProperties.getVersion();
	}

}

And here's the implementation of the GET /caller/ping endpoint. It calls the GET /callme/ping endpoint using Spring RestTemplate. We are assuming that callme-service is available under the address callme-service:8091 on Kubernetes. This service will be exposed inside the Minikube node under port 8091.

@RestController
@RequestMapping("/caller")
public class CallerController {

	private static final Logger LOGGER = LoggerFactory.getLogger(CallerController.class);

	@Autowired
	BuildProperties buildProperties;
	@Autowired
	RestTemplate restTemplate;

	@GetMapping("/ping")
	public String ping() {
		LOGGER.info("Ping: name={}, version={}", buildProperties.getName(), buildProperties.getVersion());
		String response = restTemplate.getForObject("http://callme-service:8091/callme/ping", String.class);
		LOGGER.info("Calling: response={}", response);
		return buildProperties.getName() + ":" + buildProperties.getVersion() + ". Calling... " + response;
	}

}

The sample applications have to be started in Docker containers. Here's the Dockerfile that is responsible for building the image with the caller-service application.

FROM openjdk:8-jre-alpine
ENV APP_FILE caller-service-1.0.0-SNAPSHOT.jar
ENV APP_HOME /usr/app
EXPOSE 8090
COPY target/$APP_FILE $APP_HOME/
WORKDIR $APP_HOME
ENTRYPOINT ["sh", "-c"]
CMD ["exec java -jar $APP_FILE"]

A similar Dockerfile is available for callme-service. Now, the only thing we have to do is build the Docker images.

docker build -t piomin/callme-service:1.0 .
docker build -t piomin/caller-service:1.0 .

There is also version 2.0.0-SNAPSHOT of callme-service available in the v2 branch. Switch to this branch, build the whole application, and then build the Docker image with the 2.0 tag. Why do we need version 2.0? I'll describe it in the next section.

docker build -t piomin/callme-service:2.0 .

Step 3. Deploying sample applications on Minikube

Before we start deploying our applications on Minikube, let's take a look at the sample system architecture visible on the following diagram. We are going to deploy callme-service in two versions: 1.0 and 2.0. The caller-service application just calls callme-service, so it does not know anything about the different versions of the target service. If we would like to route traffic between the two versions of callme-service in proportions of 20% to 80%, we have to configure the proper Istio route rule. And one more thing: because Istio Ingress is not supported on Minikube, we will just use a Kubernetes Service. If we need to expose it outside the Minikube cluster, we should set its type to NodePort.

istio-1

Let’s proceed to the deployment phase. Here’s deployment definition for callme-service in version 1.0.

apiVersion: v1
kind: Service
metadata:
  name: callme-service
  labels:
    app: callme-service
spec:
  type: NodePort
  ports:
  - port: 8091
    name: http
  selector:
    app: callme-service
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: callme-service
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: callme-service
        version: v1
    spec:
      containers:
      - name: callme-service
        image: piomin/callme-service:1.0
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 8091

Before deploying it on Minikube, we have to inject some Istio properties. The command visible below prints a new version of the deployment definition enriched with Istio configuration. We may copy it and save it as a deployment-with-istio.yaml file.

istioctl kube-inject -f deployment.yaml

Now, let’s apply the configuration to Kubernetes.

kubectl apply -f deployment-with-istio.yaml
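
Alternatively, the injection and deployment may be combined into a single pipeline. The following one-liner is equivalent to the two commands above:

istioctl kube-inject -f deployment.yaml | kubectl apply -f -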

The same steps should be performed for caller-service, and also for version 2.0 of callme-service. All the YAML configuration files are committed together with the applications, and are located in the root directory of every application's module. If you have successfully deployed all the required components, you should see the following elements in your Minikube dashboard.

istio-3

Step 4. Applying Istio routing rules

Istio provides a simple domain-specific language (DSL) that allows you to configure some interesting rules that control how requests are routed within your service mesh. I'm going to show you the following rules:

  • Splitting traffic between different service versions
  • Injecting a delay into the request path
  • Injecting an HTTP error as a response from a service

Here's a sample route rule definition for callme-service. It splits traffic in proportions of 20:80 between versions 1.0 and 2.0 of the service. It also adds a 3-second delay to 10% of the requests, and returns an HTTP 500 error code for 10% of the requests.

apiVersion: config.istio.io/v1alpha2
kind: RouteRule
metadata:
  name: callme-service
spec:
  destination:
    name: callme-service
  route:
  - labels:
      version: v1
    weight: 20
  - labels:
      version: v2
    weight: 80
  httpFault:
    delay:
      percent: 10
      fixedDelay: 3s
    abort:
      percent: 10
      httpStatus: 500

Let’s apply a new route rule to Kubernetes.

kubectl apply -f routerule.yaml

Now, we can easily verify that rule by executing the istioctl get routerule command.

istio-6

Step 5. Testing the solution

Before we start testing, let's deploy Zipkin on Minikube. Istio provides the zipkin.yaml deployment definition file inside the ${ISTIO_HOME}/install/kubernetes/addons directory.

kubectl apply -f zipkin.yaml

Let's take a look at the list of services deployed on Minikube. The API provided by the caller-service application is available under port 30873.

istio-4

We may easily test the service from a web browser by calling the URL http://192.168.99.100:30873/caller/ping. It prints the name and version of the service, as well as the name and version of the callme-service instance invoked by caller-service. Because 80% of the traffic is routed to version 2.0 of callme-service, you will probably see the following response.

istio-7

However, sometimes version 1.0 of callme-service may be called…

istio-8

… or Istio can simulate an HTTP 500 response code.

istio-9

You can easily analyze traffic statistics with the Zipkin console.

istio-10

Or just take a look at the logs generated by the pods.

istio-11

Apache Ignite Cluster together with Spring Boot

I have already introduced Apache Ignite in one of my previous articles: In-memory data grid with Apache Ignite. Apache Ignite can be easily launched locally together with a Spring Boot application. The only thing we have to do is include the org.apache.ignite:ignite-spring-data artifact in the project dependencies and then declare an Ignite instance @Bean. A sample @Bean declaration is visible below.

@Bean
public Ignite igniteInstance() {
	IgniteConfiguration cfg = new IgniteConfiguration();
	cfg.setIgniteInstanceName("ignite-cluster-node");
	CacheConfiguration ccfg1 = new CacheConfiguration("PersonCache");
	ccfg1.setIndexedTypes(Long.class, Person.class);
	CacheConfiguration ccfg2 = new CacheConfiguration("ContactCache");
	ccfg2.setIndexedTypes(Long.class, Contact.class);
	cfg.setCacheConfiguration(ccfg1, ccfg2);
	IgniteLogger log = new Slf4jLogger();
	cfg.setGridLogger(log);
	return Ignition.start(cfg);
}

In this article I would like to show you a slightly more advanced sample, in which we will start multiple Ignite nodes inside a cluster, Ignite's web console for monitoring the cluster, and Ignite's web agent for providing communication between the cluster's nodes and the web console. Let's begin by looking at the picture with the architecture of our sample solution.

ignite-2-1

We have three nodes that are part of the cluster. If you take a careful look at the picture illustrating the architecture, you have probably noticed that there are two nodes called Server Node and one called Client Node. By default, all Ignite nodes are started as server nodes; client mode needs to be explicitly enabled. Server nodes participate in caching, compute execution, and stream processing, while client nodes provide the ability to connect to the servers remotely. However, client nodes allow using the whole set of Ignite APIs, including near caching, transactions, compute, and streaming.

Here's the Ignite client instance @Bean declaration.

@Bean
public Ignite igniteInstance() {
	IgniteConfiguration cfg = new IgniteConfiguration();
	cfg.setIgniteInstanceName("ignite-cluster-node");
	cfg.setClientMode(true);
	CacheConfiguration ccfg1 = new CacheConfiguration("PersonCache");
	ccfg1.setIndexedTypes(Long.class, Person.class);
	CacheConfiguration ccfg2 = new CacheConfiguration("ContactCache");
	ccfg2.setIndexedTypes(Long.class, Contact.class);
	cfg.setCacheConfiguration(ccfg1, ccfg2);
	return Ignition.start(cfg);
}

The fact is that we don't have to do anything more to make our nodes work together within the cluster. Every new node is automatically detected by all other cluster nodes using multicast communication. When starting our sample application, we only have to guarantee that each instance's server listens on a different port by overriding the server.port Spring Boot property. Here's the command that starts the sample application, which is available on GitHub (https://github.com/piomin/sample-ignite-jpa.git) under the cluster branch (https://github.com/piomin/sample-ignite-jpa/tree/cluster). Each node exposes the same REST API, which may be easily tested using Swagger2 just by opening its dashboard, available under the address http://localhost:port/swagger-ui.html.

java -jar -Dserver.port=8901 -Xms512m -Xmx1024m -XX:+UseG1GC -XX:+DisableExplicitGC -XX:MaxDirectMemorySize=256m target/ignite-rest-service-1.0-SNAPSHOT.jar

If you have successfully started a new node, you should see information similar to the following in your application logs.

>>> +----------------------------------------------------------------------+
>>> Ignite ver. 2.4.0#20180305-sha1:aa342270b13cc1f4713382a8eb23b2eb7edaa3a5
>>> +----------------------------------------------------------------------+
>>> OS name: Windows 10 10.0 amd64
>>> CPU(s): 4
>>> Heap: 1.0GB
>>> VM name: 14132@piomin
>>> Ignite instance name: ignite-cluster-node
>>> Local node [ID=9DB1296A-7EEC-4564-BAAD-14E5D4A3A08D, order=2, clientMode=false]
>>> Local node addresses: [piomin/0:0:0:0:0:0:0:1, piomin/127.0.0.1, piomin/192.168.1.102, piomin/192.168.116.1, /192.168.226.1, /192.168.99.1]
>>> Local ports: TCP:8082 TCP:10801 TCP:11212 TCP:47101 UDP:47400 TCP:47501

Let's move back for a moment to the source code of our sample application. I assume you have already cloned the given repository from GitHub. There are two Maven modules available. The ignite-rest-service module is responsible for starting an Ignite cluster node in server mode, while ignite-client-service starts a node in client mode. Because we run only a single instance of the client node, we do not override its default port set inside the application.yml file. You can build the project using the mvn clean install command and then start it with java -jar, or just run the main class IgniteClientApplication from your IDE.

There is also a JUnit test class inside the ignite-client-service module, which defines one test responsible for calling HTTP endpoints (POST /person, POST /contact) that put data into Ignite's cache. This test performs two operations: it puts some data into Ignite's in-memory cluster by calling endpoints exposed by the client node, and then checks whether that data has been propagated through the cluster by calling the GET /person/{id}/withContacts endpoint exposed by one of the server nodes.

public class TestCluster {

	TestRestTemplate template = new TestRestTemplate();
	Random r = new Random();
	int[] clusterPorts = new int[] {8901, 8902};

	@Test
	public void testCluster() throws InterruptedException {
		for (int i=0; i<1000; i++) {
			Person p = template.postForObject("http://localhost:8090/person", createPerson(), Person.class);
			Assert.notNull(p, "Create person failed");
			Contact c1 = template.postForObject("http://localhost:8090/contact", createContact(p.getId(), 0), Contact.class);
			Assert.notNull(c1, "Create contact failed");
			Contact c2 = template.postForObject("http://localhost:8090/contact", createContact(p.getId(), 1), Contact.class);
			Assert.notNull(c2, "Create contact failed");
			Thread.sleep(10);
			Person result = template.getForObject("http://localhost:{port}/person/{id}/withContacts", Person.class, clusterPorts[r.nextInt(2)], p.getId());
			Assert.notNull(result, "Person not found");
			Assert.notEmpty(result.getContacts(), "Contacts not found");
		}
	}

	private Contact createContact(Long personId, int index) {
		...
	}

	private Person createPerson() {
		...
	}

}

Before running any tests, we should launch two additional elements that are part of our architecture: Ignite's web console and web agent. The most suitable way to run Ignite's web console on the local machine is through its Docker image apacheignite/web-console-standalone. Here's the Docker command that starts Ignite's web console and exposes it on port 80. Because I run Docker on Windows, it is available under the default VM address http://192.168.99.100/.

docker run -d -p 80:80 -p 3001:3001 -v /var/data:/var/lib/mongodb --name ignite-web-console apacheignite/web-console-standalone

In order to access it you should first register a user. Although a mail server is not available in the Docker container, you will be logged in afterwards. You can configure your cluster using Ignite's web console, and also run some SQL queries on that cluster. Of course, we still need to connect our cluster consisting of three nodes with the web console instance started in the Docker container. To achieve this, you have to download the web agent. It is probably not very intuitive, but you have to click the Start Demo button, which is located in the top right corner of Ignite's web console. Then you will be redirected to the download page, where you can download the ignite-web-agent-2.4.0.zip file, which contains all the libraries and configuration needed to start the web agent locally.

ignite-2-2

After downloading and unpacking the web agent, go to its main directory and change the server-uri property to http://192.168.99.100 inside the default.properties file. Then you may run the ignite-web-agent.bat script (or .sh if you are testing it on Linux), which starts the web agent. Unfortunately, that's not all that has to be done. Every server node's application should include the ignite-rest-http artifact in order to be able to communicate with the agent. It is responsible for exposing the HTTP endpoint that is accessed by the web agent. It is based on the Jetty server, which causes some problems in conjunction with Spring Boot: Spring Boot sets default versions of the Jetty libraries used inside the project, but ignite-rest-http requires older versions of those libraries, so we also have to override some of the default managed versions in the pom.xml file according to the sample visible below.

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-http</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-server</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-io</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-continuation</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-util</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-xml</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
	</dependencies>
</dependencyManagement>

After implementing the changes described above, we may finally proceed to running all the elements that are part of our sample system. If you start the Ignite web agent locally, it should automatically detect all running cluster nodes. Here's the screen with the logs displayed by the agent after startup.

ignite-2-3

At the same time you should see that a new cluster has been detected by Ignite Web Console.

ignite-2-4

You can configure a new or a currently existing cluster using the web console, or just run a test query on the selected managed cluster. You have to include the cache name as a prefix to the table name when defining a query.
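
For example, a cross-cache join executed from the web console may look like the query below; the quoted cache names act as SQL schema names.

SELECT p.firstName, p.lastName, c.location FROM "PersonCache".Person p JOIN "ContactCache".Contact c ON p.id = c.personId;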

ignite-2-5

Similar queries have been declared inside a repository interface. Here are additional methods used for finding entities stored in PersonCache. If you would like to include results stored in another cache, you have to explicitly declare its name together with the table name.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

	List<Person> findByFirstNameAndLastName(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.id=?")
	List<List> findByIdWithContacts(Long id);

	@Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<Contact> selectContacts(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<List> selectContacts2(String firstName, String lastName);
}

We are nearing the end. Now let's run our JUnit test TestCluster in order to generate some test data and put it into the clustered cache. You can monitor the size of a cache using the web console. All you have to do is run a SELECT COUNT(*) query and set graph mode as the default mode for displaying results. The chart visible below illustrates the number of entities stored inside Ignite's cluster, sampled at 5-second intervals.

ignite-2-6

Up-to-date cache with EclipseLink and Oracle

One of the most useful features provided by ORM libraries is a second-level cache, usually called L2. The L2 object cache reduces database access for entities and their relationships. It is enabled by default in EclipseLink and available in other popular JPA implementations like Hibernate. That isn't a problem unless a table inside the database is modified directly by third-party applications, or by another instance of the same application in a clustered environment. One of the available solutions to this problem is an in-memory data grid, which stores all data in memory and is distributed across many nodes inside a cluster. Tools like Hazelcast and Apache Ignite have been described several times on my blog. If you are interested in one of those tools, I recommend you read one of my previous articles about them: Hazelcast Hot Cache with Striim.

However, we won't discuss those in this article. Today I would like to talk about the Continuous Query Notification feature provided by Oracle Database. It solves the problem of updating or invalidating a cache when the data changes in the database. Oracle JDBC drivers have provided support for it since 11g Release 1. This functionality is based on receiving invalidation events from the JDBC drivers. Fortunately, EclipseLink builds on that feature in its solution called EclipseLink Database Change Notification (DCN). In this article I'm going to show you how to implement it using Spring Data JPA together with the EclipseLink library.

How it works

The most useful functionality provided by Oracle Database Continuous Query Notification is the ability to raise database events when rows in a table are modified. It enables client applications to register queries with the database and receive notifications in response to DML or DDL changes on the objects associated with those queries. To detect modifications, EclipseLink DCN uses the Oracle ROWID to intercept changes in the table. The ROWID is included in all queries for a DCN-enabled class. EclipseLink also retrieves the ROWID of a saved entity after an insert operation and maintains a cache index on that ROWID. It also selects the database transaction ID once per transaction to avoid invalidating the cache during the processing of the transaction.

When a database sends a notification, it usually contains the following information:

  • Names of the modified objects, for example the name of a changed table
  • Type of change. The possible values are INSERT, UPDATE, DELETE, ALTER TABLE, or DROP TABLE
  • The Oracle ROWID of the changed record
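
EclipseLink performs this registration for you, but to illustrate what happens under the hood, here's a rough sketch of a change notification registration at the plain Oracle JDBC level, using the oracle.jdbc.dcn API. It is only an illustration, not a part of the sample project.

OracleConnection connection = dataSource.getConnection().unwrap(OracleConnection.class);
Properties options = new Properties();
options.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
DatabaseChangeRegistration registration = connection.registerDatabaseChangeNotification(options);
// the listener is called with the change type and the ROWIDs of the modified records
registration.addListener(event -> System.out.println("Change detected: " + event));
// queries executed on a statement associated with the registration define the monitored objects
Statement statement = connection.createStatement();
((OracleStatement) statement).setDatabaseChangeRegistration(registration);
statement.executeQuery("SELECT id FROM JPA_ORDER");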

Running Oracle database locally

Before starting work on our sample application, we need to have an Oracle database installed. Fortunately, there are some Docker images with Oracle Standard Edition 12c. The command visible below starts such a container and exposes the database on the default 1521 port. It is also possible to use the web console, available under port 9080.

$ docker run -d --name oracle -p 9080:8080 -p 1521:1521 sath89/oracle-12c

We need the sysdba role in order to be able to grant the CHANGE NOTIFICATION privilege to our database user. The default password for the system user is oracle.

GRANT CHANGE NOTIFICATION TO PIOMIN;

You may use any Oracle client, like Oracle SQL Developer, to connect with the database, or just log in to the web console. Since I run Docker on Windows, it is available on my laptop under the address http://192.168.99.100:9080/em. Of course it is Oracle, so you need to settle in for the long haul and wait until it starts. You can observe the progress of the installation by running the docker logs -f oracle command. When you finally see a "100% complete" log entry, you may grant the required privileges to the existing user or create a new one with the set of needed privileges, and proceed to the next step.

Sample application

The sample application source code is available on GitHub under the address https://github.com/piomin/sample-eclipselink-jpa.git. It is a Spring Boot application that uses Spring Data JPA as the data access layer implementation. Because the JPA provider used in this project is EclipseLink, we should remember to exclude the Hibernate libraries from the spring-boot-starter-data-jpa and spring-boot-starter-web starters. Besides the standard EclipseLink library for JPA, we also have to include the EclipseLink implementation for Oracle database (org.eclipse.persistence.oracle) and the Oracle JDBC driver.

<dependency>
	<groupId>org.eclipse.persistence</groupId>
	<artifactId>org.eclipse.persistence.jpa</artifactId>
	<version>2.7.1</version>
</dependency>
<dependency>
	<groupId>org.eclipse.persistence</groupId>
	<artifactId>org.eclipse.persistence.oracle</artifactId>
	<version>2.7.1</version>
</dependency>
<dependency>
	<groupId>com.oracle</groupId>
	<artifactId>ojdbc7</artifactId>
	<version>12.1.0.1</version>
</dependency>

The next step is to provide the connection settings for the Oracle database launched as a Docker container. Do not try to do it through application.yml properties, because Spring Boot uses HikariCP for connection pooling by default, which causes a conflict with the Oracle datasource during application bootstrap. The following datasource declaration works successfully.

@Bean
public DataSource dataSource() {
	final DriverManagerDataSource dataSource = new DriverManagerDataSource();
	dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
	dataSource.setUrl("jdbc:oracle:thin:@192.168.99.100:1521:xe");
	dataSource.setUsername("piomin");
	dataSource.setPassword("Piot_123");
	return dataSource;
}

EclipseLink with Database Change Notification

EclipseLink needs some specific configuration settings to work successfully with Spring Boot and Spring Data JPA. These settings may be provided inside a @Configuration class that extends the JpaBaseConfiguration class. First, we should set EclipseLinkJpaVendorAdapter as the default JPA vendor adapter. Then, we may configure some additional JPA settings, like a detailed logging level or automatic creation of database objects during application startup. However, the most important things for us in the fragment of source code visible below are the Oracle Continuous Query Notification settings.
EclipseLink CQN support is enabled by the OracleChangeNotificationListener listener, which integrates with Oracle JDBC in order to receive database change notifications. The full class name of the listener should be passed as the value of the eclipselink.cache.database-event-listener property. EclipseLink enables the L2 cache for all entities by default, so all tables in the persistence unit are registered for change notification. You may exclude some of them by using the databaseChangeNotificationType attribute of the @Cache annotation on the selected entity, as shown in the sketch after the configuration class below.

@Configuration
@EnableAutoConfiguration
public class JpaConfiguration extends JpaBaseConfiguration {

	protected JpaConfiguration(DataSource dataSource, JpaProperties properties, ObjectProvider<JtaTransactionManager> jtaTransactionManager, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
		super(dataSource, properties, jtaTransactionManager, transactionManagerCustomizers);
	}

	@Override
	protected AbstractJpaVendorAdapter createJpaVendorAdapter() {
		return new EclipseLinkJpaVendorAdapter();
	}

	@Override
	protected Map<String, Object> getVendorProperties() {
	    Map<String, Object> map = new HashMap<>();
	    map.put(PersistenceUnitProperties.WEAVING, InstrumentationLoadTimeWeaver.isInstrumentationAvailable() ? "true" : "static");
	    map.put(PersistenceUnitProperties.DDL_GENERATION, "create-or-extend-tables");
	    map.put(PersistenceUnitProperties.LOGGING_LEVEL, SessionLog.FINEST_LABEL);
	    map.put(PersistenceUnitProperties.DATABASE_EVENT_LISTENER, "org.eclipse.persistence.platform.database.oracle.dcn.OracleChangeNotificationListener");
	    return map;
	}

}
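
As noted above, a selected entity may be excluded from change notification with EclipseLink's @Cache annotation. Here's a short sketch of such an exclusion; the AuditLog entity is a hypothetical example, not a part of the sample project.

@Entity
@Table(name = "JPA_AUDIT_LOG")
@Cache(databaseChangeNotificationType = DatabaseChangeNotificationType.NONE)
public class AuditLog {
	// changes made to this table by other applications will not invalidate the L2 cache
}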

It is worth mentioning that EclipseLink's CQN integration has some important limitations:

  • Changes to an object's secondary tables will not trigger it to be invalidated unless a version is used and updated in the primary table
  • Changes to an object's OneToMany, ManyToMany, and ElementCollection relationships will not trigger it to be invalidated unless a version is used and updated in the primary table

The conclusion from these limitations is obvious: we should enable optimistic locking by including a @Version field in our entities. The @Version column in the primary table will always be updated, so the object will always be invalidated. There are three entities implemented. The Order entity is in a many-to-one relationship with the Product and Customer entities. All these classes have the @Version feature enabled.

@Entity
@Table(name = "JPA_ORDER")
public class Order {

	@Id
	@SequenceGenerator(sequenceName = "SEQ_ORDER", allocationSize = 1, initialValue = 1, name = "orderSequence")
	@GeneratedValue(generator = "orderSequence", strategy = GenerationType.SEQUENCE)
	private Long id;
	@ManyToOne
	private Customer customer;
	@ManyToOne
	private Product product;
	@Enumerated
	private OrderStatus status;
	private int count;

	@Version
	private long version;

	public Long getId() {
		return id;
	}

	public void setId(Long id) {
		this.id = id;
	}

	public Customer getCustomer() {
		return customer;
	}

	public void setCustomer(Customer customer) {
		this.customer = customer;
	}

	public Product getProduct() {
		return product;
	}

	public void setProduct(Product product) {
		this.product = product;
	}

	public OrderStatus getStatus() {
		return status;
	}

	public void setStatus(OrderStatus status) {
		this.status = status;
	}

	public int getCount() {
		return count;
	}

	public void setCount(int count) {
		this.count = count;
	}

	public long getVersion() {
		return version;
	}

	public void setVersion(long version) {
		this.version = version;
	}

	@Override
	public String toString() {
		return "Order [id=" + id + ", product=" + product + ", status=" + status + ", count=" + count + "]";
	}

}

Testing

After launching your application, you should see the following logs generated at the Finest level.

[EL Finest]: connection: 2018-03-23 15:45:50.591--ServerSession(465621833)--Thread(Thread[main,5,main])--Registering table [JPA_PRODUCT] for database change event notification.
[EL Finest]: connection: 2018-03-23 15:45:50.608--ServerSession(465621833)--Thread(Thread[main,5,main])--Registering table [JPA_CUSTOMER] for database change event notification.
[EL Finest]: connection: 2018-03-23 15:45:50.616--ServerSession(465621833)--Thread(Thread[main,5,main])--Registering table [JPA_ORDER] for database change event notification.

The registrations are stored in the user_change_notification_regs table, which is available to your application's user (PIOMIN).

$ SELECT regid, table_name FROM user_change_notification_regs;
     REGID TABLE_NAME
---------- ---------------------------------------------------------------
       326 PIOMIN.JPA_PRODUCT
       326 PIOMIN.JPA_CUSTOMER
       326 PIOMIN.JPA_ORDER

Our sample application exposes Swagger documentation of its API, which may be accessed under the address http://localhost:8090/swagger-ui.html. You can create or find some entities using it. If you try to find the same entity several times, you will see that only the first invocation generates a SQL query in the logs, while all the others are served from the cache. Now, try to change that record using any Oracle client, like Oracle SQL Developer, and verify that the cache has been successfully refreshed.
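
For example, assuming an order with id equal to 1 already exists, you may run the following statements from your SQL client. After the commit, the next find for that order should hit the database instead of the cache.

UPDATE JPA_ORDER SET status = 1 WHERE id = 1;
COMMIT;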

eclipse-link-1

Summary

When I first heard about Oracle Database Change Notification supported by the EclipseLink JPA vendor, my expectations were really high. It is a very interesting solution that guarantees an automatic cache refresh after changes are performed on database tables by third-party applications that bypass your cache. However, I had some problems with this solution during tests. In some cases it just didn't work, and tracking down the errors was really troublesome. It would be nice if such a solution were also available for databases other than Oracle and for JPA vendors like Hibernate.

Running Vert.x Microservices on Kubernetes/OpenShift

Automatic deployment, scaling, container orchestration, and self-healing have been very popular topics in recent months. This is reflected in the rapidly growing popularity of tools like Docker, Kubernetes, and OpenShift. It's hard to find a developer who hasn't heard about these technologies. But how many of you have set up and run all those tools locally?

Despite appearances, it is not a very hard thing to do. Both Kubernetes and OpenShift provide simplified, single-node versions of their platform that allow you to create and try a local cluster, even on Windows.

In this article I'm going to guide you through all the steps that result in deploying and running microservices that communicate with each other and use MongoDB as a data source.

Technologies

Eclipse Vert.x – a toolkit for building reactive applications (and more) on the JVM. It's a polyglot, event-driven, non-blocking, and fast framework, which makes it the perfect choice for creating lightweight, high-performance microservices.

Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications. Now even the Docker platform has decided to support Kubernetes, although it promotes its own clustering solution, Docker Swarm. You may easily run Kubernetes locally using Minikube; however, we won't use it this time. You can read an interesting article about creating Spring Boot microservices and running them on Minikube here: Microservices with Kubernetes and Docker.

RedHat OpenShift – an open source container application platform built on top of Docker containers and Kubernetes. It is also available online at https://www.openshift.com/. You may easily run it locally with Minishift.

Getting started with Minishift

Of course, you can read some tutorials available on the RedHat website, but I'll try to condense the installation and configuration instructions into a few words. First, I would like to point out that all the instructions apply to the Windows OS.

Minishift requires a hypervisor to start the virtual machine, so first you should download and install one of these tools. If you use a solution other than Hyper-V, as I do, you have to pass that driver name when starting Minishift. The command visible below launches it on Oracle VirtualBox and allocates 3 GB of RAM for the VM.

$  minishift start --vm-driver=virtualbox --memory=3G

The minishift.exe executable should be included in the system path. You should also have the Docker client binary installed on your machine. The Docker daemon is in turn managed by Minishift, so you can reuse it for other use cases as well. All you need to do to take advantage of this functionality is run the following command in your shell.

$ @FOR /f "tokens=* delims=^L" %i IN ('minishift docker-env') DO @call %i

The OpenShift platform may be managed using the CLI or the web console. To enable the CLI on Windows, you should add it to the PATH and then run one command to configure your shell. The description of the required steps is displayed after running the following command.

$ minishift oc-env
SET PATH=C:\Users\minkowp\.minishift\cache\oc\v3.7.1\windows;%PATH%
REM Run this command to configure your shell:
REM @FOR /f "tokens=*" %i IN ('minishift oc-env') DO @call %i

In order to use the web console, just run the $ minishift console command, which automatically opens it in your web browser. For me, it is available under the address https://192.168.99.100:8443/console. To check your IP, just execute $ minishift ip.

Sample applications

The source code of the sample applications is available on GitHub (https://github.com/piomin/sample-vertx-kubernetes.git). In fact, a similar application has been run locally and described in the article Asynchronous Microservices with Vert.x. That article can be treated as an introduction to building microservices with the Vert.x framework in general. The current application is even simpler, because it does not have to integrate with any external discovery server like Consul.

Now, let's take a look at the code below. It declares a verticle that establishes a client connection to MongoDB and registers a repository object as a proxy service. Such a service may be easily accessed by another verticle. The MongoDB network address is managed by Minishift.

public class MongoVerticle extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		JsonObject config = new JsonObject();
		config.put("connection_string", "mongodb://micro:micro@mongodb/microdb");
		final MongoClient client = MongoClient.createShared(vertx, config);
		final AccountRepository service = new AccountRepositoryImpl(client);
		ProxyHelper.registerService(AccountRepository.class, vertx, service, "account-service");
	}

}

That verticle can be deployed in the application's main method. It is also important to set the vertx.disableFileCPResolving property to true if you would like to run your application on Minishift. It forces Vert.x to resolve files from its classloader in addition to the file system.

public static void main(String[] args) throws Exception {
	System.setProperty("vertx.disableFileCPResolving", "true");
	Vertx vertx = Vertx.vertx();
	vertx.deployVerticle(new MongoVerticle());
	vertx.deployVerticle(new AccountServer());
}

The AccountServer verticle contains simple API methods that perform CRUD operations on MongoDB.

Building Docker image

Assuming you have successfully installed and configured Minishift, and cloned my sample Maven project shared on GitHub, you may proceed to the build and deploy stage. The first step is to build the applications from source code by executing the mvn clean install command on the root project. It consists of two independent modules: account-vertx-service and customer-vertx-service. Each of these modules contains a Dockerfile with the image definition. Here's the one created for customer-vertx-service. It is based on the openjdk:8-jre-alpine image. Alpine Linux is much smaller than most distribution base images, so our resulting image is around 100 MB instead of around 600 MB with the standard OpenJDK image. Because we are generating fat JAR files during the Maven build, we only have to run the application inside the container using the java -jar command.

FROM openjdk:8-jre-alpine
ENV VERTICLE_FILE customer-vertx-service-1.0-SNAPSHOT.jar
ENV VERTICLE_HOME /usr/verticles
EXPOSE 8090
COPY target/$VERTICLE_FILE $VERTICLE_HOME/
WORKDIR $VERTICLE_HOME
ENTRYPOINT ["sh", "-c"]
CMD ["exec java -jar $VERTICLE_FILE"]

Once we have successfully built the project, we should navigate to the main directory of each module. The sample command visible below builds the Docker image of customer-vertx-service.

$ docker build -t microservices/customer-vertx-service:1.0 .

In fact, there are several different approaches to building and deploying microservices on OpenShift. For example, we could use a Maven plugin or an OpenShift definition file. The way of deploying applications discussed here is obviously one of the simplest, and it assumes using the CLI and web console for configuring deployments and services.

Deploy application on Minishift

Before proceeding to the main part of this article, deploying and running the application on Minishift, we have to provide some pre-configuration. We begin by logging into OpenShift and creating a new project with the oc command. Here are the two required CLI commands. The name of our first OpenShift project is microservices.

$ oc login -u developer -p developer
$ oc new-project microservices

We might as well perform the same actions using the web console. After successfully logging in, you will first see a dashboard with all the available services brokered by Minishift. Let's initialize a container with MongoDB. All the provided container settings should be the same as those configured inside the application. After creation, the MongoDB service will be available to all other services under the mongodb name.

minishift-1

Creating a MongoDB container managed by Minishift is only part of the job. The most important thing is to deploy the containers with the two sample microservices, each of which has access to the database. Here as well, we may leverage two methods of resource creation: the CLI or the web console. Here are the CLI commands for creating the deployments on OpenShift.

$ oc new-app --docker-image microservices/customer-vertx-service:1.0
$ oc new-app --docker-image microservices/account-vertx-service:1.0

The commands visible above create not only the deployments, but also the pods, and expose each of them as a service. Now you may easily scale the number of running pods by executing the following commands.

oc scale --replicas=2 dc customer-vertx-service
oc scale --replicas=2 dc account-vertx-service

The next step is to expose your service outside the container to make it publicly visible. We can achieve this by creating a route; an OpenShift route is in fact a Kubernetes ingress. The OpenShift web console provides an interface for creating routes, available under the Applications -> Routes section. When defining a new route, you should enter its name, the name of a service, and the path on the basis of which requests are proxied. If a hostname is not specified, it is automatically generated by OpenShift.
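
A route may also be created from the CLI with the oc expose command. Here's a sample for account-vertx-service; the hostname will be generated automatically.

$ oc expose service account-vertx-service --path=/account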

minishift-2

Now, let's take a look at the web console dashboard. There are three applications deployed: mongodb-persistent, account-vertx-service, and customer-vertx-service. Both Vert.x microservices are scaled up to two running instances (Kubernetes pods), and are exposed under an automatically generated hostname with the given context path, for example http://account-route-microservices.192.168.99.100.nip.io/account.

minishift-3

You may check the details of every deployment by expanding it on the list view.

minishift-4

The HTTP API is available externally and can be easily tested. Here's the source code with the REST API implementation for account-vertx-service.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.get("/account/:id").produces("application/json").handler(rc -> {
	repository.findById(rc.request().getParam("id"), res -> {
		Account account = res.result();
		LOGGER.info("Found: {}", account);
		rc.response().end(account.toString());
	});
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> {
	repository.findByCustomer(rc.request().getParam("customer"), res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.get("/account").produces("application/json").handler(rc -> {
	repository.findAll(res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found all: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
	repository.save(a, res -> {
		Account account = res.result();
		LOGGER.info("Created: {}", account);
		rc.response().end(account.toString());
	});
});
router.delete("/account/:id").handler(rc -> {
	repository.remove(rc.request().getParam("id"), res -> {
		LOGGER.info("Removed: {}", rc.request().getParam("id"));
		rc.response().setStatusCode(200).end();
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8095);

Inter-service communication

All the microservices are deployed and exposed outside the container. The last thing that we still have to do is provide communication between them. In our sample system, customer-vertx-service calls an endpoint exposed by account-vertx-service. Thanks to the Kubernetes services mechanism, we may easily call another service from the application's container, for example using a simple HTTP client implementation. Let's take a look at the list of services exposed by Kubernetes.

minishift-6

Here's the client implementation responsible for communication with account-vertx-service. The Vert.x WebClient takes three parameters when calling the GET method: port, hostname, and path. We should set the Kubernetes service name as the hostname parameter, and the default container port as the port.

public class AccountClient {

	private static final Logger LOGGER = LoggerFactory.getLogger(AccountClient.class);

	private Vertx vertx;

	public AccountClient(Vertx vertx) {
		this.vertx = vertx;
	}

	public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List<Account>>> resultHandler) {
		WebClient client = WebClient.create(vertx);
		client.get(8095, "account-vertx-service", "/account/customer/" + customerId).send(res2 -> {
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List<Account> accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
		return this;
	}

}

AccountClient is invoked inside the implementation of the customer-vertx-service GET /customer/:id endpoint.

router.get("/customer/:id").produces("application/json").handler(rc -> {
	repository.findById(rc.request().getParam("id"), res -> {
		Customer customer = res.result();
		LOGGER.info("Found: {}", customer);
		new AccountClient(vertx).findCustomerAccounts(customer.getId(), res2 -> {
			customer.setAccounts(res2.result());
			rc.response().end(customer.toString());
		});
	});
});

Summary

It is no coincidence that OpenShift is considered the leading enterprise distribution of Kubernetes. It adds several helpful features to Kubernetes that simplify its adoption by developers and operations teams. With Minishift you can easily try out features like CI/CD for DevOps, collaboration across multiple projects, networking, and log aggregation from multiple pods, all on your local machine.

Versioning REST API with Spring Boot and Swagger

One thing's for sure: if you don't have to version your API, don't try to do it. However, sometimes you have to. Many of the most popular services, like Twitter, Facebook, Netflix or PayPal, version their REST APIs. The advantages and disadvantages of that approach are obvious. On the one hand you don't have to worry about making changes in your API even if many external clients and applications consume it. On the other hand, you have to maintain different versions of the API implementation in your code, which may sometimes be troublesome.

In this article I'm going to show you how to maintain several versions of a REST API in your application in the most comfortable way. We will work with a sample application written on top of the Spring Boot framework, exposing its API documentation using the Swagger2 and SpringFox libraries.

Spring Boot does not provide any dedicated solution for versioning APIs. The situation is different for the SpringFox Swagger2 library, which has provided a grouping mechanism since version 2.8.0 that is perfect for generating documentation of a versioned REST API.

I have already introduced Swagger2 together with a Spring Boot application in one of my previous posts. In the article Microservices API Documentation with Swagger2 you may read how to use Swagger2 for generating API documentation for all the independent microservices and publishing it in one place, on the API gateway.

Different approaches to API versioning

There are several different ways to provide API versioning in your application. The most popular of them are:

  1. Through the URI path – you include the version number in the URL path of the endpoint, for example /api/v1/persons
  2. Through query parameters – you pass the version number as a query parameter with a specified name, for example /api/persons?version=1
  3. Through custom HTTP headers – you define a new header that contains the version number in the request (see the sample request after this list)
  4. Through content negotiation – the version number is included in the “Accept” header together with the accepted content type. A request with cURL would look like the following sample: curl -H "Accept: application/vnd.piomin.v1+json" http://localhost:8080/api/persons
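
For instance, a request using the custom header approach (3) might look like the cURL sample below. The header name X-API-Version is just an illustrative assumption, as this approach is not implemented in the sample project:

curl -H "X-API-Version: 1" http://localhost:8080/api/persons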

The decision about which of those approaches to implement in your application is up to you. We could discuss the advantages and disadvantages of every single approach, but that is not the main purpose of this article. The main purpose is to show you how to implement versioning in a Spring Boot application and then publish the API documentation automatically using Swagger2. The sample application source code is available on GitHub (https://github.com/piomin/sample-api-versioning.git). I have implemented two of the approaches described above – those from points 1 and 4.

Enabling Swagger for Spring Boot

Swagger2 can be enabled in a Spring Boot application by including the SpringFox library. In fact, this is a suite of Java libraries used for automating the generation of machine- and human-readable specifications for JSON APIs written using Spring Framework. It supports formats such as Swagger, RAML and jsonapi. To enable it for your application, include the following Maven dependencies in the project: io.springfox:springfox-swagger-ui, io.springfox:springfox-swagger2, io.springfox:springfox-spring-web. Then you have to annotate the main class with @EnableSwagger2 and define a Docket object. Docket is SpringFox's primary configuration mechanism for Swagger 2.0. We will discuss the details about it in the next section, along with a sample for each way of versioning the API. A minimal main class is sketched below.
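
Here's a minimal sketch of such a main class. The class name VersioningApplication is an assumption for illustration; the Docket beans are shown in the following sections.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@SpringBootApplication
@EnableSwagger2 // activates SpringFox support for Swagger 2.0
public class VersioningApplication {

	public static void main(String[] args) {
		SpringApplication.run(VersioningApplication.class, args);
	}

}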

Sample API

Our sample API is very simple. It exposes basic CRUD methods for the Person entity. There are three versions of the API available to external clients: 1.0, 1.1 and 1.2. In version 1.1 I changed the method for updating the Person entity: in version 1.0 it was available under the /person path, while now it is available under the /person/{id} path. This is the only difference between versions 1.0 and 1.1. There is also only one difference between versions 1.1 and 1.2: instead of the field birthDate, version 1.2 returns age as an integer parameter. This change affects all the endpoints except DELETE /person/{id}.
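
To visualize that difference, here's a hedged sketch of the two model classes used by the controllers below. Only the distinguishing fields are shown; the exact field set is an assumption, and the real classes in the repository may contain more.

import java.util.Date;

class PersonOld {
	private Long id;
	private Date birthDate; // exposed by versions 1.0 and 1.1
	// getters and setters omitted
}

class PersonCurrent {
	private Long id;
	private int age; // replaces birthDate in version 1.2
	// getters and setters omitted
}

Now, let's proceed to the implementation.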

Versioning using URI path

Here’s the full implementation of URI path versioning inside Spring @RestController.

@RestController
@RequestMapping("/person")
public class PersonController {

	@Autowired
	PersonMapper mapper;
	@Autowired
	PersonRepository repository;

	@PostMapping({"/v1.0", "/v1.1"})
	public PersonOld add(@RequestBody PersonOld person) {
		return (PersonOld) repository.add(person);
	}

	@PostMapping("/v1.2")
	public PersonCurrent add(@RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.add(person));
	}

	@PutMapping("/v1.0")
	@Deprecated
	public PersonOld update(@RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping("/v1.1/{id}")
	public PersonOld update(@PathVariable("id") Long id, @RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping("/v1.2/{id}")
	public PersonCurrent update(@PathVariable("id") Long id, @RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.update(person));
	}

	@GetMapping({"/v1.0/{id}", "/v1.1/{id}"})
	public PersonOld findByIdOld(@PathVariable("id") Long id) {
		return (PersonOld) repository.findById(id);
	}

	@GetMapping("/v1.2/{id}")
	public PersonCurrent findById(@PathVariable("id") Long id) {
		return mapper.map((PersonOld) repository.findById(id));
	}

	@DeleteMapping({"/v1.0/{id}", "/v1.1/{id}", "/v1.2/{id}"})
	public void delete(@PathVariable("id") Long id) {
		repository.delete(id);
	}

}
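
Assuming the application runs on Spring Boot's default port 8080 (the actual port of the sample project may differ), each version of the findById operation can be called just by changing the path:

curl http://localhost:8080/person/v1.0/1
curl http://localhost:8080/person/v1.1/1
curl http://localhost:8080/person/v1.2/1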

If you would like to have three different versions available in a single generated API specification, you should declare three Docket @Beans – one per version. In this case the Swagger group concept, which has already been introduced by SpringFox, is helpful. The reason this concept has been introduced is the need to support applications that require more than one Swagger resource listing. Usually you need more than one resource listing in order to provide different versions of the same API. We can assign a group to every Docket just by invoking the groupName DSL method on it. Because different versions of the API methods are implemented within the same controller, we have to distinguish them by declaring a path regex matching the selected version. All other settings are standard.

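// the regex method used in the Docket beans below is imported statically from springfox.documentation.builders.PathSelectors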
@Bean
public Docket swaggerPersonApi10() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.0")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.0.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.0").title("Person API").description("Documentation Person API v1.0").build());
}

@Bean
public Docket swaggerPersonApi11() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.1")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.1.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.1").title("Person API").description("Documentation Person API v1.1").build());
}

@Bean
public Docket swaggerPersonApi12() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.2")
		.select()
			.apis(RequestHandlerSelectors.basePackage("pl.piomin.services.versioning.controller"))
			.paths(regex("/person/v1.2.*"))
		.build()
		.apiInfo(new ApiInfoBuilder().version("1.2").title("Person API").description("Documentation Person API v1.2").build());
}

Now, we may display the Swagger UI for our API just by opening the /swagger-ui.html path in a web browser. You can switch between all the available versions of the API, as you can see in the picture below.

[Image: api-1]
Switching between available versions of API

The specification is generated separately for every version of the API. Here's the documentation for version 1.0. Because the method PUT /person is annotated with @Deprecated, it is crossed out on the generated HTML documentation page.

[Image: api-2]
Person API 1.0 specification

If you switch to group person-api-1.1 you will see all the methods that contain v1.1 in the path. Among them you may recognize the current version of the PUT method, with the {id} field in the path.

[Image: api-3]
Person API 1.1 specification

When using the documentation generated by Swagger you may easily call every method after expanding it. Here's a sample call of the method PUT /person/{id} implemented for version 1.2.

[Image: api-5]
Updating Person entity by calling method PUT from 1.2 version

Versioning using ‘Accept’ header

To access the implementation of versioning with the ‘Accept’ header you should switch to the branch header (https://github.com/piomin/sample-api-versioning/tree/header). Here's the full implementation of content negotiation with ‘Accept’ header versioning inside a Spring @RestController.

@RestController
@RequestMapping("/person")
public class PersonController {

	@Autowired
	PersonMapper mapper;
	@Autowired
	PersonRepository repository;

	@PostMapping(produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json"})
	public PersonOld add(@RequestBody PersonOld person) {
		return (PersonOld) repository.add(person);
	}

	@PostMapping(produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent add(@RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.add(person));
	}

	@PutMapping(produces = "application/vnd.piomin.app-v1.0+json")
	@Deprecated
	public PersonOld update(@RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping(value = "/{id}", produces = "application/vnd.piomin.app-v1.1+json")
	public PersonOld update(@PathVariable("id") Long id, @RequestBody PersonOld person) {
		return (PersonOld) repository.update(person);
	}

	@PutMapping(value = "/{id}", produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent update(@PathVariable("id") Long id, @RequestBody PersonCurrent person) {
		return mapper.map((PersonOld) repository.update(person));
	}

	@GetMapping(name = "findByIdOld", value = "/{idOld}", produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json"})
	@Deprecated
	public PersonOld findByIdOld(@PathVariable("idOld") Long id) {
		return (PersonOld) repository.findById(id);
	}

	@GetMapping(name = "findById", value = "/{id}", produces = "application/vnd.piomin.app-v1.2+json")
	public PersonCurrent findById(@PathVariable("id") Long id) {
		return mapper.map((PersonOld) repository.findById(id));
	}

	@DeleteMapping(value = "/{id}", produces = {"application/vnd.piomin.app-v1.0+json", "application/vnd.piomin.app-v1.1+json", "application/vnd.piomin.app-v1.2+json"})
	public void delete(@PathVariable("id") Long id) {
		repository.delete(id);
	}

}

We still have to define three Docket @Beans, but the filtering criteria are slightly different. Simple filtering by path is not an option here. We have to create a Predicate for the RequestHandler object and pass it to the apis DSL method. The predicate implementation should filter every method in order to find only those which have a produces field with the required version number. Here's a sample Docket implementation for version 1.2.

@Bean
public Docket swaggerPersonApi12() {
	return new Docket(DocumentationType.SWAGGER_2)
		.groupName("person-api-1.2")
		.select()
			.apis(p -> {
				if (p.produces() != null) {
					for (MediaType mt : p.produces()) {
						if (mt.toString().equals("application/vnd.piomin.app-v1.2+json")) {
							return true;
						}
					}
				}
				return false;
			})
		.build()
		.produces(Collections.singleton("application/vnd.piomin.app-v1.2+json"))
		.apiInfo(new ApiInfoBuilder().version("1.2").title("Person API").description("Documentation Person API v1.2").build());
}

As you can see in the picture below, the generated methods do not have the version number in the path.

[Image: api-6]
Person API 1.2 specification for a content negotiation approach

When calling a method for the selected version of the API, the only difference is in the required response content type.

[Image: api-7]
Updating person and setting response content type
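
For completeness, here's how such a call might look with cURL; the port and the payload fields are illustrative assumptions based on the sample API described earlier:

curl -X PUT -H "Accept: application/vnd.piomin.app-v1.2+json" -H "Content-Type: application/json" -d '{"age":30}' http://localhost:8080/person/1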

Summary

Versioning is one of the most important concepts in HTTP API design. No matter which approach to versioning you choose, you should do everything to describe your API well. This seems to be especially important in the era of microservices, where your interface may be called by many other independent applications. In this case, creating documentation in isolation from the source code can be troublesome. Swagger solves all of the problems described here: it may be easily integrated with your application and it supports versioning. Thanks to the SpringFox project it can also be easily customized in your Spring Boot application to meet more advanced demands.

Partitioning with Apache Kafka and Vert.x

Apache Kafka is a distributed streaming platform. It may also act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message is processed by only one consumer belonging to the given group. It is especially useful when you have more than one instance of a service listening for messages incoming to a topic. That feature makes your consumers behave like queuing clients within the same group.

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced it in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service that processes HTTP requests asynchronously, in accordance with the asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you the main features of Apache Kafka that may be useful when creating applications that consume messages. The choice of Java client library is not a key point here. However, in my opinion Vert.x, being an asynchronous, high-performance framework, is a perfect match for Apache Kafka. It provides the Vert.x Kafka client, which allows you to read and send messages from/to a Kafka cluster. Before we proceed to the sample, let's first dive into the core abstraction of Kafka.

Kafka topic

I'm assuming you know very well what a topic is and what its main role is. Every message incoming to a topic goes to every subscriber. So what is the main difference between Kafka and a standard topic provided by other message brokers? A Kafka topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

A consumer may subscribe to the whole topic or only to a selected partition. It can also control the offset from which it starts processing data. For example, it is able to reset the offset in order to reprocess data from the past, or skip ahead to the most recent record to consume only messages currently being sent to the topic. Here's a figure that illustrates a single partition structure with a producer and consumers listening for the incoming data.

[Image: Kafka-1]
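
As a side note, here's a hedged sketch of that offset control using the Vert.x Kafka client API, which is also used later in this article; the consumer object and the topic name refer to elements defined in the following sections:

TopicPartition partition = new TopicPartition().setTopic("orders-out").setPartition(0);
// rewind to the beginning of the partition to reprocess historical records
consumer.seekToBeginning(partition);
// or jump to the end to consume only newly arriving records
consumer.seekToEnd(partition);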

Sample architecture

Let me say a few words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). In accordance with the principle that one picture says more than a thousand words, the diagram illustrating the architecture of our system is visible below. We have one topic created on the Kafka platform that consists of two partitions. There is one client application that exposes a REST API for sending orders into the system and then forwards them to the topic. The target partition is calculated based on the type of order. We may create orders with types SINGLE and MULTIPLE. There are also some applications that consume data from the topic. The first of them, single-order-processor, reads data from partition 0, the second, multiple-order-processor, from partition 1, and the last, all-order-processor, does not choose any particular partition.

[Image: kafka-2]

Running Kafka

To run Apache Kafka on the local machine we may use its Docker image. The image shared by Spotify also starts the ZooKeeper server, which is used by Kafka. If you run Docker on Windows, the default address of its virtual machine is 192.168.99.100.

docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics are created automatically during application startup, and I had some problems with it while creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately but provides a Kafka client interface. Because the Kafka container is attached to a user-defined Docker network, that network has to be created first.

docker network create kafka-net
docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run the ches/kafka container in client mode and create the topic orders-out with two partitions.

docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building producer application

First, we need to include the Maven dependencies that enable the Vert.x framework for the application. If the application exposes a RESTful HTTP API you should include vertx-web. The vertx-kafka-client library has to be included in all the sample modules.

To start Vert.x as a Java application we have to create a verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using the Vertx object. For more details about Vert.x and the verticle concept you may refer to one of my previous articles mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {

	public static void main(String[] args) {
		Vertx vertx = Vertx.vertx();
		vertx.deployVerticle(new OrderVerticle());
	}

}

The next step is to define a producer using the KafkaProducer interface. We have to provide connection settings and a serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires a JsonObject as an input parameter.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, JsonObject> producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending the message to the topic. The message is created using the KafkaProducerRecord interface. It takes the topic name, the request object and the partition number as parameters. As you may see in the fragment of code below, the partition number is calculated on the basis of the order type (o.getType().ordinal()).

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
	Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
	KafkaProducerRecord<String, JsonObject> record = KafkaProducerRecord.create("orders-out", null, rc.getBodyAsJson(), o.getType().ordinal());
	producer.write(record, done -> {
		if (done.succeeded()) {
			RecordMetadata recordMetadata = done.result();
			LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
			o.setId(recordMetadata.getOffset());
			o.setStatus(OrderStatus.PROCESSING);
		} else {
			Throwable t = done.cause();
			LOGGER.error("Error sending message to topic: {}", t.getMessage());
			o.setStatus(OrderStatus.REJECTED);
		}
		rc.response().end(Json.encodePrettily(o));
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);

Building consumer applications

The consumer configuration is very similar to the producer's. We also have to set the connection settings and the class used for deserialization. There is one interesting setting, which has been defined for the consumer in the fragment of code visible below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset in Kafka for the consumer during initialization. If you would like to read all records from the beginning of the stream, use the value earliest. If you would like to process only the newest records (received after application startup), set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "all-order-processor"); // consumer group id, required for subscribe(); the group name here is illustrative
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented in the module all-order-processor, consumes all the events incoming to the topic. This implementation is relatively the simplest: we only need to invoke the subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by the handler method.

consumer.subscribe("orders-out", ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	order.setStatus(OrderStatus.DONE);
	LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});

The implementation of the consuming method for the other applications is a little more complicated. Besides defining the target topic, every consumer can ask for a specific partition. The application multiple-order-processor subscribes to partition 1, while single-order-processor subscribes to partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
		consumer.assignment(done1 -> {
			if (done1.succeeded()) {
				for (TopicPartition topicPartition : done1.result()) {
					LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
				}
			} else {
				LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
			}
		});
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

The implementation of the handle method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty field relatedOrderId, it tries to find it among the historical records stored in the topic. This may be achieved by calling the seek method on KafkaConsumer.

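// ordersWaiting is assumed to be a Map<Long, Order> field of the verticle: it maps the offset
// of an awaited related order to the order that is waiting for it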
consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	if (ordersWaiting.containsKey(record.offset())) {
		LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
		LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
		consumer.seekToEnd(tp);
	}

	if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
		ordersWaiting.put(order.getRelatedOrderId(), order);
		consumer.seek(tp, order.getRelatedOrderId());
	}
});

Testing

Now it is time to launch our applications. You may run the main classes from your IDE, or build the whole project using the mvn clean install command and then run it with java -jar. You should also run two instances of all-order-processor in order to check how the consumer group mechanism works in practice.
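
For example, assuming standard Maven artifact names (an assumption, check the project's pom.xml for the exact ones):

mvn clean install
java -jar order-service/target/order-service-1.0-SNAPSHOT.jar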

Let’s send some test requests to the order-service in the following sequence.

curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here's the log from the producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here's the log from single-order-processor. It has processed only messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from multiple-order-processor. It has processed only messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here's the log from the first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from the second instance of all-order-processor. It may be a little bit surprising for you. But if you run two instances of a consumer that listens to the whole topic, each instance will process messages from only a single partition.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Summary

In this article I tried to give you a taste of messaging with Apache Kafka. Concepts like consumer groups and partitioning are what make it different from traditional messaging solutions. Kafka is a widely adopted product that can act as a storage, messaging system or stream processor. Together with the popular JVM-based toolkit Vert.x, it may be a really powerful, fast and lightweight solution for applications that exchange messages with each other. The key concepts introduced by Kafka have also been adopted by Spring Cloud Stream, which makes it an opinionated choice for creating messaging microservices.