I have already described how to build microservices architecture entirely based on message-driven communication through Apache Kafka in one of my previous articles Kafka In Microservices With Micronaut. As you can see in the article title the sample applications and integration with Kafka has been built on top of Micronaut Framework. I described some interesting features of Micronaut, that can be used for building message-driven microservices, but I specially didn’t write anything about testing. In this article I’m going to show you how to test your Kafka microservice using Micronaut Test core features (Component Tests), Testcontainers (Integration Tests) and Pact (Contract Tests).
Generally automated testing is one of the biggest challenge related to microservices architecture. Therefore the most popular microservice frameworks like Micronaut or Spring Boot provides some useful features for that. There are also some dedicated tools which helps you to use Docker containers in your tests or provides mechanisms for verifying the contracts between different applications. For the purpose of current article demo applications I’m using the same repository as for the previous article: https://github.com/piomin/sample-kafka-micronaut-microservices.git.
Sample Architecture
The architecture of sample applications has been described in the previous article but let me perform a quick recap. We have 4 microservices: order-service, trip-service, driver-service and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.
A primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It is receiving requests from customers, saving history and sending events to orders
topic. All the other microservices are listening on this topic and processing orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some of other microservices. The architecture is presented on the picture below.
Embedded Kafka – Component Tests
After a short description of the architecture we may proceed to the key point of this article – testing. Micronaut allows you to start embedded Kafka instance during your tests. To do that you should first include the following dependencies to your Maven pom.xml:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> <classifier>test</classifier> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.3.0</version> <classifier>test</classifier> </dependency>
To enable embedded Kafka for a test class we have to set property kafka.embedded.enabled
to true
. Because I have run Kafka on Docker container, which is by default available on address 192.168.99.100
I also need to change dynamically the value of property kafka.bootstrap.servers
to localhost:9092
for a given test. The test implementation class using embedded Kafka for testing three basic scenarios for order-service: sending order with new trip, and receiving orders for trip cancellation and completement from other microservices. Here’s the full code of my OrderKafkaEmbeddedTest
@MicronautTest @Property(name = "kafka.embedded.enabled", value = "true") @Property(name = "kafka.bootstrap.servers", value = "localhost:9092") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class OrderKafkaEmbeddedTest { private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaEmbeddedTest.class); @Inject OrderClient client; @Inject OrderInMemoryRepository repository; @Inject OrderHolder orderHolder; @Inject KafkaEmbedded kafkaEmbedded; @BeforeAll public void init() { LOGGER.info("Topics: {}", kafkaEmbedded.getKafkaServer().get().zkClient().getAllTopicsInCluster()); } @Test @org.junit.jupiter.api.Order(1) public void testAddNewTripOrder() throws InterruptedException { Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30); order = repository.add(order); client.send(order); Order orderSent = waitForOrder(); Assertions.assertNotNull(orderSent); Assertions.assertEquals(order.getId(), orderSent.getId()); } @Test @org.junit.jupiter.api.Order(2) public void testCancelTripOrder() throws InterruptedException { Order order = new Order(OrderType.CANCEL_TRIP, 1L, 50, 30); client.send(order); Order orderReceived = waitForOrder(); Optional<Order> oo = repository.findById(1L); Assertions.assertTrue(oo.isPresent()); Assertions.assertEquals(OrderStatus.REJECTED, oo.get().getStatus()); } @Test @org.junit.jupiter.api.Order(3) public void testPaymentTripOrder() throws InterruptedException { Order order = new Order(OrderType.PAYMENT_PROCESSED, 1L, 50, 30); order.setTripId(1L); order = repository.add(order); client.send(order); Order orderSent = waitForOrder(); Optional<Order> oo = repository.findById(order.getId()); Assertions.assertTrue(oo.isPresent()); Assertions.assertEquals(OrderStatus.COMPLETED, oo.get().getStatus()); } private Order waitForOrder() throws InterruptedException { Order orderSent = null; for (int i = 0; i < 10; i++) { orderSent = orderHolder.getCurrentOrder(); if (orderSent != null) break; Thread.sleep(1000); } orderHolder.setCurrentOrder(null); return orderSent; } }
At that stage some things requires clarification – especially the mechanism of verifying sending and receiving messages. I’ll describe it on the example of driver-service. When a message is incoming to the order
topic it is received by OrderListener
, which is annotated with @KafkaListener
as shown below. It gets the order type and forward the NEW_TRIP
request to DriverService
bean.
@KafkaListener(groupId = "driver") public class OrderListener { private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class); private DriverService service; public OrderListener(DriverService service) { this.service = service; } @Topic("orders") public void receive(@Body Order order) { LOGGER.info("Received: {}", order); switch (order.getType()) { case NEW_TRIP -> service.processNewTripOrder(order); } } }
The DriverService
is processing order. It is trying to find the driver located closest to the customer, changing found driver’s status to unavailable and sent event with change with the current driver state.
@Singleton public class DriverService { private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class); private DriverClient client; private OrderClient orderClient; private DriverInMemoryRepository repository; public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) { this.client = client; this.orderClient = orderClient; this.repository = repository; } public void processNewTripOrder(Order order) { LOGGER.info("Processing: {}", order); Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY()); if (driver.isPresent()) { Driver driverLocal = driver.get(); driverLocal.setStatus(DriverStatus.UNAVAILABLE); repository.updateDriver(driverLocal); client.send(driverLocal, String.valueOf(order.getId())); LOGGER.info("Message sent: {}", driverLocal); } } // OTHER METHODS ... }
To verify that a final message with change notification has been sent to the drivers
topic we have to create our own listener for the test purposes. It receives the message and writes it in @Singleton
holder class which is then accessed by a single-thread test class. The described process is visualized on the picture below.
Here’s the implementation of test listener which is responsible just for receiving the message sent to drivers
topic and writing it to DriverHolder
bean.
@KafkaListener(groupId = "driverTest") public class DriverConfirmListener { private static final Logger LOGGER = LoggerFactory.getLogger(DriverConfirmListener.class); @Inject DriverHolder driverHolder; @Topic("orders") public void receive(@Body Driver driver) { LOGGER.info("Confirmed: {}", driver); driverHolder.setCurrentDriver(driver); } }
Here’s the implementation of DriverHolder
class.
@Singleton public class DriverHolder { private Driver currentDriver; public Driver getCurrentDriver() { return currentDriver; } public void setCurrentDriver(Driver currentDriver) { this.currentDriver = currentDriver; } }
No matter if you are using embedded Kafka, Testcontainers or just manually started Docker container you can use verification mechanism described above.
Kafka with Testcontainers
We will use Testcontainers framework for running Docker containers of Zookeeper and Kafka during JUnit tests. Testcontainers is a Java library that provides lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container. To use it in your project together with JUnit 5, which is already used for our sample Micronaut application, you have to add the following dependencies to your Maven pom.xml
:
<dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.12.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> <version>1.12.2</version> <scope>test</scope> </dependency>
The declared library org.testcontainers:kafka:1.12.2
provides KafkaContainer
class that allows to define and start Kafka container with embedded Zookeeper in your tests. However, I decided to use GenericContainer
class and run two containers wurstmeister/zookeeper
and wurstmeister/kafka
. Because Kafka needs to communicate with Zookeeper both containers should be run in the same network. We will also have to override Zookeeper container’s name and host name to allow Kafka call it by the hostname.
When running Kafka container we need to set some important environment variables. Variable KAFKA_ADVERTISED_HOST_NAME
sets the hostname under which Kafka is visible for external client and KAFKA_ZOOKEEPER_CONNECT
Zookeeper lookup address. Although it is not recommended we should disable dynamic expose port generation by setting static port number equal to the container binding port 9092
. That helps us to avoid some problems with setting Kafka advertised port and injecting it into Micronaut configuration.
@MicronautTest @Testcontainers @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class OrderKafkaContainerTest { private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaContainerTest.class); static Network network = Network.newNetwork(); @Container public static final GenericContainer ZOOKEEPER = new GenericContainer("wurstmeister/zookeeper") .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("zookeeper").withHostName("zookeeper")) .withExposedPorts(2181) .withNetworkAliases("zookeeper") .withNetwork(network); @Container public static final GenericContainer KAFKA_CONTAINER = new GenericContainer("wurstmeister/kafka") .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("kafka").withHostName("kafka") .withPortBindings(new PortBinding(Ports.Binding.bindPort(9092), new ExposedPort(9092)))) .withExposedPorts(9092) .withNetworkAliases("kafka") .withEnv("KAFKA_ADVERTISED_HOST_NAME", "192.168.99.100") .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181") .withNetwork(network); // TESTS ... }
The test scenarios may be the same as for embedded Kafka or we may attempt to define some more advanced integration tests. To do that we first create Docker image of every microservice during the build. We can use io.fabric8:docker-maven-plugin
for that. Here’s the example for driver-service.
<plugin> <groupId>io.fabric8</groupId> <artifactId>docker-maven-plugin</artifactId> <version>0.31.0</version> <configuration> <images> <image> <name>piomin/driver-service:${project.version}</name> <build> <dockerFile>${project.basedir}/Dockerfile</dockerFile> <tags> <tag>latest</tag> <tag>${project.version}</tag> </tags> </build> </image> </images> </configuration> <executions> <execution> <id>start</id> <phase>pre-integration-test</phase> <goals> <goal>build</goal> <goal>start</goal> </goals> </execution> <execution> <id>stop</id> <phase>post-integration-test</phase> <goals> <goal>stop</goal> </goals> </execution> </executions> </plugin>
If we have Docker image of every microservice we can easily run it using Testcontainers during our integration tests. In the fragment of test class visible below I’m running the container with driver-service in addition to Kafka and Zookeeper containers. The test is implemented inside order-service. We are building the same scenario as in test with embedded Kafka – sending the NEW_TRIP
order. But this time we are verifying if the message has been received and processed by the driver-service. This verification is performed by listening for notification event sent by driver-service started on Docker container to the drivers
topic. Normally, order-service does not listen for messages incoming to drivers
topic, but we were created such integration just for the integration test purpose.
@Container public static final GenericContainer DRIVER_CONTAINER = new GenericContainer("piomin/driver-service") .withNetwork(network); @Inject OrderClient client; @Inject OrderInMemoryRepository repository; @Inject DriverHolder driverHolder; @Test @org.junit.jupiter.api.Order(1) public void testNewTrip() throws InterruptedException { Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30); order = repository.add(order); client.send(order); Driver driverReceived = null; for (int i = 0; i < 10; i++) { driverReceived = driverHolder.getCurrentDriver(); if (driverReceived != null) break; Thread.sleep(1000); } driverHolder.setCurrentDriver(null); Assertions.assertNotNull(driverReceived); }
Summary
In this article I have described an approach to component tests with embedded Kafka and integration tests with Docker and Testcontainers. This is the first part of article, in the second I’m going to show you how to build contract tests for Micronaut application with Pact.