Testing microservices on OpenShift using Arquillian Cube

I first came across the Arquillian framework when I was building automated end-to-end tests for Java EE applications. At that time, testing applications deployed on Java EE servers was not very convenient. Arquillian offered a nice solution to that problem: it provides useful mechanisms for testing EJBs deployed on an embedded application server.
Currently, Arquillian provides multiple modules dedicated to different technologies and use cases. One of these modules is Arquillian Cube. With this extension you can create integration/functional tests that run against Docker containers or even more advanced orchestration platforms like Kubernetes or OpenShift.
In this article I'm going to show you how to use Arquillian Cube to build integration tests for applications running on the OpenShift platform. All the examples are deployed locally on Minishift. Here's the full list of topics covered in this article:

  • Using Arquillian Cube for deploying and running applications on Minishift
  • Testing applications deployed on Minishift by calling their REST API exposed using OpenShift routes
  • Testing inter-service communication between the deployed applications based on Kubernetes services

Before reading this article, it is worth taking a look at two of my previous articles about Kubernetes and OpenShift: Quick guide to deploying Java apps on OpenShift and Running Vert.x Microservices on Kubernetes/OpenShift, both included later in this text.

The following picture illustrates the architecture of the solution discussed here. We will build and deploy two sample applications on Minishift. They integrate with a NoSQL database, which also runs as a service on the OpenShift platform.

[Figure arquillian-1: architecture of the sample solution, two applications and a MongoDB service deployed on Minishift]

Now, we may proceed to the development.

1. Including Arquillian Cube dependencies

Before adding the Arquillian Cube dependencies we should define a dependency management section in our pom.xml. It should contain the BOMs of the Arquillian framework and of its Cube extension.

<dependencyManagement>
     <dependencies>
          <dependency>
                <groupId>org.arquillian.cube</groupId>
                <artifactId>arquillian-cube-bom</artifactId>
                <version>1.15.3</version>
                <scope>import</scope>
                <type>pom</type>
          </dependency>
          <dependency>
                <groupId>org.jboss.arquillian</groupId>
                <artifactId>arquillian-bom</artifactId>
                <version>1.4.0.Final</version>
                <scope>import</scope>
                <type>pom</type>
          </dependency>
     </dependencies>
</dependencyManagement>

Here's the list of libraries used in my sample project. The most important thing is to include the starter for the Arquillian Cube OpenShift extension, which brings in all required dependencies. It is also worth including the arquillian-cube-requirement artifact if you would like to annotate the test class with @RunWith(ArquillianConditionalRunner.class), and openshift-client if you would like to use the Fabric8 OpenShiftClient.

<dependency>
     <groupId>org.jboss.arquillian.junit</groupId>
     <artifactId>arquillian-junit-container</artifactId>
     <version>1.4.0.Final</version>
     <scope>test</scope>
</dependency>
<dependency>
     <groupId>org.arquillian.cube</groupId>
     <artifactId>arquillian-cube-requirement</artifactId>
     <scope>test</scope>
</dependency>
<dependency>
     <groupId>org.arquillian.cube</groupId>
     <artifactId>arquillian-cube-openshift-starter</artifactId>
     <scope>test</scope>
</dependency>
<dependency>
     <groupId>io.fabric8</groupId>
     <artifactId>openshift-client</artifactId>
     <version>3.1.12</version>
     <scope>test</scope>
</dependency>

2. Running Minishift

I gave detailed instructions on how to run Minishift locally in my previous articles about OpenShift. Here's the full list of commands that should be executed in order to start Minishift, reuse the Docker daemon managed by Minishift, and create a test namespace (project).

$ minishift start --vm-driver=virtualbox --memory=2G
$ minishift docker-env
$ minishift oc-env
$ oc login -u developer -p developer
$ oc new-project sample-deployment
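
Note that minishift docker-env and minishift oc-env only print the environment settings; they still have to be applied to your shell. On Linux or macOS you could do it with eval, for example (a small sketch not shown in the original list):

$ eval $(minishift docker-env)
$ eval $(minishift oc-env)

On Windows the equivalent @FOR loops are shown later in this text.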

We also have to create the MongoDB database service on OpenShift. The OpenShift platform provides an easy way of deploying built-in services via the web console available at https://192.168.99.100:8443. You can select the required service on the main dashboard there and just confirm the installation with the default properties. Otherwise, you would have to provide a YAML template with the deployment configuration and apply it to Minishift using the oc command. The YAML file will also be required if you decide to recreate the namespace for every single test case (explained later, in Step 3). I won't paste the content of the template for creating the MongoDB service on Minishift here. This file is available in my GitHub repository as /openshift/mongo-deployment.yaml. To access it you need to clone the sample-vertx-kubernetes repository and switch to the openshift-tests branch (https://github.com/piomin/sample-vertx-kubernetes/tree/openshift-tests). It contains definitions of a secret, persistentVolumeClaim, deploymentConfig and service.

[Figure arquillian-2: the MongoDB service deployed on Minishift]

3. Configuring connection with Minishift for Arquillian

All Arquillian configuration settings should be provided in the arquillian.xml file located in the src/test/resources directory. When running Arquillian tests on Minishift you generally have two approaches to choose from: you can create a new namespace for every test suite and remove it after the test, or use an existing one and then remove all the components created within the selected namespace. The first approach is the default for every test until you change it in the Arquillian configuration file using the namespace.use.existing and namespace.use.current properties.

<extension qualifier="openshift">
	<property name="namespace.use.current">true</property>
	<property name="namespace.use.existing">sample-deployment</property>
	<property name="kubernetes.master">https://192.168.99.100:8443</property>
	<property name="cube.auth.token">EMNHP8QIB4A_VU4kE_vQv8k9he_4AV3GTltrzd06yMU</property>
</extension>
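
For completeness: the extension element shown above has to be placed inside the standard arquillian.xml root element. A minimal file could look more or less like this (the schema location is the usual Arquillian default):

<?xml version="1.0" encoding="UTF-8"?>
<arquillian xmlns="http://jboss.org/schema/arquillian"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://jboss.org/schema/arquillian http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
	<extension qualifier="openshift">
		<!-- properties shown above -->
	</extension>
</arquillian>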

You also have to set the Kubernetes master address and the API token. In order to obtain the token, just run the following command.

$ oc whoami -t
EMNHP8QIB4A_VU4kE_vQv8k9he_4AV3GTltrzd06yMU

4. Building Arquillian JUnit test

Every JUnit test class should be annotated with @RequiresOpenshift. It should also have a runner set; in this case it is ArquillianConditionalRunner. The test method testCustomerRoute applies the configuration provided in the deployment.yaml file, which is assigned to the method using the @Template annotation.
The important part of this test is the declaration of the route's URL. We have to annotate it with the following annotations:

  • @RouteURL – searches for a route with the name given by the value parameter and injects it into a URL object instance
  • @AwaitRoute – if you do not declare this annotation, the test will finish right after starting, because deployment on OpenShift is processed asynchronously. @AwaitRoute forces the test to wait until the route is available on Minishift. We can set the timeout for waiting for the route (in this case 2 minutes) and the route's path. The route's path is especially important here; without it our test won't locate the route and will fail after the 2-minute timeout.

The test method is very simple. In fact, I only send a POST request with a JSON object to the endpoint assigned to the customer-route route and verify that the HTTP status code is 200. Because I had a problem with injecting the route's URL (it doesn't work for my sample with Minishift v3.9.0, while it works with Minishift v3.7.1), I had to build the URL manually in the code. If the injection worked properly, we could use the injected URL url instance for that.

@Category(RequiresOpenshift.class)
@RequiresOpenshift
@RunWith(ArquillianConditionalRunner.class)
public class CustomerServiceApiTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerServiceApiTest.class);

    @ArquillianResource
    OpenShiftAssistant assistant;
    @ArquillianResource
    OpenShiftClient client;

    @RouteURL(value = "customer-route")
    @AwaitRoute(timeoutUnit = TimeUnit.MINUTES, timeout = 2, path = "/customer")
    private URL url;

    @Test
    @Template(url = "classpath:deployment.yaml")
    public void testCustomerRoute() {
        OkHttpClient httpClient = new OkHttpClient();
        RequestBody body = RequestBody.create(MediaType.parse("application/json"), "{\"name\":\"John Smith\", \"age\":33}");
        Request request = new Request.Builder().url("http://customer-route-sample-deployment.192.168.99.100.nip.io/customer").post(body).build();
        try {
            Response response = httpClient.newCall(request).execute();
            LOGGER.info("Test: response={}", response.body().string());
            Assert.assertNotNull(response.body());
            Assert.assertEquals(200, response.code());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5. Preparing deployment configuration

Before running the test, we have to prepare the configuration template, which is loaded by Arquillian Cube using the @Template annotation. We need to create a deploymentConfig, inject the MongoDB credentials stored in the secret object, and finally expose the service outside the container using a route object.

kind: Template
apiVersion: v1
metadata:
  name: customer-template
objects:
  - kind: ImageStream
    apiVersion: v1
    metadata:
      name: customer-image
    spec:
      dockerImageRepository: piomin/customer-vertx-service
  - kind: DeploymentConfig
    apiVersion: v1
    metadata:
      name: customer-service
    spec:
      template:
        metadata:
          labels:
            name: customer-service
        spec:
          containers:
          - name: customer-vertx-service
            image: piomin/customer-vertx-service
            ports:
            - containerPort: 8090
              protocol: TCP
            env:
            - name: DATABASE_USER
              valueFrom:
                secretKeyRef:
                  key: database-user
                  name: mongodb
            - name: DATABASE_PASSWORD
              valueFrom:
                secretKeyRef:
                  key: database-password
                  name: mongodb
            - name: DATABASE_NAME
              valueFrom:
                secretKeyRef:
                  key: database-name
                  name: mongodb
      replicas: 1
      triggers:
      - type: ConfigChange
      - type: ImageChange
        imageChangeParams:
          automatic: true
          containerNames:
          - customer-vertx-service
          from:
            kind: ImageStreamTag
            name: customer-image:latest
      strategy:
        type: Rolling
      paused: false
      revisionHistoryLimit: 2
      minReadySeconds: 0
  - kind: Service
    apiVersion: v1
    metadata:
      name: customer-service
    spec:
      ports:
      - name: "web"
        port: 8090
        targetPort: 8090
      selector:
        name: customer-service
  - kind: Route
    apiVersion: v1
    metadata:
      name: customer-route
    spec:
      path: "/customer"
      to:
        kind: Service
        name: customer-service

6. Testing inter-service communication

In the sample project, communication with other microservices is realized using the Vert.x WebClient. It takes the Kubernetes service name and container port as parameters. It is implemented inside customer-service by AccountClient, which is then invoked inside the Vert.x HTTP route implementation. Here's the AccountClient implementation.

public class AccountClient {

	private static final Logger LOGGER = LoggerFactory.getLogger(AccountClient.class);
	
	private Vertx vertx;

	public AccountClient(Vertx vertx) {
		this.vertx = vertx;
	}
	
	public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List>> resultHandler) {
		WebClient client = WebClient.create(vertx);
		client.get(8095, "account-service", "/account/customer/" + customerId).send(res2 -> {
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
		return this;
	}
	
}

The endpoint GET /account/customer/:customerId exposed by account-service is called within the implementation of the GET /customer/:id method exposed by customer-service. This time we create a new namespace instead of using the existing one. That's why we have to apply the MongoDB deployment configuration before applying the configuration of the sample services. We also need to upload the configuration of account-service, which is provided in the account-deployment.yaml file. The rest of the JUnit test is pretty similar to the one described in Step 4. It waits until customer-route is available on Minishift. The only differences are the URL being called and the dynamic injection of the namespace into the route's URL.

@Category(RequiresOpenshift.class)
@RequiresOpenshift
@RunWith(ArquillianConditionalRunner.class)
@Templates(templates = {
        @Template(url = "classpath:mongo-deployment.yaml"),
        @Template(url = "classpath:deployment.yaml"),
        @Template(url = "classpath:account-deployment.yaml")
})
public class CustomerCommunicationTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerCommunicationTest.class);

    @ArquillianResource
    OpenShiftAssistant assistant;

    String id;
    
    @RouteURL(value = "customer-route")
    @AwaitRoute(timeoutUnit = TimeUnit.MINUTES, timeout = 2, path = "/customer")
    private URL url;

    // ...

    @Test
    public void testGetCustomerWithAccounts() {
        LOGGER.info("Route URL: {}", url);
        String projectName = assistant.getCurrentProjectName();
        OkHttpClient httpClient = new OkHttpClient();
        Request request = new Request.Builder().url("http://customer-route-" + projectName + ".192.168.99.100.nip.io/customer/" + id).get().build();
        try {
            Response response = httpClient.newCall(request).execute();
            LOGGER.info("Test: response={}", response.body().string());
            Assert.assertNotNull(response.body());
            Assert.assertEquals(200, response.code());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

You can run the test from your IDE or just by executing the mvn clean install command.
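
If you only want to execute a single test class, you can also select it explicitly, assuming it is run by the Maven Surefire plugin:

$ mvn clean install -Dtest=CustomerServiceApiTest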

Conclusion

Arquillian Cube provides a neat solution for integration testing on the Kubernetes and OpenShift platforms. It is not difficult to prepare and upload the configuration of a database and microservices and then deploy it on an OpenShift node. You can even test communication between microservices just by deploying the dependent application with an OpenShift template.


Quick guide to deploying Java apps on OpenShift

In this article I'm going to show you how to deploy your applications on OpenShift (Minishift), connect them with other services exposed there, and use some other interesting deployment features provided by OpenShift. OpenShift is built on top of Docker containers and the Kubernetes container cluster orchestrator. Currently, it is the most popular enterprise platform based on those two technologies, so it is definitely worth examining in more detail.

1. Running Minishift

We use Minishift to run a single-node OpenShift cluster on the local machine. The only prerequisite for installing Minishift is a virtualization tool. I use Oracle VirtualBox as the hypervisor, so I set the --vm-driver parameter to virtualbox in my start command.

$  minishift start --vm-driver=virtualbox --memory=3G

2. Running Docker

It turns out that you can easily reuse the Docker daemon managed by Minishift in order to run Docker commands directly from your command line, without any additional installation. To achieve this, just run the following command (the Windows shell variant) after starting Minishift.

@FOR /f "tokens=* delims=^L" %i IN ('minishift docker-env') DO @call %i

3. Running OpenShift CLI

The last tool required before starting any practical exercise with Minishift is the CLI, which is available under the oc command. To enable it on your command line, run the following commands.

$ minishift oc-env
$ SET PATH=C:\Users\minkowp\.minishift\cache\oc\v3.9.0\windows;%PATH%
$ REM @FOR /f "tokens=*" %i IN ('minishift oc-env') DO @call %i

Alternatively, you can use the OpenShift web console, which is available on port 8443. On my Windows machine it is by default available at address 192.168.99.100.

4. Building Docker images of the sample applications

I prepared two sample applications that are used to present the OpenShift deployment process. These are simple Java applications built with Vert.x that provide an HTTP API and store data in MongoDB. However, the technology is not very important now. We need to build Docker images of these applications. The source code is available on GitHub (https://github.com/piomin/sample-vertx-kubernetes.git) in the openshift branch (https://github.com/piomin/sample-vertx-kubernetes/tree/openshift). Here's the sample Dockerfile for account-vertx-service.

FROM openjdk:8-jre-alpine
ENV VERTICLE_FILE account-vertx-service-1.0-SNAPSHOT.jar
ENV VERTICLE_HOME /usr/verticles
ENV DATABASE_USER mongo
ENV DATABASE_PASSWORD mongo
ENV DATABASE_NAME db
EXPOSE 8095
COPY target/$VERTICLE_FILE $VERTICLE_HOME/
WORKDIR $VERTICLE_HOME
ENTRYPOINT ["sh", "-c"]
CMD ["exec java -jar $VERTICLE_FILE"]

Go to the account-vertx-service directory and run the following command to build an image from the Dockerfile shown above.

$ docker build -t piomin/account-vertx-service .

The same step should be performed for customer-vertx-service. After that you have two images built, both with the latest tag, which can now be deployed and run on Minishift.

5. Preparing OpenShift deployment descriptor

When working with OpenShift, the first step of an application's deployment is to create a YAML configuration file. This file contains basic information about the deployment, such as the containers used for running the application (1), scaling (2), triggers that drive automated deployments in response to events (3), and the strategy for deploying your pods on the platform (4).
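
The full descriptor is available in the sample repository; the sketch below only illustrates where those four elements appear in such a file (names and values follow the account-vertx-service sample, but treat it as an outline rather than the exact file):

kind: DeploymentConfig
apiVersion: v1
metadata:
  name: account-service
spec:
  template:                      # (1) pod template with the container running the application
    metadata:
      labels:
        name: account-service
    spec:
      containers:
      - name: account-vertx-service
        image: piomin/account-vertx-service
        ports:
        - containerPort: 8095
  replicas: 1                    # (2) scaling
  triggers:                      # (3) triggers that drive automated deployments
  - type: ConfigChange
  - type: ImageChange
    imageChangeParams:
      automatic: true
      containerNames:
      - account-vertx-service
      from:
        kind: ImageStreamTag
        name: account-vertx-service:latest
  strategy:                      # (4) strategy of deploying the pods
    type: Rolling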

Deployment configurations can be managed with the oc command like any other resource. You can create a new configuration or update an existing one using the oc apply command.

$ oc apply -f account-deployment.yaml

You may be a little surprised, but this command does not trigger any build and does not start the pods. In fact, you have only created a resource of type deploymentConfig, which describes the deployment process. You can start this process using some other oc commands, but first let's take a closer look at the resources required by our application.

6. Injecting environment variables

As I have mentioned before, our sample applications use an external data source. They need to open a connection to the existing MongoDB instance in order to store the data passed through the HTTP endpoints exposed by the application. Here's the MongoVerticle class, which is responsible for establishing the client connection with MongoDB. It uses environment variables for setting the security credentials and the database name.

public class MongoVerticle extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		ConfigStoreOptions envStore = new ConfigStoreOptions()
				.setType("env")
				.setConfig(new JsonObject().put("keys", new JsonArray().add("DATABASE_USER").add("DATABASE_PASSWORD").add("DATABASE_NAME")));
		ConfigRetrieverOptions options = new ConfigRetrieverOptions().addStore(envStore);
		ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
		retriever.getConfig(r -> {
			String user = r.result().getString("DATABASE_USER");
			String password = r.result().getString("DATABASE_PASSWORD");
			String db = r.result().getString("DATABASE_NAME");
			JsonObject config = new JsonObject();
			config.put("connection_string", "mongodb://" + user + ":" + password + "@mongodb/" + db);
			final MongoClient client = MongoClient.createShared(vertx, config);
			final AccountRepository service = new AccountRepositoryImpl(client);
			ProxyHelper.registerService(AccountRepository.class, vertx, service, "account-service");
		});
	}

}

MongoDB is available in OpenShift's catalog of predefined Docker images. You can easily deploy it on your Minishift instance just by clicking the “MongoDB” icon in the “Catalog” tab. The username and password will be generated automatically if you do not provide them during deployment setup. All the properties are available as the deployment's environment variables and are stored in secrets/mongodb, where mongodb is the name of the deployment.

[Figure openshift-1: the MongoDB deployment in the OpenShift web console]
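
If you prefer the command line, the generated credentials can also be inspected with standard oc commands (just a convenience, assuming the deployment is named mongodb as above):

$ oc get secret mongodb -o yaml
$ oc set env dc/mongodb --list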

Environment variables can be easily injected into any other deployment using the oc set command; they are then injected into the pod when the deployment process is performed. The following command injects all secrets assigned to the mongodb deployment into the configuration of our sample application's deployment.

$ oc set env --from=secrets/mongodb dc/account-service

7. Importing Docker images to OpenShift

The deployment configuration is ready, so in theory we could start the deployment process. However, let's go back for a moment to the deployment config defined in Step 5. We defined there two triggers that cause a new replication controller to be created, which results in deploying a new version of the pod. The first of them is a configuration change trigger that fires whenever changes are detected in the pod template of the deployment configuration (ConfigChange). The second, the image change trigger (ImageChange), fires when a new version of the Docker image is pushed to the repository. To be able to watch whether an image in the repository has changed, we have to define and create an image stream. Such an image stream does not contain any image data, but presents a single virtual view of related images, something similar to an image repository. Inside the deployment config file we referred to the image stream account-vertx-service, so the same name should be provided inside the image stream definition. In turn, when setting the spec.dockerImageRepository field, we define the Docker pull specification for the image.
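
The account-image.yaml file itself is not listed in this post. Based on the description above, and following the ImageStream definition shown earlier in this text, its content would look roughly like this:

kind: ImageStream
apiVersion: v1
metadata:
  name: account-vertx-service
spec:
  dockerImageRepository: piomin/account-vertx-service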

Finally, we can create the resource on the OpenShift platform.

$ oc apply -f account-image.yaml

8. Running deployment

Once the deployment configuration has been prepared and the Docker images have been successfully imported into the registry managed by the OpenShift instance, we may trigger the deployment using the following oc commands.

$ oc rollout latest dc/account-service
$ oc rollout latest dc/customer-service

If everything goes fine, new pods should be started for the defined deployments. You can easily check it using the OpenShift web console.

9. Updating image stream

We have already created two image streams related to the Docker repositories. Here's a screen from the OpenShift web console that shows the list of available image streams.

[Figure openshift-images: the list of available image streams in the OpenShift web console]

To be able to push a new version of an image to the OpenShift internal Docker registry, we should first perform a docker login against this registry using the user's authentication token. To obtain the token from OpenShift, use the oc whoami -t command, and then pass it to your docker login command with the -p parameter.

$ oc whoami -t
Sz9_TXJQ2nyl4fYogR6freb3b0DGlJ133DVZx7-vMFM
$ docker login -u developer -p Sz9_TXJQ2nyl4fYogR6freb3b0DGlJ133DVZx7-vMFM https://172.30.1.1:5000

Now, if you make any change in your application and rebuild your Docker image with the latest tag, you have to push that image to the image stream on OpenShift. The address of the internal registry is generated automatically by OpenShift, and you can check it in the image stream's details. For me, it is 172.30.1.1:5000.

$ docker tag piomin/account-vertx-service 172.30.1.1:5000/sample-deployment/account-vertx-service:latest
$ docker push 172.30.1.1:5000/sample-deployment/account-vertx-service

After pushing a new version of the Docker image to the image stream, a rollout of the application starts automatically. Here's a screen from the OpenShift web console that shows the deployment history of the account-service application.

[Figure openshift-2: deployment history of the account-service application]

Conclusion

I have shown you the successive steps of deploying an application on the OpenShift platform. Based on a sample Java application that connects to a database, I illustrated how to inject credentials into that application's pod in a way that is entirely transparent for a developer. I also performed an update of the application's Docker image in order to show how to trigger a new deployment on an image change.

[Figure openshift-3]

Running Vert.x Microservices on Kubernetes/OpenShift

Automatic deployment, scaling, container orchestration and self-healing have been some of the most popular topics in recent months. This is reflected in the rapidly growing popularity of tools like Docker, Kubernetes and OpenShift. It's hard to find a developer who hasn't heard about these technologies. But how many of you have actually set up and run all those tools locally?

Despite appearances, it is not a very hard thing to do. Both Kubernetes and OpenShift provide simplified, single-node versions of their platforms that allow you to create and try a local cluster, even on Windows.

In this article I'm going to guide you through all the steps needed to deploy and run microservices that communicate with each other and use MongoDB as a data source.

Technologies

Eclipse Vert.x – a toolkit for building reactive applications (and more) on the JVM. It's a polyglot, event-driven, non-blocking and fast framework, which makes it a perfect choice for creating lightweight, high-performance microservices.

Kubernetes – an open-source system for automating the deployment, scaling and management of containerized applications. Even the Docker platform has decided to support Kubernetes, although it promotes its own clustering solution, Docker Swarm. You may easily run Kubernetes locally using Minikube; however, we won't use it this time. You can read an interesting article about creating Spring Boot microservices and running them on Minikube here: Microservices with Kubernetes and Docker.

Red Hat OpenShift – an open-source container application platform built on top of Docker containers and Kubernetes. It is also available online at https://www.openshift.com/. You may easily run it locally with Minishift.

Getting started with Minishift

Of course, you can read the tutorials available on the Red Hat website, but I'll try to condense the installation and configuration instructions into a few words. Firstly, I would like to point out that all the instructions apply to the Windows OS.

Minishift requires a hypervisor to start the virtual machine, so first you should download and install one. If you use a solution other than Hyper-V, like I do, you have to pass the driver name when starting Minishift. The command below launches it on Oracle VirtualBox and allocates 3GB of RAM for the VM.

$  minishift start --vm-driver=virtualbox --memory=3G

The minishift.exe executable should be included in the system path. You should also have the Docker client binary installed on your machine. The Docker daemon is in turn managed by Minishift, so you can reuse it for other use cases as well. All you need to do to take advantage of this functionality is run the following command in your shell.

$ @FOR /f "tokens=* delims=^L" %i IN ('minishift docker-env') DO @call %i

The OpenShift platform may be managed using the CLI or the web console. To enable the CLI on Windows, you should add it to the path and then run one command to configure your shell. The required steps are displayed after running the following command.

$ minishift oc-env
SET PATH=C:\Users\minkowp\.minishift\cache\oc\v3.7.1\windows;%PATH%
REM Run this command to configure your shell:
REM @FOR /f "tokens=*" %i IN ('minishift oc-env') DO @call %i

In order to use the web console, just run the minishift console command, which automatically opens it in your web browser. For me, it is available at https://192.168.99.100:8443/console. To check your IP, just execute minishift ip.

Sample applications

The source code of the sample applications is available on GitHub (https://github.com/piomin/sample-vertx-kubernetes.git). In fact, a similar application was run locally and described in the article Asynchronous Microservices with Vert.x. That article can be treated as an introduction to building microservices with Vert.x and to the Vert.x framework in general. The current application is even simpler, because it does not have to integrate with any external discovery server like Consul.

Now, let's take a look at the code below. It declares a verticle that establishes a client connection to MongoDB and registers a repository object as a proxy service. Such a service may be easily accessed by another verticle. The MongoDB network address is managed by Minishift.

public class MongoVerticle extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		JsonObject config = new JsonObject();
		config.put("connection_string", "mongodb://micro:micro@mongodb/microdb");
		final MongoClient client = MongoClient.createShared(vertx, config);
		final AccountRepository service = new AccountRepositoryImpl(client);
		ProxyHelper.registerService(AccountRepository.class, vertx, service, "account-service");
	}

}

That verticle can be deployed in the application's main method. It is also important to set the vertx.disableFileCPResolving property to true if you would like to run your application on Minishift. It forces Vert.x to resolve files from its classloader in addition to the file system.

public static void main(String[] args) throws Exception {
	System.setProperty("vertx.disableFileCPResolving", "true");
	Vertx vertx = Vertx.vertx();
	vertx.deployVerticle(new MongoVerticle());
	vertx.deployVerticle(new AccountServer());
}

The AccountServer verticle contains simple API methods that perform CRUD operations on MongoDB.

Building Docker image

Assuming you have successfully installed and configured Minishift and cloned my sample Maven project shared on GitHub, you may proceed to the build and deploy stage. The first step is to build the applications from source code by executing the mvn clean install command on the root project. It consists of two independent modules: account-vertx-service and customer-vertx-service. Each of these modules contains a Dockerfile with the image definition. Here's the one created for customer-vertx-service. It is based on the openjdk:8-jre-alpine image. Alpine Linux is much smaller than most distribution base images, so our resulting image is around 100MB instead of around 600MB when using the standard OpenJDK image. Because we are generating fat JAR files during the Maven build, we only have to run the application inside the container using the java -jar command.

FROM openjdk:8-jre-alpine
ENV VERTICLE_FILE customer-vertx-service-1.0-SNAPSHOT.jar
ENV VERTICLE_HOME /usr/verticles
EXPOSE 8090
COPY target/$VERTICLE_FILE $VERTICLE_HOME/
WORKDIR $VERTICLE_HOME
ENTRYPOINT ["sh", "-c"]
CMD ["exec java -jar $VERTICLE_FILE"]

Once we have successfully built the project, we should navigate to the main directory of each module. The sample command below builds the Docker image of customer-vertx-service.

$ docker build -t microservices/customer-vertx-service:1.0 .

In fact, there are different approaches to building and deploying microservices on OpenShift. For example, we could use a Maven plugin or an OpenShift definition file. The way of deploying applications discussed here is obviously one of the simplest, and it assumes using the CLI and web console for configuring deployments and services.
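
For illustration, a build with the fabric8-maven-plugin could be configured roughly as below; the version is only an example and I do not use this approach in this article:

<plugin>
	<groupId>io.fabric8</groupId>
	<artifactId>fabric8-maven-plugin</artifactId>
	<version>3.5.41</version>
	<executions>
		<execution>
			<goals>
				<goal>resource</goal>
				<goal>build</goal>
			</goals>
		</execution>
	</executions>
</plugin>

With such a plugin in place, running mvn fabric8:deploy generates the OpenShift resources, builds the image and deploys the application to the cluster you are currently logged into.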

Deploy application on Minishift

Before proceeding to the main part of this article, deploying and running the applications on Minishift, we have to do some pre-configuration. We begin by logging into OpenShift and creating a new project with the oc command. Here are the two required CLI commands. The name of our first OpenShift project is microservices.

$ oc login -u developer -p developer
$ oc new-project microservices

We might as well perform the same actions using the web console. After successfully logging in, you will first see a dashboard with all the available services brokered by Minishift. Let's initialize a container with MongoDB. All the provided container settings should be the same as those configured inside the application. After creation, the MongoDB service is available to all other services under the mongodb name.

[Figure minishift-1: the Minishift web console dashboard with available services]

Creating the MongoDB container managed by Minishift is only part of the job. The most important thing is to deploy the containers with the two sample microservices, each of which has access to the database. Here as well, we may use two methods of resource creation: the CLI or the web console. Here are the CLI commands for creating the deployments on OpenShift.

$ oc new-app --docker-image microservices/customer-vertx-service:1.0
$ oc new-app --docker-image microservices/account-vertx-service:1.0

The commands above create not only the deployments, but also the pods, and expose each of them as a service. Now you may easily scale the number of running pods by executing the following commands.

oc scale --replicas=2 dc customer-vertx-service
oc scale --replicas=2 dc account-vertx-service

The next step is to expose your service outside the container to make it publicly visible. We can achieve this by creating a route. An OpenShift route is in fact the counterpart of a Kubernetes ingress. The OpenShift web console provides an interface for creating routes under Applications -> Routes. When defining a new route you should enter its name, the name of a service, and the path on the basis of which requests are proxied. If a hostname is not specified, it is generated automatically by OpenShift.
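
Routes can also be created from the CLI with oc expose, for example (using the route names and paths from this article; this is the simplest form, while the web console offers more options):

$ oc expose service account-vertx-service --name=account-route --path=/account
$ oc expose service customer-vertx-service --name=customer-route --path=/customer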

[Figure minishift-2: defining a new route in the OpenShift web console]

Now, let's take a look at the web console dashboard. There are three applications deployed: mongodb-persistent, account-vertx-service and customer-vertx-service. Both Vert.x microservices are scaled up to two running instances (Kubernetes pods), and are exposed under an automatically generated hostname with the given context path, for example http://account-route-microservices.192.168.99.100.nip.io/account.

[Figure minishift-3: the web console dashboard with the deployed applications]

You may check the details of every deployment by expanding it on the list view.

[Figure minishift-4: deployment details in the list view]

The HTTP API is available externally and can be easily tested. Here's the source code of the REST API implementation for account-vertx-service.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.get("/account/:id").produces("application/json").handler(rc -> {
	repository.findById(rc.request().getParam("id"), res -> {
		Account account = res.result();
		LOGGER.info("Found: {}", account);
		rc.response().end(account.toString());
	});
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> {
	repository.findByCustomer(rc.request().getParam("customer"), res -> {
		List accounts = res.result();
		LOGGER.info("Found: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.get("/account").produces("application/json").handler(rc -> {
	repository.findAll(res -> {
		List accounts = res.result();
		LOGGER.info("Found all: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
	repository.save(a, res -> {
		Account account = res.result();
		LOGGER.info("Created: {}", account);
		rc.response().end(account.toString());
	});
});
router.delete("/account/:id").handler(rc -> {
	repository.remove(rc.request().getParam("id"), res -> {
		LOGGER.info("Removed: {}", rc.request().getParam("id"));
		rc.response().setStatusCode(200);
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8095);

Inter-service communication

All the microservices are deployed and exposed outside the container. The last thing we still have to do is provide communication between them. In our sample system, customer-vertx-service calls an endpoint exposed by account-vertx-service. Thanks to the Kubernetes services mechanism, we may easily call another service from the application's container, for example using a simple HTTP client implementation. Let's take a look at the list of services exposed by Kubernetes.

[Figure minishift-6: the list of services exposed by Kubernetes]

Here's the client implementation responsible for communication with account-vertx-service. The Vert.x WebClient takes three parameters when calling the GET method: port, host and path. We should set the Kubernetes service name as the host parameter, and the container's default port as the port.

public class AccountClient {

	private static final Logger LOGGER = LoggerFactory.getLogger(AccountClient.class);

	private Vertx vertx;

	public AccountClient(Vertx vertx) {
		this.vertx = vertx;
	}

	public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List>> resultHandler) {
		WebClient client = WebClient.create(vertx);
		client.get(8095, "account-vertx-service", "/account/customer/" + customerId).send(res2 -> {
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
		return this;
	}

}

AccountClient is invoked inside the implementation of customer-vertx-service's GET /customer/:id endpoint.

router.get("/customer/:id").produces("application/json").handler(rc -> {
	repository.findById(rc.request().getParam("id"), res -> {
		Customer customer = res.result();
		LOGGER.info("Found: {}", customer);
		new AccountClient(vertx).findCustomerAccounts(customer.getId(), res2 -> {
			customer.setAccounts(res2.result());
			rc.response().end(customer.toString());
		});
	});
});

Summary

It is no coincidence that OpenShift is considered the leading enterprise distribution of Kubernetes. It adds several helpful features to Kubernetes that simplify its adoption by developers and operations teams. With Minishift, you can easily try features like CI/CD for DevOps, multiple projects with collaboration, networking, and log aggregation from multiple pods on your local machine.

Partitioning with Apache Kafka and Vert.x

Apache Kafka is a distributed streaming platform. It may also act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message is processed by only one consumer belonging to the given group. It is especially useful when you have more than one instance of a service listening for messages incoming to a topic. That feature makes your consumers behave as queuing clients within the same group.

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced it in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service that processes HTTP requests asynchronously, in accordance with the asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you the main features of Apache Kafka that may be useful when creating applications that consume messages. The choice of Java client library is not the key point here. However, in my opinion Vert.x, as an asynchronous, high-performance framework, matches Apache Kafka perfectly. It provides the Vert.x Kafka client, which allows you to read and send messages from/to a Kafka cluster. Before we proceed to the sample, let's first dive into the core abstraction of Kafka.

Kafka topic

I'm assuming you know very well what a topic is and what its main role is. Every message incoming to the topic goes to every subscriber. What is the main difference between Kafka and the standard topics provided by other message brokers? A Kafka topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

A consumer may subscribe to the whole topic or only to a selected partition. It can also control the offset from which it starts processing data. For example, it is able to reset the offset in order to reprocess data from the past, or skip ahead to the most recent record to consume only messages currently being sent to the topic. Here's a figure that illustrates the structure of a single partition with a producer and consumers listening for incoming data.

[Figure Kafka-1: a single partition with a producer and consumers]

Sample architecture

Let me say a few words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). In accordance with the principle that a picture is worth a thousand words, the diagram illustrating the architecture of our system is shown below. We have one topic created on the Kafka platform, which consists of two partitions. There is one client application that exposes a REST API allowing orders to be sent into the system, and then forwards them to the topic. The target partition is calculated based on the type of order. We may create orders with types SINGLE and MULTIPLE. There are also some applications that consume data from the topic. The first of them, single-order-processor, reads data from partition 0, the second, multiple-order-processor, from partition 1, and the last, all-order-processor, does not choose any partition.

[Figure kafka-2: architecture of the sample system]

Running Kafka

To run Apache Kafka on the local machine we may use a Docker image. The image shared by Spotify also starts the ZooKeeper server, which is used by Kafka. If you run Docker on Windows, the default address of its virtual machine is 192.168.99.100.

docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics are created automatically during application startup, and I ran into some problems with it while creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately but provides a Kafka client interface.
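
The commands below refer to a user-defined Docker network named kafka-net. If it does not exist yet, create it first (this step is not shown in the original description, so treat it as my assumption about the setup):

docker network create kafka-net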

docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run the ches/kafka container in client mode and create the orders-out topic with two partitions.

docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building producer application

First, we need to include the Maven dependencies that enable the Vert.x framework for the application. If the application exposes a RESTful HTTP API, you should include vertx-web. The vertx-kafka-client library has to be included in all the sample modules.
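
Assuming the standard Vert.x artifacts, these dependencies would look roughly like this (the version property is illustrative and should match the Vert.x version used in the project):

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-kafka-client</artifactId>
	<version>${vertx.version}</version>
</dependency>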

To start Vert.x as a Java application, we have to create a verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using the Vertx object. For more details about Vert.x and the verticle concept, you may refer to my previous article mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {

	public static void main(String[] args) {
		Vertx vertx = Vertx.vertx();
		vertx.deployVerticle(new OrderVerticle());
	}

}

The next step is to define the producer using the KafkaProducer interface. We have to provide the connection settings and the serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires a JsonObject as an input parameter.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending the message to the topic. The message is created using the KafkaProducerRecord interface. It takes the topic name, the request object and the partition number as parameters. As you may see in the fragment of code below, the partition number is calculated on the basis of the order type (o.getType().ordinal()).

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
	Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
	KafkaProducerRecord record = KafkaProducerRecord.create("orders-out", null, rc.getBodyAsJson(), o.getType().ordinal());
	producer.write(record, done -> {
		if (done.succeeded()) {
			RecordMetadata recordMetadata = done.result();
			LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
			o.setId(recordMetadata.getOffset());
			o.setStatus(OrderStatus.PROCESSING);
		} else {
			Throwable t = done.cause();
			LOGGER.error("Error sent to topic: {}", t.getMessage());
			o.setStatus(OrderStatus.REJECTED);
		}
		rc.response().end(Json.encodePrettily(o));
	});
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);

Building consumer applications

The consumer configuration is very similar to that of the producer. We also have to set the connection settings and the class used for deserialization. There is one interesting setting defined for the consumer in the fragment of code visible below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset in Kafka for the consumer during initialization. If you would like to read all records from the beginning of the stream, use the value earliest. If you would like to process only the newest records (received after application startup), set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = KafkaConsumer.create(vertx, config);

As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented in the all-order-processor module, consumes all the events incoming to the topic. This implementation is the simplest. We only need to invoke the subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by the handler method.

consumer.subscribe("orders-out", ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	order.setStatus(OrderStatus.DONE);
	LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});

The implementation of the consuming method for the other applications is a little more complicated. Besides defining the target topic, each consumer can ask for a specific partition. The multiple-order-processor application subscribes to partition 1, while single-order-processor subscribes to partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
	if (ar.succeeded()) {
		LOGGER.info("Subscribed");
		consumer.assignment(done1 -> {
			if (done1.succeeded()) {
				for (TopicPartition topicPartition : done1.result()) {
					LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
				}
			} else {
				LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
			}
		});
	} else {
		LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
	}
});

The implementation of the handler method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty relatedOrderId field, it tries to find that order in the historical records stored in the topic. This may be achieved by calling the seek method on KafkaConsumer.

consumer.handler(record -> {
	LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
	Order order = Json.decodeValue(record.value(), Order.class);
	if (ordersWaiting.containsKey(record.offset())) {
		LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
		LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
		consumer.seekToEnd(tp);
	}

	if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
		ordersWaiting.put(order.getRelatedOrderId(), order);
		consumer.seek(tp, order.getRelatedOrderId());
	}
});

Testing

Now it is time to launch our applications. You may run the main classes from your IDE, or build the whole project using the mvn clean install command and then run them with java -jar. Also run two instances of all-order-processor in order to check how the consumer group mechanism works in practice.

Let’s send some test requests to the order-service in the following sequence.

curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here's the log from the producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here's the log from single-order-processor. It has processed only the messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from multiple-order-processor. It has processed only the messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here's the log from the first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from the second instance of all-order-processor. It may be a little surprising for you, but if you run two instances of a consumer that listens to the whole topic, each instance processes messages from only a single partition.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Summary

In this article I tried to give you a taste of messaging with Apache Kafka. Concepts like consumer groups and partitioning are what make it different from traditional messaging solutions. Kafka is a widely adopted product, which can act as storage, a messaging system or a stream processor. Together with the popular JVM-based toolkit Vert.x, it may be a really powerful, fast and lightweight solution for applications that exchange messages with each other. The key concepts introduced by Kafka have been adopted by Spring Cloud Stream, which makes it an opinionated choice for creating messaging microservices.

Building Secure APIs with Vert.x and OAuth2

Preface

Today I would like to get back to the subject touched on in one of my previous articles – the Vert.x toolkit. In the post Asynchronous Microservices With Vert.x I described how to develop microservices using Vert.x modules for web applications, service discovery, circuit breaking and distributed configuration. I did not mention anything there about security aspects, which are usually important when talking about open APIs. It is time to take a closer look at some Vert.x modules for authentication and authorization. Following the description available on the vertx.io site, it provides some simple out-of-the-box implementations for authentication in our applications. There are modules providing auth implementations backed by JDBC and MongoDB, and also some supporting solutions like JSON Web Tokens (JWT), Apache Shiro and OAuth2. As you probably know, OAuth2 is the most common authentication method for APIs provided by Facebook, Twitter or LinkedIn. If you are interested in more details about that authentication method, read my article Microservices security with Oauth2, where I described the basics and introduced a simple sample using Spring Security in conjunction with OAuth2.

In the sample application, which is available on GitHub under the security branch, I'm going to present how to provide OAuth2 security for a Vert.x application using Keycloak and the Vert.x OAuth2 module.

Keycloak

For authentication and authorization management we use Keycloak. It is an open-source identity and access management solution which provides mechanisms supporting, among others, OAuth2. Keycloak has a web admin console where administrators can manage all aspects of the server. We can easily run it using a Docker container.

docker run -d --name keycloak -p 38080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KEYCLOAK_LOGLEVEL=DEBUG jboss/keycloak

The management dashboard is available at http://192.168.99.100:38080/. Let's begin by creating a Client. The client will be used by our application (or rather service) to authenticate itself against Keycloak. In the first step we have to set the Client ID and Root URL. The Root URL is not needed for the OAuth2 Password Credentials flow, but rather for the Authorization Code flow. I put our sample application's localhost address there.

vertx-sec-1

We should enable the Direct Access Grants and Authorization options in the Settings section of the newly created client. The Access Type should also be set to confidential and Valid Redirect URIs to the callback address routed inside the application (this is explained in a later section).

vertx-sec-2

The last piece of information needed from the Client section is the Secret, available under the Credentials tab.

vertx-sec-3

Now we can proceed to creating a user with credentials. In the sample I'll present in the next section we use the password credentials flow, so don't forget to set a password for the newly created user.

vertx-sec-5

vertx-sec-7

Finally, we set the authorities for our user. First, let's create some roles in the Roles section. In my case these are view-account and modify-account. For these roles I also enabled Scope Param Required. It means that if a client needs to obtain that authority, it has to send the role name in the request scope.

vertx-sec-4

The last step is to assign the roles to our test user piotr.minkowski.

vertx-sec-6

Building application

Vert.x provides a module supporting OAuth2 authorization. We should include the following dependency in our pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-auth-oauth2</artifactId>
	<version>${vertx.version}</version>
</dependency>

We have to begin by defining the Keycloak OAuth2Auth provider. We use the default realm (1). In addition to the realm name we should set the realm public key (2), which is available in the Realm Settings section under the Keys tab. We should also set the Keycloak Client ID (3) as the resource and the client secret as credentials (4).

JsonObject keycloakJson = new JsonObject()
	.put("realm", "master") // (1)
	.put("realm-public-key", "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1xVBifXfS1uVM8S14JlyLpXck+0+hBQX258IiL5Fm2rZpkQ5lN9N1tadQdXBKk8V/0SxdTyoX7cpYQkcOs0Rj0XXmX7Lnk56euZwel+3MKAZWA20ld8BCfmDtX4/+VP311USUqR/W8Fd2p/gugKWF6VDMkri92qob1DdrcUiRlD8XYC0pwHwSvyW/3JvE5HeTy3U4vxC+19wHcwzLGNlVOlYPk9mzJHXN+LhZr/Tc7HeAsvVxYDXwOOh+/UWweMkvKy+OSNKG3aWLb92Ni3HejFn9kd4TRHfaapwWg1m5Duf3uqz8WDHbS/LeS4g3gQS0SvcCYI0huSoG3NA/z4K7wIDAQAB") // (2)
	.put("auth-server-url", "http://192.168.99.100:38080/auth")
	.put("ssl-required", "external")
	.put("resource", "vertx-account") // (3)
	.put("credentials", new JsonObject().put("secret", "73b55e04-e562-41ea-b39c-263b7b36945d")); // (4)

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);

vertx-sec-8

I exposed an API method for login which retrieves a token from Keycloak using the OAuth2FlowType.PASSWORD authentication method.

router.post("/login").produces("application/json").handler(rc -> {
	User u = Json.decodeValue(rc.getBodyAsString(), User.class);
	oauth2.getToken(u.toJson(), res -> {
		if (res.failed()) {
			LOGGER.error("Access token error: {}", res.cause().getMessage());
			rc.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
		} else {
			AccessToken token = res.result();
			LOGGER.info("Access Token: {}", KeycloakHelper.rawAccessToken(token.principal()));
			User user = new User(KeycloakHelper.rawAccessToken(token.principal()));
			rc.response().end(user.toString());
		}
	});
});

I sent the following message to the POST /login endpoint.

{"username":"piotr.minkowski", "password":"Piot_123", "scope":"modify-account view-account"}

This is equivalent to the following Vert.x JsonObject passed as a parameter to the OAuth2 getToken method.

new JsonObject().put("username", "piotr.minkowski").put("password", "Piot_123").put("scope", "modify-account view-account")
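
Under the hood getToken performs a standard OAuth2 password grant request to Keycloak's token endpoint. Just for illustration, it corresponds roughly to the call below, using the client id, secret and user credentials configured earlier (the endpoint path follows Keycloak's /auth/realms/{realm}/protocol/openid-connect/token convention).

curl -X POST http://192.168.99.100:38080/auth/realms/master/protocol/openid-connect/token \
  -d "grant_type=password" \
  -d "client_id=vertx-account" \
  -d "client_secret=73b55e04-e562-41ea-b39c-263b7b36945d" \
  -d "username=piotr.minkowski" \
  -d "password=Piot_123" \
  -d "scope=modify-account view-account"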

The POST /login method returns the access token inside a JSON object. That token should be passed in the Authorization header for every call to a protected resource. Here's the main class with the API method definitions. We begin by creating an OAuth2AuthHandler object (1), which is responsible for token validation. It takes the OAuth2Auth Keycloak object as a parameter. Then we set OAuth2AuthHandler as the handler for all methods matching the /account/* path (2). If the token has been successfully validated, we can proceed to authorization. We check if the view-account role is assigned to the user when calling the GET method (3), and modify-account when calling the POST method (4). When using Keycloak for authorization we always have to set the prefix to “realm” while invoking the isAuthorised method. If the role is prefixed with realm, the lookup happens in the global realm roles list.

OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);
OAuth2AuthHandler oauth2Handler = (OAuth2AuthHandler) OAuth2AuthHandler.create(oauth2, "http://localhost:2222"); // (1)
Router router = Router.router(vertx);
router.route("/account/*").handler(ResponseContentTypeHandler.create());
router.route("/account/*").handler(oauth2Handler); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create());
router.route(HttpMethod.POST, "/login").handler(BodyHandler.create());
oauth2Handler.setupCallback(router.get("/callback"));
router.get("/account/:id").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:view-account", authRes -> { // (3)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			repository.findById(rc.request().getParam("id"), res -> {
				Account account = res.result();
				LOGGER.info("Found: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});
router.post("/account").produces("application/json").handler(rc -> {
	rc.user().isAuthorised("realm:modify-account", authRes -> { // (4)
		LOGGER.info("Auth: {}", authRes.result());
		if (authRes.result() == Boolean.TRUE) {
			Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
			repository.save(a, res -> {
				Account account = res.result();
				LOGGER.info("Created: {}", account);
				rc.response().end(account.toString());
			});
		} else {
			rc.response().setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()).end();
		}
	});
});

Testing

I created a JUnit test case to check that OAuth2 authentication works fine. Vert.x provides a library which can be used for testing. It is especially designed to work well with asynchronous code. Include the following dependency in your pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-unit</artifactId>
	<version>${vertx.version}</version>
	<scope>test</scope>
</dependency>

Then annotate your JUnit test class with @RunWith(VertxUnitRunner.class). Before running our test method we should deploy the verticles. The verticle with the REST API is deployed on port 2222.

Vertx vertx;

@Before
public void before(TestContext context) throws IOException {
	vertx = Vertx.vertx();
	vertx.deployVerticle(MongoVerticle.class.getName(), context.asyncAssertSuccess());
	DeploymentOptions options = new DeploymentOptions().setConfig(new JsonObject().put("http.port", 2222));
	vertx.deployVerticle(AccountServer.class.getName(), options, context.asyncAssertSuccess());
}

Here’s the JUnit test method. We use WebClient for calling HTTP methods and the Vert.x-Unit Async object to complete the test case once the asynchronous calls finish (3). First, we call the POST /login method to retrieve an access token from Keycloak (1). Then we call one of the API methods, setting the Authorization header with the access token string retrieved from POST /login (2). During test case execution the verticles with MongoDB (MongoVerticle) and the API definition (AccountServer) are deployed and started, but you need to start the MongoDB database, Consul and Keycloak manually. I suggest running them with Docker.

@Test
public void testAuth(TestContext context) {
	Async async = context.async();
	WebClient client = WebClient.create(vertx);
	User u = new User("piotr.minkowski", "Piot_123", "modify-account view-account");
	client.post(2222, "localhost", "/login").sendJson(u, ar -> { // (1)
		LOGGER.info("Response code: {}", ar.result().statusCode());
		LOGGER.info("Response: {}", ar.result().bodyAsString());
		if (ar.result().statusCode() == 200) {
			User user = ar.result().bodyAsJson(User.class);
			client.get(2222, "localhost", "/account").putHeader("Authorization", "Bearer " + user.getAccessToken()).send(r -> { // (2)
				LOGGER.info("GET result: {}", r.result().bodyAsString());
				async.complete(); // (3)
			});
		} else {
			async.complete();
		}
	});
}
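
The supporting containers can be started with Docker as well. The exact images and port mappings below are just an example matching the default ports; the Keycloak container was already shown in the Keycloak section.

docker run -d --name mongo -p 27017:27017 mongo
docker run -d --name consul -p 8500:8500 consul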

Final Thoughts

To be honest, I had never dealt with Vert.x before starting work on this series of articles published on my blog. After those couple of days spent getting to know the toolkit, I'll definitely recommend using it when working on REST APIs. Vert.x provides a smart implementation for security with OAuth2. Additionally, you can use it in combination with a solution like Keycloak, which is used for identity and access management. As usual, there are also some drawbacks. I had a problem with understanding how the authorities exactly work in Vert.x. When I created a role inside a Keycloak client it didn't work in my application; only a global realm role worked fine. However, those problems do not overshadow the advantages of Vert.x.

Asynchronous Microservices with Vert.x

Preface

I must admit that as soon as I saw the Vert.x documentation I liked this concept. This may have happened because I had previously used a very similar framework for creating simple and lightweight applications exposing REST APIs – Node.js. It is a really fine framework, but it has one big disadvantage for me – it is a JavaScript runtime. It is worth mentioning that Vert.x is polyglot: it supports all the most popular JVM-based languages like Java, Scala, Groovy, Kotlin and even JavaScript. These are not all of its advantages. It's lightweight, fast and modular. I was pleasantly surprised when I added the main Vert.x dependencies to my pom.xml and not many other dependencies were pulled in, as is often the case when using the Spring Boot framework.
Well, I will not elaborate on the advantages and key concepts of this toolkit. I think you can read more about it in other articles. The most important thing for us is that using Vert.x we can create high-performance, asynchronous microservices based on the Netty framework. In addition, we can use standardized microservices mechanisms such as service discovery, a configuration server or circuit breaking.

The sample application source code is available on GitHub. It consists of two modules: account-vertx-service and customer-vertx-service. The customer service retrieves data from the Consul registry and invokes the account service API. The architecture of the sample solution is visible in the figure below.

vertx

Building services

To be able to create an HTTP service exposing a REST API we need to include the following dependency in pom.xml.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web</artifactId>
	<version>${vertx.version}</version>
</dependency>

Here’s the fragment from the account service where I defined all the API methods. The first step (1) was to declare a Router, which is one of the core concepts of Vert.x-Web. A router takes an HTTP request, finds the first matching route for that request, and passes the request to that route. The next step (2), (3) is to add some handlers, for example BodyHandler, which allows you to retrieve request bodies and has been added to the POST method. Then we can define the API methods (4), (5), (6), (7), (8). Finally (9) we start the HTTP server on the port retrieved from the configuration.

Router router = Router.router(vertx); // (1)
router.route("/account/*").handler(ResponseContentTypeHandler.create()); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create()); // (3)
router.get("/account/:id").produces("application/json").handler(rc -> { // (4)
	repository.findById(rc.request().getParam("id"), res -> {
		Account account = res.result();
		LOGGER.info("Found: {}", account);
		rc.response().end(account.toString());
	});
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> { // (5)
	repository.findByCustomer(rc.request().getParam("customer"), res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.get("/account").produces("application/json").handler(rc -> { // (6)
	repository.findAll(res -> {
		List<Account> accounts = res.result();
		LOGGER.info("Found all: {}", accounts);
		rc.response().end(Json.encodePrettily(accounts));
	});
});
router.post("/account").produces("application/json").handler(rc -> { // (7)
	Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
	repository.save(a, res -> {
		Account account = res.result();
		LOGGER.info("Created: {}", account);
		rc.response().end(account.toString());
	});
});
router.delete("/account/:id").handler(rc -> { // (8)
	repository.remove(rc.request().getParam("id"), res -> {
		LOGGER.info("Removed: {}", rc.request().getParam("id"));
		rc.response().setStatusCode(200).end(); // end() is required to actually send the response
	});
});
...
vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port")); // (9)

All API methods use a repository object to communicate with the datasource. In this case I decided to use MongoDB. Vert.x has a module for interacting with that database, which we need to include as a new dependency.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-mongo-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

The Mongo client, like all other Vert.x modules, works asynchronously. That's why we need to use an AsyncResult Handler to pass the result from the repository object. To be able to pass a custom object as an AsyncResult we have to annotate it with @DataObject and add a toJson method.

public AccountRepositoryImpl(final MongoClient client) {
	this.client = client;
}

@Override
public AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler) {
	JsonObject json = JsonObject.mapFrom(account);
	client.save(Account.DB_TABLE, json, res -> {
		if (res.succeeded()) {
			LOGGER.info("Account created: {}", res.result());
			account.setId(res.result());
			resultHandler.handle(Future.succeededFuture(account));
		} else {
			LOGGER.error("Account not created", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}

@Override
public AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler) {
	client.find(Account.DB_TABLE, new JsonObject(), res -> {
		if (res.succeeded()) {
			List<Account> accounts = res.result().stream().map(it -> new Account(it.getString("_id"), it.getString("number"), it.getInteger("balance"), it.getString("customerId"))).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		} else {
			LOGGER.error("Account not found", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}
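
Only save and findAll are shown above; the remaining repository methods follow the same pattern. For example, findById could be implemented more or less like the sketch below (an assumption on my part, reusing the same field mapping as findAll).

@Override
public AccountRepository findById(String id, Handler<AsyncResult<Account>> resultHandler) {
	// findOne takes the collection name, a query and an optional projection of fields
	client.findOne(Account.DB_TABLE, new JsonObject().put("_id", id), null, res -> {
		if (res.succeeded()) {
			JsonObject json = res.result();
			Account account = new Account(json.getString("_id"), json.getString("number"),
					json.getInteger("balance"), json.getString("customerId"));
			resultHandler.handle(Future.succeededFuture(account));
		} else {
			LOGGER.error("Account not found", res.cause());
			resultHandler.handle(Future.failedFuture(res.cause()));
		}
	});
	return this;
}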

Here’s Account model class.

@DataObject
public class Account {

	public static final String DB_TABLE = "account";

	private String id;
	private String number;
	private int balance;
	private String customerId;

	public Account() {

	}

	public Account(String id, String number, int balance, String customerId) {
		this.id = id;
		this.number = number;
		this.balance = balance;
		this.customerId = customerId;
	}

	public Account(JsonObject json) {
		this.id = json.getString("id");
		this.number = json.getString("number");
		this.balance = json.getInteger("balance");
		this.customerId = json.getString("customerId");
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getNumber() {
		return number;
	}

	public void setNumber(String number) {
		this.number = number;
	}

	public int getBalance() {
		return balance;
	}

	public void setBalance(int balance) {
		this.balance = balance;
	}

	public String getCustomerId() {
		return customerId;
	}

	public void setCustomerId(String customerId) {
		this.customerId = customerId;
	}

	public JsonObject toJson() {
		return JsonObject.mapFrom(this);
	}

	@Override
	public String toString() {
		return Json.encodePrettily(this);
	}

}

Verticles

It is worth mentioning a few words about how an application written in Vert.x runs. It is based on verticles. Verticles are chunks of code that get deployed and run by Vert.x. A Vert.x instance maintains N event loop threads by default. When creating a verticle we have to extend the abstract class AbstractVerticle.

public class AccountServer extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		...
	}
}

I created two verticles per microservice: the first for the HTTP server and the second for communication with Mongo. Here's the main application method where I deploy the verticles.

public static void main(String[] args) throws Exception {
	Vertx vertx = Vertx.vertx();
	vertx.deployVerticle(new MongoVerticle());
	vertx.deployVerticle(new AccountServer());
}

Well, now we should obtain a reference inside the AccountServer verticle to the service running on MongoVerticle. To achieve this we have to generate proxy classes using the vertx-codegen module.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-proxy</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-codegen</artifactId>
	<version>${vertx.version}</version>
	<scope>provided</scope>
</dependency>

First, annotate the repository interface with @ProxyGen and all public methods with @Fluent.

@ProxyGen
public interface AccountRepository {

	@Fluent
	AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository findById(String id, Handler<AsyncResult<Account>> resultHandler);

	@Fluent
	AccountRepository findByCustomer(String customerId, Handler<AsyncResult<List<Account>>> resultHandler);

	@Fluent
	AccountRepository remove(String id, Handler<AsyncResult<Void>> resultHandler);

	static AccountRepository createProxy(Vertx vertx, String address) {
		return new AccountRepositoryVertxEBProxy(vertx, address);
	}

	static AccountRepository create(MongoClient client) {
		return new AccountRepositoryImpl(client);
	}

}

The generator needs additional configuration inside the pom.xml file. After running the mvn clean install command on the parent project, all generated classes should be available under the src/main/generated directory for every microservice module.

<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-compiler-plugin</artifactId>
	<version>3.6.2</version>
	<configuration>
		<encoding>${project.build.sourceEncoding}</encoding>
		<source>${java.version}</source>
		<target>${java.version}</target>
		<useIncrementalCompilation>false</useIncrementalCompilation>
		<annotationProcessors>
			<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
		</annotationProcessors>
		<generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
		<compilerArgs>
			<arg>-AoutputDirectory=${project.basedir}/src/main</arg>
		</compilerArgs>
	</configuration>
</plugin>

Now we are able to obtain an AccountRepository reference by calling createProxy with the account-service address.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");
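
For that proxy lookup to work, the AccountRepository implementation has to be exposed on the event bus under the same account-service address inside MongoVerticle. That part is not shown here, but a minimal sketch could look as follows, assuming Vert.x 3.5+ and the ServiceBinder from vertx-service-proxy (on older versions ProxyHelper.registerService plays the same role); the Mongo connection details are placeholders.

public class MongoVerticle extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		// placeholder connection settings - normally read from the configuration
		MongoClient client = MongoClient.createShared(vertx, new JsonObject()
				.put("connection_string", "mongodb://192.168.99.100:27017")
				.put("db_name", "test"));
		AccountRepository repository = AccountRepository.create(client);
		// register the implementation on the event bus under the "account-service" address,
		// so that AccountRepository.createProxy(vertx, "account-service") can reach it
		new ServiceBinder(vertx)
				.setAddress("account-service")
				.register(AccountRepository.class, repository);
	}
}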

Service Discovery

To use Vert.x service discovery, we have to add the following dependencies to pom.xml. The first of them provides mechanisms for the built-in Vert.x discovery, which is not very usable if we would like to invoke microservices running on different hosts. Fortunately, some additional bridges are also available, for example a Consul bridge.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery</artifactId>
	<version>${vertx.version}</version>
</dependency>
<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-service-discovery-bridge-consul</artifactId>
	<version>${vertx.version}</version>
</dependency>

Great, we only have to declare the service discovery and register the service importer. Now we can retrieve configuration from Consul, but I assume we would also like to register our service. Unfortunately, problems start here… As the toolkit authors say, it (Vert.x) does not export to Consul and does not support service modification. Maybe somebody will explain to me why this library cannot also export data to Consul – I just do not understand it. I had the same problem with Apache Camel some months ago, and I will use the same solution I developed at that time. Fortunately, Consul has a simple API for service registration and deregistration. To use it in our application we need to include the Vert.x HTTP client in our dependencies.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-web-client</artifactId>
	<version>${vertx.version}</version>
</dependency>

Then, using the declared WebClient while starting the application, we can register the service by invoking Consul's PUT method.

WebClient client = WebClient.create(vertx);
...
JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
	LOGGER.info("Consul registration status: {}", res.result().statusCode());
});
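
Consul's agent API handles deregistration in the same way (PUT /v1/agent/service/deregister/{serviceId}). This part is not shown in the sample, but a possible place for it is the verticle's stop method, as in the sketch below, which reuses the service id from the registration above.

@Override
public void stop() throws Exception {
	WebClient client = WebClient.create(vertx);
	// remove this instance from the Consul registry using its service id
	client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"),
			"/v1/agent/service/deregister/account-service-1").send(res ->
		LOGGER.info("Consul deregistration status: {}", res.result().statusCode()));
}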

Once the account-service has registered itself on the discovery server, we can invoke it from another microservice – in this case from customer-service. We only have to create a ServiceDiscovery object and register the Consul service importer.

ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
...
discovery.registerServiceImporter(new ConsulServiceImporter(), new JsonObject().put("host", discoveryConfig.getString("host")).put("port", discoveryConfig.getInteger("port")).put("scan-period", 2000));

Here’s the AccountClient fragment responsible for invoking GET /account/customer/{customerId} on account-service. It obtains a service reference from the discovery object and casts it to a WebClient instance. I don't know if you have noticed that apart from the standard fields such as ID, Name or Port, I also set the Tags field to the type of service being registered. In this case it is an http-endpoint. Thanks to that, whenever Vert.x reads the record from Consul, it is able to automatically map the service reference to a WebClient object.

public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List<Account>>> resultHandler) {
	discovery.getRecord(r -> r.getName().equals("account-service"), res -> {
		LOGGER.info("Result: {}", res.result().getType());
		ServiceReference ref = discovery.getReference(res.result());
		WebClient client = ref.getAs(WebClient.class);
		client.get("/account/customer/" + customerId).send(res2 -> {
			LOGGER.info("Response: {}", res2.result().bodyAsString());
			List<Account> accounts = res2.result().bodyAsJsonArray().stream().map(it -> Json.decodeValue(it.toString(), Account.class)).collect(Collectors.toList());
			resultHandler.handle(Future.succeededFuture(accounts));
		});
	});
	return this;
}

Config

The Vert.x Config module is responsible for configuration management within the application.

<dependency>
	<groupId>io.vertx</groupId>
	<artifactId>vertx-config</artifactId>
	<version>${vertx.version}</version>
</dependency>

There are many configuration stores, which can be used as configuration data location:

  • File
  • Environment Variables
  • HTTP
  • Event Bus
  • Git
  • Redis
  • Consul
  • Kubernetes
  • Spring Cloud Config Server

I selected the simplest one – a file. But this can easily be changed just by defining another type on the ConfigStoreOptions object. ConfigRetriever is responsible for loading the configuration data from the store. It reads the configuration as a JsonObject.

ConfigStoreOptions file = new ConfigStoreOptions().setType("file").setConfig(new JsonObject().put("path", "application.json"));
ConfigRetriever retriever = ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(file));
retriever.getConfig(conf -> {
	JsonObject discoveryConfig = conf.result().getJsonObject("discovery");
	vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port"));
	JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service").put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
	client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
		LOGGER.info("Consul registration status: {}", res.result().statusCode());
	});
});

The configuration file application.json is available under src/main/resources and contains the application port, service discovery and datasource addresses.

{
	"port" : 2222,
	"discovery" : {
		"host" : "192.168.99.100",
		"port" : 8500
	},
	"datasource" : {
		"host" : "192.168.99.100",
		"port" : 27017,
		"db_name" : "test"
	}
}

Final thoughts

Vert.x's authors prefer not to define their solution as a framework, but as a toolkit. They don't tell you what the correct way to write an application is, but only give you a lot of useful bricks that help you create your app. With Vert.x you can create fast and lightweight APIs based on non-blocking, asynchronous I/O. It gives you a lot of possibilities, as you can see in the Config module example, where you can even use Spring Cloud Config Server as a configuration store. But it is also not free from drawbacks, as I showed in the service registration with Consul example. Vert.x also allows you to create reactive microservices with RxJava, which seems to be an interesting option that I hope to describe in the future.