Event driven microservices using Spring Cloud Stream and RabbitMQ

Before we start, let’s look at the Spring Cloud Quick Start page. It lists the available Spring Cloud releases, grouped into release trains. We use Camden.SR5, the newest release at the time of writing, together with Spring Boot 1.4.4.RELEASE and Spring Cloud Stream Brooklyn.SR2.

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>Camden.SR5</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

Here’s a visualization of our architecture. The order service sends a message to a RabbitMQ topic exchange. The product and shipment services listen on that exchange for incoming order messages and process them. After processing, each of them sends a reply message to a second exchange, which the payment service listens on. The payment service stores the incoming messages, aggregates the replies from the product and shipment services, then calculates the total price and produces the final response.

[Figure: architecture diagram (sample1)]

Each service has the following dependencies. The sample-common module holds the classes of the messages sent to the exchanges and is shared by all services. We also use Spring Cloud Sleuth for distributed tracing, so that one request id is shared by all microservices.

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-sleuth</artifactId>
		</dependency>
		<dependency>
			<groupId>pl.piomin.services</groupId>
			<artifactId>sample-common</artifactId>
			<version>${project.version}</version>
		</dependency>
	</dependencies>

Let me start with a few words on the theory behind Spring Cloud Stream. A short reference for the framework is the Spring Cloud Stream Reference Guide. The framework is built on Spring Integration and provides three predefined interfaces out of the box:

  • Source – can be used for an application which has a single outbound channel
  • Sink – can be used for an application which has a single inbound channel
  • Processor – can be used for an application which has both an inbound channel and an outbound channel
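To make the three roles concrete before we get to the Spring code, here is a plain-Java analogy. It does not use the Spring Cloud Stream interfaces at all: the class name ChannelRoles and the run method are made up for this illustration, and the JDK functional interfaces only stand in for "outbound only", "inbound and outbound" and "inbound only".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only; these are JDK types, not Spring Cloud Stream ones.
final class ChannelRoles {

    // Connects a source (outbound only), a processor (inbound and outbound)
    // and a sink (inbound only), and pushes `count` messages through them.
    static <A, B> void run(Supplier<A> source, Function<A, B> processor, Consumer<B> sink, int count) {
        for (int i = 0; i < count; i++) {
            sink.accept(processor.apply(source.get()));
        }
    }

    public static void main(String[] args) {
        int[] index = {0};
        List<String> received = new ArrayList<>();

        Supplier<Integer> source = () -> index[0]++;             // produces order ids
        Function<Integer, String> processor = id -> "order-" + id; // transforms each message
        Consumer<String> sink = received::add;                    // only consumes

        run(source, processor, sink, 3);
        System.out.println(received); // prints [order-0, order-1, order-2]
    }
}
```

In the real application the three parts live in separate services and the broker sits between them, which is exactly what the binder configuration takes care of.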

I’m going to show you sample usage of all of these interfaces. The order service uses the Source interface. With the @InboundChannelAdapter and @Poller annotations, we send an order message to the output channel every 10 seconds.

@SpringBootApplication
@EnableBinding(Source.class)
public class Application {

	protected Logger logger = Logger.getLogger(Application.class.getName());

	private int index = 0;

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
	public MessageSource<Order> orderSource() {
		return () -> {
			Order o = new Order(index++, OrderType.PURCHASE, LocalDateTime.now(), OrderStatus.NEW, new Product(), new Shipment());
			logger.info("Sending order: " + o);
			return new GenericMessage<>(o);
		};
	}

	@Bean
	public AlwaysSampler defaultSampler() {
	  return new AlwaysSampler();
	}

}
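If the poller settings look a bit magical, the following plain-JDK sketch models what they mean: fixedDelay is the pause between the end of one poll cycle and the start of the next, and maxMessagesPerPoll caps how many messages are taken from the source in one cycle. This is not Spring Integration code; SimplePoller and its methods are hypothetical names invented for the illustration.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of a polled message source.
final class SimplePoller<T> {

    private final Supplier<T> source;       // plays the role of the MessageSource
    private final Consumer<T> output;       // plays the role of the output channel
    private final int maxMessagesPerPoll;

    SimplePoller(Supplier<T> source, Consumer<T> output, int maxMessagesPerPoll) {
        this.source = source;
        this.output = output;
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    // One poll cycle: take at most maxMessagesPerPoll messages,
    // stopping early when the source has nothing to offer (null).
    int pollOnce() {
        int sent = 0;
        while (sent < maxMessagesPerPoll) {
            T message = source.get();
            if (message == null) {
                break;
            }
            output.accept(message);
            sent++;
        }
        return sent;
    }

    // Repeats pollOnce; the delay is counted from the end of one cycle
    // to the start of the next one.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long fixedDelayMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
    }
}
```

With a delay of 10000 ms and a limit of 1, start(...) on a single-threaded scheduled executor would behave like the order source above: one order per cycle, roughly every 10 seconds.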

Here’s the configuration in the application.yml file. The order service binds only Source, so the output binding is the one that matters here.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.in
          binder: rabbit1
        output:
          destination: ex.stream.out
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest

The product and shipment services use the Processor interface. They listen on the input channel and, after processing, send a message to the output channel.

@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {

	@Autowired
	private ProductService productService;

	protected Logger logger = Logger.getLogger(Application.class.getName());

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Processor.INPUT)
	@SendTo(Processor.OUTPUT)
	public Order processOrder(Order order) {
		logger.info("Processing order: " + order);
		return productService.processOrder(order);
	}

	@Bean
	public AlwaysSampler defaultSampler() {
		return new AlwaysSampler();
	}

}

Here’s the product service configuration. It listens on the order service’s output exchange and defines a consumer group named product. The group name is used for automatic queue creation and for binding that queue to the exchange on RabbitMQ. An output exchange is defined as well.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.out
          group: product
          binder: rabbit1
        output:
          destination: ex.stream.out2
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
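What the group setting buys us can be shown with a small in-memory model. With the RabbitMQ binder, the queue for a group is named destination.group (ex.stream.out.product here), every group gets its own copy of each message, and instances inside one group share a queue and compete for messages. The class InMemoryExchange below is a hypothetical illustration, not binder code, and the shipment group name is an assumption, since only the product service configuration is shown in this post.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of an exchange with one queue per consumer group.
final class InMemoryExchange {

    private final String name;
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    InMemoryExchange(String name) {
        this.name = name;
    }

    // Declares the queue for a group and returns its name,
    // following the <destination>.<group> convention.
    String bindGroup(String group) {
        String queueName = name + "." + group;
        queues.computeIfAbsent(queueName, q -> new ArrayDeque<>());
        return queueName;
    }

    // Every bound queue receives its own copy of the message.
    void publish(String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    // Instances of the same group read from the same queue,
    // so each message is handled by only one of them.
    String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        InMemoryExchange exchange = new InMemoryExchange("ex.stream.out");
        String productQueue = exchange.bindGroup("product");
        String shipmentQueue = exchange.bindGroup("shipment"); // assumed group name

        exchange.publish("order-1");

        System.out.println(productQueue);                 // prints ex.stream.out.product
        System.out.println(exchange.poll(productQueue));  // prints order-1
        System.out.println(exchange.poll(productQueue));  // prints null
        System.out.println(exchange.poll(shipmentQueue)); // prints order-1
    }
}
```

This is why both the product and the shipment service see every order, while starting a second instance of the product service would not cause orders to be processed twice.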

We run the RabbitMQ instance in a Docker container.

docker run -d --name rabbit1 -p 30001:15672 -p 30000:5672 rabbitmq:management

Let’s look at the management console, available at http://192.168.99.100:30001. Here’s the configuration of the ex.stream.out topic exchange, followed by the list of declared queues.

[Figure: ex.stream.out topic exchange configuration (rabbit1)]

[Figure: list of declared queues (rabbit2)]

Here’s the main application class of the payment service. We use the Sink interface to listen for incoming messages. Each incoming order is processed, and we print the final price of the order sent by the order service. The sample application source code is available on GitHub.

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	@Autowired
	private PaymentService paymentService;

	protected Logger logger = Logger.getLogger(Application.class.getName());

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void processOrder(Order order) {
		logger.info("Processing order: " + order);
		Order o = paymentService.processOrder(order);
		if (o != null)
			logger.info("Final response: " + (o.getProduct().getPrice() + o.getShipment().getPrice()));
	}

	@Bean
	public AlwaysSampler defaultSampler() {
		return new AlwaysSampler();
	}

}
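The PaymentService class itself is not shown in this post. As a rough sketch of the aggregation described earlier, the code below keeps the first reply for an order and returns the merged result only when the second one arrives, which would explain the null check in the listener. Everything here is an assumption made for illustration: OrderReply is a simplified stand-in for the Order class from sample-common, and PaymentAggregator is a hypothetical name, not the class from the sample repository.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the Order class from sample-common.
final class OrderReply {
    final int orderId;
    final Integer productPrice;   // null until the product service has replied
    final Integer shipmentPrice;  // null until the shipment service has replied

    OrderReply(int orderId, Integer productPrice, Integer shipmentPrice) {
        this.orderId = orderId;
        this.productPrice = productPrice;
        this.shipmentPrice = shipmentPrice;
    }
}

// Hypothetical aggregator: waits for both replies that belong to one order.
final class PaymentAggregator {

    private final Map<Integer, OrderReply> pending = new HashMap<>();

    // Returns the merged order once both prices are known, otherwise null.
    synchronized OrderReply process(OrderReply reply) {
        OrderReply first = pending.remove(reply.orderId);
        if (first == null) {
            // First reply for this order: remember it and wait for the other one.
            pending.put(reply.orderId, reply);
            return null;
        }
        Integer product = first.productPrice != null ? first.productPrice : reply.productPrice;
        Integer shipment = first.shipmentPrice != null ? first.shipmentPrice : reply.shipmentPrice;
        if (product == null || shipment == null) {
            // The same service replied twice (for example a redelivery): keep waiting.
            pending.put(reply.orderId, new OrderReply(reply.orderId, product, shipment));
            return null;
        }
        return new OrderReply(reply.orderId, product, shipment);
    }
}
```

A real implementation would also have to decide how long to keep an incomplete order around; this sketch keeps it in memory indefinitely.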

By declaring an AlwaysSampler @Bean in the main class of every microservice, we propagate the trace and span ids across all calls made for a single order. Here’s a fragment of our microservices’ console logs. I also get the following warning message, which I don’t understand: ‘Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class’. Does version 1.1.2.RELEASE of Spring Cloud Sleuth not apply to the Camden.SR5 release?

[Figure: fragment of the microservices’ console logs (logs)]


Author: Piotr Mińkowski

IT Architect, Java Software Developer

4 thoughts on “Event driven microservices using Spring Cloud Stream and RabbitMQ”

  1. Hi / Siemano Piotr 😉 !

    Very good catch with the warning

    ‘Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class’.

    It should say “please upgrade Sleuth to 1.2”. When I first wrote this, we weren’t planning on releasing a 1.2, but then the plans changed and I forgot to bump the warning message. I’ll file an issue and update the warning message.
