Part 1: Testing Kafka Microservices With Micronaut

I have already described how to build microservices architecture entirely based on message-driven communication through Apache Kafka in one of my previous articles Kafka In Microservices With Micronaut. As you can see in the article title the sample applications and integration with Kafka has been built on top of Micronaut Framework. I described some interesting features of Micronaut, that can be used for building message-driven microservices, but I specially didn’t write anything about testing. In this article I’m going to show you how to test your Kafka microservice using Micronaut Test core features (Component Tests), Testcontainers (Integration Tests) and Pact (Contract Tests).


Generally automated testing is one of the biggest challenge related to microservices architecture. Therefore the most popular microservice frameworks like Micronaut or Spring Boot provides some useful features for that. There are also some dedicated tools which helps you to use Docker containers in your tests or provides mechanisms for verifying the contracts between different applications. For the purpose of current article demo applications I’m using the same repository as for the previous article: https://github.com/piomin/sample-kafka-micronaut-microservices.git.

Sample Architecture

The architecture of sample applications has been described in the previous article but let me perform a quick recap. We have 4 microservices: order-service, trip-service, driver-service and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.
A primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It is receiving requests from customers, saving history and sending events to orders topic. All the other microservices are listening on this topic and processing orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some of other microservices. The architecture is presented on the picture below.

micronaut-kafka-1

Embedded Kafka – Component Tests

After a short description of the architecture we may proceed to the key point of this article – testing. Micronaut allows you to start embedded Kafka instance during your tests. To do that you should first include the following dependencies to your Maven pom.xml:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
	<classifier>test</classifier>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>2.3.0</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>2.3.0</version>
	<classifier>test</classifier>
</dependency>

To enable embedded Kafka for a test class we have to set property kafka.embedded.enabled to true. Because I have run Kafka on Docker container, which is by default available on address 192.168.99.100 I also need to change dynamically the value of property kafka.bootstrap.servers to localhost:9092 for a given test. The test implementation class using embedded Kafka for testing three basic scenarios for order-service: sending order with new trip, and receiving orders for trip cancellation and completement from other microservices. Here’s the full code of my OrderKafkaEmbeddedTest

@MicronautTest
@Property(name = "kafka.embedded.enabled", value = "true")
@Property(name = "kafka.bootstrap.servers", value = "localhost:9092")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaEmbeddedTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaEmbeddedTest.class);

    @Inject
    OrderClient client;
    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderHolder orderHolder;
    @Inject
    KafkaEmbedded kafkaEmbedded;

    @BeforeAll
    public void init() {
        LOGGER.info("Topics: {}", kafkaEmbedded.getKafkaServer().get().zkClient().getAllTopicsInCluster());
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    public void testAddNewTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Assertions.assertNotNull(orderSent);
        Assertions.assertEquals(order.getId(), orderSent.getId());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    public void testCancelTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.CANCEL_TRIP, 1L, 50, 30);
        client.send(order);
        Order orderReceived = waitForOrder();
        Optional<Order> oo = repository.findById(1L);
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.REJECTED, oo.get().getStatus());
    }

    @Test
    @org.junit.jupiter.api.Order(3)
    public void testPaymentTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.PAYMENT_PROCESSED, 1L, 50, 30);
        order.setTripId(1L);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Optional<Order> oo = repository.findById(order.getId());
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.COMPLETED, oo.get().getStatus());
    }

    private Order waitForOrder() throws InterruptedException {
        Order orderSent = null;
        for (int i = 0; i < 10; i++) {
            orderSent = orderHolder.getCurrentOrder();
            if (orderSent != null)
                break;
            Thread.sleep(1000);
        }
        orderHolder.setCurrentOrder(null);
        return orderSent;
    }

}

At that stage some things requires clarification – especially the mechanism of verifying sending and receiving messages. I’ll describe it on the example of driver-service. When a message is incoming to the order topic it is received by OrderListener, which is annotated with @KafkaListener as shown below. It gets the order type and forward the NEW_TRIP request to DriverService bean.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }
}

The DriverService is processing order. It is trying to find the driver located closest to the customer, changing found driver’s status to unavailable and sent event with change with the current driver state.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        if (driver.isPresent()) {
            Driver driverLocal = driver.get();
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        }
    }
	
	// OTHER METHODS ...
}

To verify that a final message with change notification has been sent to the drivers topic we have to create our own listener for the test purposes. It receives the message and writes it in @Singleton holder class which is then accessed by a single-thread test class. The described process is visualized on the picture below.
kafka-micronaut-testing-1.png
Here’s the implementation of test listener which is responsible just for receiving the message sent to drivers topic and writing it to DriverHolder bean.

@KafkaListener(groupId = "driverTest")
public class DriverConfirmListener {

	private static final Logger LOGGER = LoggerFactory.getLogger(DriverConfirmListener.class);

	@Inject
	DriverHolder driverHolder;

	@Topic("orders")
	public void receive(@Body Driver driver) {
		LOGGER.info("Confirmed: {}", driver);
		driverHolder.setCurrentDriver(driver);
	}

}

Here’s the implementation of DriverHolder class.

@Singleton
public class DriverHolder {

	private Driver currentDriver;

	public Driver getCurrentDriver() {
		return currentDriver;
	}

	public void setCurrentDriver(Driver currentDriver) {
		this.currentDriver = currentDriver;
	}

}

No matter if you are using embedded Kafka, Testcontainers or just manually started Docker container you can use verification mechanism described above.

Kafka with Testcontainers

We will use Testcontainers framework for running Docker containers of Zookeeper and Kafka during JUnit tests. Testcontainers is a Java library that provides lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container. To use it in your project together with JUnit 5, which is already used for our sample Micronaut application, you have to add the following dependencies to your Maven pom.xml:

<dependency>
	<groupId>org.testcontainers</groupId>
	<artifactId>kafka</artifactId>
	<version>1.12.2</version>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.testcontainers</groupId>
	<artifactId>junit-jupiter</artifactId>
	<version>1.12.2</version>
	<scope>test</scope>
</dependency>

The declared library org.testcontainers:kafka:1.12.2 provides KafkaContainer class that allows to define and start Kafka container with embedded Zookeeper in your tests. However, I decided to use GenericContainer class and run two containers wurstmeister/zookeeper and wurstmeister/kafka. Because Kafka needs to communicate with Zookeeper both containers should be run in the same network. We will also have to override Zookeeper container’s name and host name to allow Kafka call it by the hostname.
When running Kafka container we need to set some important environment variables. Variable KAFKA_ADVERTISED_HOST_NAME sets the hostname under which Kafka is visible for external client and KAFKA_ZOOKEEPER_CONNECT Zookeeper lookup address. Although it is not recommended we should disable dynamic expose port generation by setting static port number equal to the container binding port 9092. That helps us to avoid some problems with setting Kafka advertised port and injecting it into Micronaut configuration.

@MicronautTest
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaContainerTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaContainerTest.class);

    static Network network = Network.newNetwork();

	@Container
	public static final GenericContainer ZOOKEEPER = new GenericContainer("wurstmeister/zookeeper")
		.withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("zookeeper").withHostName("zookeeper"))
		.withExposedPorts(2181)
		.withNetworkAliases("zookeeper")
		.withNetwork(network);

	@Container
	public static final GenericContainer KAFKA_CONTAINER = new GenericContainer("wurstmeister/kafka")
		.withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("kafka").withHostName("kafka")
			.withPortBindings(new PortBinding(Ports.Binding.bindPort(9092), new ExposedPort(9092))))
		.withExposedPorts(9092)
		.withNetworkAliases("kafka")
		.withEnv("KAFKA_ADVERTISED_HOST_NAME", "192.168.99.100")
		.withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
		.withNetwork(network);
		
	// TESTS ...
	
}

The test scenarios may be the same as for embedded Kafka or we may attempt to define some more advanced integration tests. To do that we first create Docker image of every microservice during the build. We can use io.fabric8:docker-maven-plugin for that. Here’s the example for driver-service.

<plugin>
	<groupId>io.fabric8</groupId>
	<artifactId>docker-maven-plugin</artifactId>
	<version>0.31.0</version>
	<configuration>
		<images>
			<image>
				<name>piomin/driver-service:${project.version}</name>
				<build>
					<dockerFile>${project.basedir}/Dockerfile</dockerFile>
					<tags>
						<tag>latest</tag>
						<tag>${project.version}</tag>
					</tags>
				</build>
			</image>
		</images>
	</configuration>
	<executions>
		<execution>
			<id>start</id>
			<phase>pre-integration-test</phase>
			<goals>
				<goal>build</goal>
				<goal>start</goal>
			</goals>
		</execution>
		<execution>
			<id>stop</id>
			<phase>post-integration-test</phase>
			<goals>
				<goal>stop</goal>
			</goals>
		</execution>
	</executions>
</plugin>

If we have Docker image of every microservice we can easily run it using Testcontainers during our integration tests. In the fragment of test class visible below I’m running the container with driver-service in addition to Kafka and Zookeeper containers. The test is implemented inside order-service. We are building the same scenario as in test with embedded Kafka – sending the NEW_TRIP order. But this time we are verifying if the message has been received and processed by the driver-service. This verification is performed by listening for notification event sent by driver-service started on Docker container to the drivers topic. Normally, order-service does not listen for messages incoming to drivers topic, but we were created such integration just for the integration test purpose.

@Container
public static final GenericContainer DRIVER_CONTAINER = new GenericContainer("piomin/driver-service")
	.withNetwork(network);

@Inject
OrderClient client;
@Inject
OrderInMemoryRepository repository;
@Inject
DriverHolder driverHolder;

@Test
@org.junit.jupiter.api.Order(1)
public void testNewTrip() throws InterruptedException {
	Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
	order = repository.add(order);
	client.send(order);
	Driver driverReceived = null;
	for (int i = 0; i < 10; i++) {
		driverReceived = driverHolder.getCurrentDriver();
		if (driverReceived != null)
			break;
		Thread.sleep(1000);
	}
	driverHolder.setCurrentDriver(null);
	Assertions.assertNotNull(driverReceived);
}

Summary

In this article I have described an approach to component tests with embedded Kafka and integration tests with Docker and Testcontainers. This is the first part of article, in the second I’m going to show you how to build contract tests for Micronaut application with Pact.

Author: Piotr Mińkowski

IT Architect, Java Software Developer

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.