RabbitMQ Cluster with Consul and Vault

Almost two years ago I wrote an article about RabbitMQ clustering, RabbitMQ in cluster. It was one of the first posts on my blog, and it’s really hard to believe it has been two years since I started it. Anyway, one of the questions about that article inspired me to return to the subject one more time. The question pointed to a problem with the approach to setting up the cluster. That approach assumes that we manually attach new nodes to the cluster by executing the command rabbitmqctl join_cluster with the name of an existing cluster node as a parameter. If I remember correctly, it was the only available method of creating a cluster at that time. Today we have more choices, which illustrates the evolution of RabbitMQ during the last two years. A RabbitMQ cluster can be formed in a number of ways:

  • Manually with rabbitmqctl (as described in my article RabbitMQ in cluster)
  • Declaratively by listing cluster nodes in config file
  • Using DNS-based discovery
  • Using AWS (EC2) instance discovery via a dedicated plugin
  • Using Kubernetes discovery via a dedicated plugin
  • Using Consul discovery via a dedicated plugin
  • Using etcd-based discovery via a dedicated plugin

Today, I’m going to show you how to create a RabbitMQ cluster using service discovery based on HashiCorp’s Consul. Additionally, we will add Vault to our architecture in order to use its secrets engine for managing the credentials used for accessing RabbitMQ. We will set up this sample on the local machine using Docker images of RabbitMQ, Consul and Vault. Finally, we will test our solution using a simple Spring Boot application that sends messages to the cluster and listens for incoming ones. That application is available in the GitHub repository sample-haclustered-rabbitmq-service in the branch consul.

Architecture

We use Vault as a credentials manager when applications authenticate against a RabbitMQ node or a user logs in to the RabbitMQ web admin console. Each RabbitMQ node registers itself in Consul after startup and retrieves the list of nodes running inside the cluster. Vault is integrated with RabbitMQ using a dedicated secrets engine. Here’s the architecture of our sample solution.

rabbit-consul-logo (1)

1. Configure RabbitMQ Consul plugin

The integration between RabbitMQ and Consul is provided by the plugin rabbitmq-peer-discovery-consul. This plugin is not enabled by default in the official RabbitMQ Docker image. So, the first step is to build our own Docker image, based on the official RabbitMQ image, that enables the required plugin. By default, the main RabbitMQ configuration file is available under the path /etc/rabbitmq/rabbitmq.conf inside the Docker container. To override it we just use the COPY statement as shown below. The following Dockerfile takes RabbitMQ with the management web console as the base image and enables the rabbitmq_peer_discovery_consul plugin.

FROM rabbitmq:3.7.8-management
COPY rabbitmq.conf /etc/rabbitmq
RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul

Now, let’s take a closer look at our plugin configuration settings. Because I run Docker on Windows, Consul is not available under the default localhost address, but under 192.168.99.100. So, first we need to set that IP address using the property cluster_formation.consul.host. We also need to set Consul as the peer discovery implementation by setting the plugin name for the property cluster_formation.peer_discovery_backend. Finally, we have to set two additional properties to make it work in our local Docker environment. Both are related to the address of the RabbitMQ node sent to Consul during the registration process. It is important to compute it properly, and not to send, for example, localhost. With cluster_formation.consul.svc_addr_auto set to true the node computes that address automatically, and after setting the property cluster_formation.consul.svc_addr_use_nodename to false the node registers itself using the host name instead of the node name. We can set the host name of a container in its run command. Here’s the full RabbitMQ configuration file used in the demo for this article.

loopback_users.guest = false
listeners.tcp.default = 5672
hipe_compile = false
management.listener.port = 15672
management.listener.ssl = false
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = 192.168.99.100
cluster_formation.consul.svc_addr_auto = true
cluster_formation.consul.svc_addr_use_nodename = false
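Before baking the file into an image, it can be handy to sanity-check it. The following is only a minimal sketch in plain Java and not part of the sample repository: the class name RabbitConfCheck and its methods are hypothetical, and it assumes that the simple key = value lines of rabbitmq.conf can be read with java.util.Properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: inspects the peer discovery keys of a rabbitmq.conf. */
final class RabbitConfCheck {

    /** Parses "key = value" lines; assumes java.util.Properties can read this layout. */
    static Properties parse(String conf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True when Consul is the discovery backend and a Consul host is set. */
    static boolean usesConsulDiscovery(Properties props) {
        String backend = props.getProperty("cluster_formation.peer_discovery_backend", "").trim();
        String host = props.getProperty("cluster_formation.consul.host", "").trim();
        return backend.equals("rabbit_peer_discovery_consul") && !host.isEmpty();
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul",
                "cluster_formation.consul.host = 192.168.99.100",
                "cluster_formation.consul.svc_addr_auto = true",
                "cluster_formation.consul.svc_addr_use_nodename = false");
        System.out.println("Consul discovery configured: " + usesConsulDiscovery(parse(conf)));
    }
}
```

The check only looks at the two keys that select Consul as the backend; it says nothing about whether Consul is actually reachable at that address.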

After saving the configuration above in the file rabbitmq.conf we can proceed to building our custom Docker image with RabbitMQ. This image is available in my Docker repository as piomin/rabbitmq, but you can also build it yourself from the Dockerfile by executing the following command.

$ docker build -t piomin/rabbitmq:1.0 .
Sending build context to Docker daemon  3.072kB
Step 1 : FROM rabbitmq:3.7.8-management
 ---> d69a5113ceae
Step 2 : COPY rabbitmq.conf /etc/rabbitmq
 ---> aa306ef88085
Removing intermediate container fda0e21178f9
Step 3 : RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul
 ---> Running in 0892a42bffef
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@fda0e21178f9...
The following plugins have been enabled:
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul

set 5 plugins.
Offline change; changes will take effect at broker restart.
 ---> cfe73f9d9904
Removing intermediate container 0892a42bffef
Successfully built cfe73f9d9904

2. Running RabbitMQ cluster on Docker

In the previous step we successfully created a Docker image of RabbitMQ configured to run in cluster mode using Consul discovery. Before running this image we need to start an instance of Consul. Here’s the command that starts a Docker container with Consul and exposes it on port 8500.

$ docker run -d --name consul -p 8500:8500 consul

We will also create a Docker network to enable communication between containers by hostname. It is required in this scenario, because each RabbitMQ container registers itself using the container hostname.

$ docker network create rabbitmq

Now, we can run our three clustered RabbitMQ containers. We will set a unique hostname for every container (using the -h option) and attach all of them to the same Docker network. We also have to set the container environment variable RABBITMQ_ERLANG_COOKIE to the same value on every node, because nodes can join a cluster only if they share the same Erlang cookie.

$ docker run -d --name rabbit1 -h rabbit1 --network rabbitmq -p 30000:5672 -p 30010:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit2 -h rabbit2 --network rabbitmq -p 30001:5672 -p 30011:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit3 -h rabbit3 --network rabbitmq -p 30002:5672 -p 30012:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
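The three commands differ only in the node index, which drives the container name, the hostname and both host ports. Here is a small sketch that makes the pattern explicit, assuming the same port scheme as above (30000+ for AMQP, 30010+ for the management console). The class RabbitNodeCommands is a hypothetical helper written for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: generates the docker run commands for the cluster nodes. */
final class RabbitNodeCommands {

    /** Builds the command for node number index (1-based). */
    static String runCommand(int index, String image) {
        String name = "rabbit" + index;
        int amqpPort = 30000 + index - 1;        // host port mapped to 5672
        int managementPort = 30010 + index - 1;  // host port mapped to 15672
        return "docker run -d --name " + name + " -h " + name
                + " --network rabbitmq"
                + " -p " + amqpPort + ":5672"
                + " -p " + managementPort + ":15672"
                + " -e RABBITMQ_ERLANG_COOKIE='rabbitmq' " + image;
    }

    /** Builds the commands for nodes 1..nodes. */
    static List<String> runCommands(int nodes, String image) {
        List<String> commands = new ArrayList<>();
        for (int i = 1; i <= nodes; i++) {
            commands.add(runCommand(i, image));
        }
        return commands;
    }

    public static void main(String[] args) {
        runCommands(3, "piomin/rabbitmq:1.0").forEach(System.out::println);
    }
}
```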

After running all three instances of RabbitMQ we can first take a look at the Consul web console. You should see there a new service called rabbitmq. This value is the default service name set by the RabbitMQ Consul plugin. We can override it inside rabbitmq.conf using the cluster_formation.consul.svc property.

rabbit-consul-1
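The same information can be read without the web console, because Consul exposes its catalog over HTTP under /v1/catalog/service/{name}. The sketch below assumes Java 11+ (for java.net.http) and the Consul address used in this article. The class name ConsulCatalogQuery is hypothetical, and the JSON response is simply printed rather than parsed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: lists the instances registered in Consul for a service. */
final class ConsulCatalogQuery {

    /** Builds the catalog URI for the given service name. */
    static URI serviceUri(String host, int port, String service) {
        return URI.create("http://" + host + ":" + port + "/v1/catalog/service/" + service);
    }

    public static void main(String[] args) throws Exception {
        // Host and port are the values used in this article; adjust them to your setup.
        HttpRequest request = HttpRequest.newBuilder(serviceUri("192.168.99.100", 8500, "rabbitmq"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The body is a JSON array with one entry per registered node.
        System.out.println(response.body());
    }
}
```

With all three containers running, the array should contain three entries, one per RabbitMQ node.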

We can check whether the cluster has started successfully using the RabbitMQ web management console. Every node exposes it. I just had to map the default port 15672 to a different host port for each node to avoid port conflicts between the three running instances.

rabbit-consul-10

3. Integrating RabbitMQ with Vault

In the two previous steps we successfully ran a cluster of three RabbitMQ nodes based on Consul discovery. Now, we will add Vault to our sample system to dynamically generate user credentials. Let’s begin by running Vault on Docker. You can find detailed information about it in my previous article Secure Spring Cloud Microservices with Vault and Nomad. We will run Vault in development mode using the following command.

$ docker run --cap-add=IPC_LOCK -d --name vault -p 8200:8200 vault

You can copy the root token from the container logs using the docker logs -f vault command. Then you have to log in to the Vault web console available under the address http://192.168.99.100:8200 using this token and enable the RabbitMQ secrets engine as shown below.

rabbit-consul-2

And confirm.

rabbit-consul-3

You can easily run Vault commands using the terminal provided by the web admin console or do the same thing using the HTTP API. The first command visible below writes the connection details. We just need to pass the RabbitMQ management address and admin user credentials. The provided configuration points to RabbitMQ node #1, but the changes are then replicated to the whole cluster.

$ vault write rabbitmq/config/connection connection_uri="http://192.168.99.100:30010" username="guest" password="guest"

The next step is to configure a role that maps a name in Vault to virtual host permissions.

$ vault write rabbitmq/roles/default vhosts='{"/":{"write": ".*", "read": ".*"}}'

We can test our newly created configuration by running command vault read rabbitmq/creds/default as shown below.

rabbit-consul-4
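The equivalent HTTP call is a GET on /v1/rabbitmq/creds/default with the token passed in the X-Vault-Token header. Here is a minimal sketch, assuming Java 11+ and the engine mounted under the default rabbitmq path. The class name VaultRabbitCreds is hypothetical, and the token is read from the command line instead of being hard-coded.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: asks Vault for freshly generated RabbitMQ credentials. */
final class VaultRabbitCreds {

    /** Builds the request for the given role; assumes the engine is mounted at "rabbitmq". */
    static HttpRequest credsRequest(String vaultAddress, String token, String role) {
        return HttpRequest.newBuilder(URI.create(vaultAddress + "/v1/rabbitmq/creds/" + role))
                .header("X-Vault-Token", token)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("usage: VaultRabbitCreds <vault-token>");
            return;
        }
        HttpRequest request = credsRequest("http://192.168.99.100:8200", args[0], "default");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // On success the JSON body carries the generated username and password in its data section.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Every call creates a new RabbitMQ user, so it is worth watching the Users tab of the management console while experimenting.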

4. Sample application

Our sample application is pretty simple. It consists of two modules. The first of them, sender, is responsible for sending messages to RabbitMQ, while the second, listener, receives incoming messages. Both of them are Spring Boot applications that integrate with RabbitMQ and Vault using the following dependencies.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-vault-config-rabbitmq</artifactId>
	<version>2.0.2.RELEASE</version>
</dependency>

We need to provide some configuration settings in the bootstrap.yml file to integrate our application with Vault. First, we need to enable that integration by setting the property spring.cloud.vault.rabbitmq.enabled to true. Of course, the Vault address and root token are required. It is also important to set the property spring.cloud.vault.rabbitmq.role to the name of the Vault role configured in step 3. Spring Cloud Vault injects the username and password generated by Vault into the application properties spring.rabbitmq.username and spring.rabbitmq.password, so the only other thing we need to configure in bootstrap.yml is the list of available cluster nodes.

spring:
  rabbitmq:
    addresses: 192.168.99.100:30000,192.168.99.100:30001,192.168.99.100:30002
  cloud:
    vault:
      uri: http://192.168.99.100:8200
      token: s.7DaENeiqLmsU5ZhEybBCRJhp
      rabbitmq:
        enabled: true
        role: default
        backend: rabbitmq
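Spring AMQP handles the comma-separated addresses value on its own. Purely to illustrate what such a list gives a client, here is a sketch in plain Java that splits the value into host and port pairs and probes them until one accepts a TCP connection. The class RabbitAddresses is hypothetical and is not how Spring implements it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: illustrates a client-side list of cluster node addresses. */
final class RabbitAddresses {

    /** Splits "host:port,host:port" into addresses; a missing port falls back to 5672. */
    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? 5672 : Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns the first candidate that accepts a TCP connection within the timeout. */
    static Optional<InetSocketAddress> firstReachable(List<InetSocketAddress> candidates, int timeoutMillis) {
        for (InetSocketAddress candidate : candidates) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(candidate.getHostString(), candidate.getPort()), timeoutMillis);
                return Optional.of(candidate);
            } catch (IOException e) {
                // This node is down or unreachable; try the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<InetSocketAddress> nodes =
                parse("192.168.99.100:30000,192.168.99.100:30001,192.168.99.100:30002");
        System.out.println("First reachable node: " + firstReachable(nodes, 500));
    }
}
```

Stopping one of the containers and running the sketch again shows why listing all three nodes is useful: the probe simply moves on to the next address.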

For test purposes you should enable highly available queues on RabbitMQ. For instructions on how to configure them using policies you can refer to my article RabbitMQ in cluster. The application works at the level of exchanges. The auto-configured connection factory is injected into the application and set for the RabbitTemplate bean.

@SpringBootApplication
public class Sender {
	
	private static final Logger LOGGER = LoggerFactory.getLogger("Sender");
	
	@Autowired
	RabbitTemplate template;

	public static void main(String[] args) {
		SpringApplication.run(Sender.class, args);
	}

	@PostConstruct
	public void send() {
		for (int i = 0; i < 1000; i++) {
			int id = new Random().nextInt(100000);
			template.convertAndSend(new Order(id, "TEST"+id, OrderType.values()[(id%2)]));
		}
		LOGGER.info("Sending completed.");
	}
    
    @Bean
    public RabbitTemplate template(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("ex.example");
        return rabbitTemplate;
    }
    
}

Our listener app is connected only to the third node of the cluster (spring.rabbitmq.addresses=192.168.99.100:30002). However, the test queue is mirrored between all clustered nodes, so the listener is able to receive messages sent by the sender app. You can easily test it using my sample applications.

@SpringBootApplication
@EnableRabbit
public class Listener {

	private static final  Logger LOGGER = LoggerFactory.getLogger("Listener");

	private Long timestamp;

	public static void main(String[] args) {
		SpringApplication.run(Listener.class, args);
	}

	@RabbitListener(queues = "q.example")
	public void onMessage(Order order) {
		if (timestamp == null)
			timestamp = System.currentTimeMillis();
		LOGGER.info((System.currentTimeMillis() - timestamp) + " : " + order.toString());
	}

	@Bean
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setConcurrentConsumers(10);
		factory.setMaxConcurrentConsumers(20);
		return factory;
	}
	
}

Apache Ignite Cluster together with Spring Boot

I have already introduced Apache Ignite in one of my previous articles, In-memory data grid with Apache Ignite. Apache Ignite can be easily launched locally together with a Spring Boot application. The only thing we have to do is to include the artifact org.apache.ignite:ignite-spring-data in the project dependencies and then declare an Ignite instance @Bean. A sample @Bean declaration is visible below.

@Bean
public Ignite igniteInstance() {
	IgniteConfiguration cfg = new IgniteConfiguration();
	cfg.setIgniteInstanceName("ignite-cluster-node");
	CacheConfiguration ccfg1 = new CacheConfiguration("PersonCache");
	ccfg1.setIndexedTypes(Long.class, Person.class);
	CacheConfiguration ccfg2 = new CacheConfiguration("ContactCache");
	ccfg2.setIndexedTypes(Long.class, Contact.class);
	cfg.setCacheConfiguration(ccfg1, ccfg2);
	IgniteLogger log = new Slf4jLogger();
	cfg.setGridLogger(log);
	return Ignition.start(cfg);
}

In this article I would like to show you a slightly more advanced sample where we will start multiple Ignite nodes inside a cluster, Ignite’s web console for monitoring the cluster, and Ignite’s agent for providing communication between the cluster nodes and the web console. Let’s begin by looking at the picture with the architecture of our sample solution.

ignite-2-1

We have three nodes which are part of the cluster. If you take a careful look at the picture illustrating the architecture, you have probably noticed that there are two nodes labelled Server Node, and one labelled Client Node. By default, all Ignite nodes are started as server nodes. Client mode needs to be explicitly enabled. Server nodes participate in caching, compute execution and stream processing, while client nodes provide the ability to connect to the servers remotely. Client nodes still allow using the whole set of Ignite APIs, including near caching, transactions, compute and streaming.

Here’s Ignite’s client instance @Bean declaration.

@Bean
public Ignite igniteInstance() {
	IgniteConfiguration cfg = new IgniteConfiguration();
	cfg.setIgniteInstanceName("ignite-cluster-node");
	cfg.setClientMode(true);
	CacheConfiguration ccfg1 = new CacheConfiguration("PersonCache");
	ccfg1.setIndexedTypes(Long.class, Person.class);
	CacheConfiguration ccfg2 = new CacheConfiguration("ContactCache");
	ccfg2.setIndexedTypes(Long.class, Contact.class);
	cfg.setCacheConfiguration(ccfg1, ccfg2);
	return Ignition.start(cfg);
}

The fact is that we don’t have to do anything more to make our nodes work together within the cluster. Every new node is automatically detected by all the other cluster nodes using multicast communication. When starting our sample application we only have to guarantee that each instance’s server listens on a different port by overriding the server.port Spring Boot property. The sample application is available on GitHub (https://github.com/piomin/sample-ignite-jpa.git) under the branch cluster (https://github.com/piomin/sample-ignite-jpa/tree/cluster). Each node exposes the same REST API, which may be easily tested using Swagger2 just by opening its dashboard available under the address http://localhost:port/swagger-ui.html. Here’s the command that starts the sample application.

java -jar -Dserver.port=8901 -Xms512m -Xmx1024m -XX:+UseG1GC -XX:+DisableExplicitGC -XX:MaxDirectMemorySize=256m target/ignite-rest-service-1.0-SNAPSHOT.jar

If you have successfully started a new node you should see similar information in your application logs.

>>> +----------------------------------------------------------------------+
>>> Ignite ver. 2.4.0#20180305-sha1:aa342270b13cc1f4713382a8eb23b2eb7edaa3a5
>>> +----------------------------------------------------------------------+
>>> OS name: Windows 10 10.0 amd64
>>> CPU(s): 4
>>> Heap: 1.0GB
>>> VM name: 14132@piomin
>>> Ignite instance name: ignite-cluster-node
>>> Local node [ID=9DB1296A-7EEC-4564-BAAD-14E5D4A3A08D, order=2, clientMode=false]
>>> Local node addresses: [piomin/0:0:0:0:0:0:0:1, piomin/127.0.0.1, piomin/192.168.1.102, piomin/192.168.116.1, /192.168.226.1, /192.168.99.1]
>>> Local ports: TCP:8082 TCP:10801 TCP:11212 TCP:47101 UDP:47400 TCP:47501

Let’s move back for a moment to the source code of our sample application. I assume you have already cloned the repository from GitHub. There are two Maven modules available. The module ignite-rest-service is responsible for starting Ignite’s cluster node in server mode, while ignite-client-service starts a node in client mode. Because we run only a single instance of the client node, we do not override its default port set inside the application.yml file. You can build the project using the mvn clean install command and then start it with java -jar, or just run the main class IgniteClientApplication from your IDE.

There is also a JUnit test class inside the module ignite-client-service, which defines one test responsible for calling the HTTP endpoints (POST /person, POST /contact) that put data into Ignite’s cache. This test performs two operations. It puts some data into Ignite’s in-memory cluster by calling endpoints exposed by the client node, and then checks whether that data has been propagated through the cluster by calling the GET /person/{id}/withContacts endpoint exposed by a randomly selected server node.

public class TestCluster {

	TestRestTemplate template = new TestRestTemplate();
	Random r = new Random();
	int[] clusterPorts = new int[] {8901, 8902};

	@Test
	public void testCluster() throws InterruptedException {
		for (int i=0; i<1000; i++) {
			Person p = template.postForObject("http://localhost:8090/person", createPerson(), Person.class);
			Assert.notNull(p, "Create person failed");
			Contact c1 = template.postForObject("http://localhost:8090/contact", createContact(p.getId(), 0), Contact.class);
			Assert.notNull(c1, "Create contact failed");
			Contact c2 = template.postForObject("http://localhost:8090/contact", createContact(p.getId(), 1), Contact.class);
			Assert.notNull(c2, "Create contact failed");
			Thread.sleep(10);
			Person result = template.getForObject("http://localhost:{port}/person/{id}/withContacts", Person.class, clusterPorts[r.nextInt(2)], p.getId());
			Assert.notNull(result, "Person not found");
			Assert.notEmpty(result.getContacts(), "Contacts not found");
		}
	}

	private Contact createContact(Long personId, int index) {
		...
	}

	private Person createPerson() {
		...
	}

}
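The test above waits a fixed 10 ms before reading the data back from a server node. If that turns out to be too short on a slow machine, one alternative is to poll until the data shows up. The helper below is only a sketch of that polling technique: the class Await and its poll method are hypothetical and are not part of the sample repository.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper: retries a lookup until it returns a non-null value. */
final class Await {

    /**
     * Calls probe up to maxAttempts times, sleeping delayMillis between attempts.
     * Returns the first non-null result, or an empty Optional when every attempt returned null.
     */
    static <T> Optional<T> poll(Supplier<T> probe, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = probe.get();
            if (value != null) {
                return Optional.of(value);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The probe returns null twice before producing a value.
        Optional<String> result = poll(() -> ++calls[0] < 3 ? null : "ready", 5, 10L);
        System.out.println(result.orElse("timed out") + " after " + calls[0] + " calls");
    }
}
```

In TestCluster the probe would be the template.getForObject(...) call against the server node, wrapped in a lambda, with the assertion made on the returned Optional.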

Before running any tests, we should launch two additional elements of our architecture: Ignite's web console and agent. The most convenient way to run Ignite's web console on the local machine is through its Docker image apacheignite/web-console-standalone. Here's the Docker command that starts Ignite's web console and exposes it on port 80. Because I run Docker on Windows, it is available under the default VM address http://192.168.99.100/.

docker run -d -p 80:80 -p 3001:3001 -v /var/data:/var/lib/mongodb --name ignite-web-console apacheignite/web-console-standalone

In order to access it you should first register a user. Although a mail server is not available in the Docker container, you will be logged in after registration. You can configure your cluster using Ignite’s web console, and also run some SQL queries on that cluster. Of course, we still need to connect our cluster consisting of three nodes with the instance of the web console started in the Docker container. To achieve it you have to download a web agent. It is probably not very intuitive, but you have to click the Start Demo button, which is located in the right corner of Ignite’s web console. Then you will be redirected to the download page, where you can accept the download of the ignite-web-agent-2.4.0.zip file, which contains all the libraries and configuration needed to start the web agent locally.

ignite-2-2

After downloading and unpacking the web agent, go to its main directory and change the property server-uri to http://192.168.99.100 inside the default.properties file. Then you may run the script ignite-web-agent.bat (or .sh if you are testing it on Linux), which starts the web agent. Unfortunately, that is not all that has to be done. Every server node’s application should include the artifact ignite-rest-http in order to be able to communicate with the agent. It is responsible for exposing the HTTP endpoint that is accessed by the web agent. It is based on the Jetty server, which causes some problems in conjunction with Spring Boot. Spring Boot sets default versions of the Jetty libraries used inside the project. The problem is that ignite-rest-http requires older versions of those libraries, so we also have to override some default managed versions in the pom.xml file according to the sample visible below.

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-http</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-server</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-io</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-continuation</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-util</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
		<dependency>
			<groupId>org.eclipse.jetty</groupId>
			<artifactId>jetty-xml</artifactId>
			<version>9.2.11.v20150529</version>
		</dependency>
	</dependencies>
</dependencyManagement>

After implementing the changes described above, we may finally proceed to running all the elements of our sample system. If you start Ignite Web Agent locally, it should automatically detect all running cluster nodes. Here’s the screen with the logs displayed by the agent after startup.

ignite-2-3

At the same time you should see that a new cluster has been detected by Ignite Web Console.

ignite-2-4

You can configure a new or an existing cluster using the web console, or just run a test query on the selected managed cluster. You have to include the name of the cache as a prefix to the table name when defining a query.

ignite-2-5

Similar queries have been declared inside the repository interface. Here are additional methods used for finding entities stored in PersonCache. If you would like to include results stored in another cache, you have to explicitly declare its name together with the table name.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

	List<Person> findByFirstNameAndLastName(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.id=?")
	List<List<?>> findByIdWithContacts(Long id);

	@Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<Contact> selectContacts(String firstName, String lastName);

	@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
	List<List<?>> selectContacts2(String firstName, String lastName);
}
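The only unusual part of these queries is the quoted cache name in front of the table that lives in another cache. A tiny sketch of how such a qualified name is put together is shown below. The class IgniteSqlNames is hypothetical and only concatenates strings; it does not talk to Ignite.

```java
/** Hypothetical helper: builds table names qualified with their cache name. */
final class IgniteSqlNames {

    /** Returns the table name prefixed with the quoted cache name, e.g. "ContactCache".Contact. */
    static String qualified(String cacheName, String table) {
        return "\"" + cacheName + "\"." + table;
    }

    /** Rebuilds the join used by selectContacts from its parts. */
    static String contactsByNameQuery() {
        return "SELECT c.* FROM Person p JOIN " + qualified("ContactCache", "Contact")
                + " c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?";
    }

    public static void main(String[] args) {
        System.out.println(contactsByNameQuery());
    }
}
```

Person needs no prefix here because the repository itself is bound to PersonCache.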

We are nearing the end. Now, let’s run our JUnit test TestCluster in order to generate some test data and put it into the clustered cache. You can monitor the size of a cache using the web console. All you have to do is run a SELECT COUNT(*) query, and set graph mode as the default mode for displaying results. The chart visible below illustrates the number of entities stored inside Ignite’s cluster at 5s intervals.

ignite-2-6

JPA caching with Hazelcast, Hibernate and Spring Boot

Preface

An In-Memory Data Grid is an in-memory distributed key-value store that enables caching data using distributed clusters. Do not confuse this solution with an in-memory or NoSQL database. In most cases it is used for performance reasons: all data is stored in RAM, not on disk as in traditional databases. I first came across in-memory data grids when we were considering moving to Oracle Coherence in one of the organizations I worked for. The solution really made me curious. Oracle Coherence is obviously a paid solution, but there are also some open source solutions, among which the most interesting seem to be Apache Ignite and Hazelcast. Today I’m going to show you how to use Hazelcast for caching data stored in a MySQL database accessed by Spring Data DAO objects. Here’s the figure illustrating the architecture of the presented solution.

hazelcast-1

Implementation

  • Starting Docker containers

We use three Docker containers: the first with a MySQL database, the second with a Hazelcast instance and the third with Hazelcast Management Center, a UI dashboard for monitoring Hazelcast cluster instances.

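docker run -d --name mysql -e MYSQL_DATABASE=datagrid -e MYSQL_USER=datagrid -e MYSQL_PASSWORD=datagrid -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql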
docker run -d --name hazelcast -p 5701:5701 hazelcast/hazelcast
docker run -d --name hazelcast-mgmt -p 38080:8080 hazelcast/management-center:latest

If we would like to connect the Hazelcast instance to Hazelcast Management Center, we need to place a custom hazelcast.xml in the /opt/hazelcast directory inside the Docker container. This can be done in two ways: by extending the hazelcast base image, or just by copying the file to the existing hazelcast container and restarting it.

docker run -d --name hazelcast -p 5701:5701 hazelcast/hazelcast
docker cp hazelcast.xml hazelcast:/opt/hazelcast/hazelcast.xml
docker stop hazelcast
docker start hazelcast

Here’s the most important fragment of the Hazelcast configuration file.

<hazelcast xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.hazelcast.com/schema/config http://www.hazelcast.com/schema/config/hazelcast-config-3.8.xsd">
     <group>
          <name>dev</name>
          <password>dev-pass</password>
     </group>
     <management-center enabled="true" update-interval="3">http://192.168.99.100:38080/mancenter</management-center>
...
</hazelcast>

The Hazelcast dashboard is available under the address http://192.168.99.100:38080/mancenter. There we can monitor all running cluster members, maps and some other parameters.

hazelcast-mgmt-1

  • Maven configuration

The project is based on Spring Boot 1.5.3.RELEASE. We also need to add the Spring Web and MySQL Java connector dependencies. Here’s the root project pom.xml.


	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.3.RELEASE</version>
	</parent>
	...
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
	...
	</dependencies>

Inside the person-service module we declared some other dependencies on Hazelcast artifacts and Spring Data JPA. I had to override the managed hibernate-core version for Spring Boot 1.5.3.RELEASE, because Hazelcast didn’t work properly with 5.0.12.Final. Hazelcast needs hibernate-core in version 5.0.9.Final. Otherwise, an exception occurs when starting the application.

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>com.hazelcast</groupId>
			<artifactId>hazelcast</artifactId>
		</dependency>
		<dependency>
			<groupId>com.hazelcast</groupId>
			<artifactId>hazelcast-client</artifactId>
		</dependency>
		<dependency>
			<groupId>com.hazelcast</groupId>
			<artifactId>hazelcast-hibernate5</artifactId>
		</dependency>
		<dependency>
			<groupId>org.hibernate</groupId>
			<artifactId>hibernate-core</artifactId>
			<version>5.0.9.Final</version>
		</dependency>
	</dependencies>

  • Hibernate Cache configuration

You can probably configure it in several different ways, but for me the most suitable place was application.yml. Here’s the YAML configuration file fragment. I enabled the Hibernate L2 cache and set the Hazelcast native client address, credentials and the cache factory class HazelcastCacheRegionFactory. We can also set HazelcastLocalCacheRegionFactory. The difference between them is in performance: the local factory is faster since its operations are not handled as distributed calls. On the other hand, if you use HazelcastCacheRegionFactory, you can see your maps in Management Center.

spring:
  application:
    name: person-service
  datasource:
    url: jdbc:mysql://192.168.99.100:33306/datagrid?useSSL=false
    username: datagrid
    password: datagrid
  jpa:
    properties:
      hibernate:
        show_sql: true
        cache:
          use_query_cache: true
          use_second_level_cache: true
          hazelcast:
            use_native_client: true
            native_client_address: 192.168.99.100:5701
            native_client_group: dev
            native_client_password: dev-pass
          region:
            factory_class: com.hazelcast.hibernate.HazelcastCacheRegionFactory

  • Application code

First, we need to enable caching for the Person @Entity.

@Cache(usage = CacheConcurrencyStrategy.READ_WRITE)
@Entity
public class Person implements Serializable {

	private static final long serialVersionUID = 3214253910554454648L;

	@Id
	@GeneratedValue
	private Integer id;
	private String firstName;
	private String lastName;
	private String pesel;
	private int age;

	public Integer getId() {
		return id;
	}

	public void setId(Integer id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	public String getPesel() {
		return pesel;
	}

	public void setPesel(String pesel) {
		this.pesel = pesel;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	@Override
	public String toString() {
		return "Person [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", pesel=" + pesel + "]";
	}

}

The DAO is implemented using Spring Data CrudRepository. The sample application source code is available on GitHub.

public interface PersonRepository extends CrudRepository<Person, Integer> {
	public List<Person> findByPesel(String pesel);
}

Testing

Let’s insert a little more data into the table. You can use my AddPersonRepositoryTest for that. It will insert 1M rows into the person table. Finally, we can call the endpoint http://localhost:2222/persons/{id} twice with the same id. For me, it looks like below: 22ms for the first call, 3ms for the next call, which is read from the L2 cache. An entity can be cached only by primary key. If you call http://localhost:2222/persons/pesel/{pesel}, the entity will always be fetched from the database, bypassing the L2 cache.

2017-05-05 17:07:27.360 DEBUG 9164 --- [nio-2222-exec-9] org.hibernate.SQL                        : select person0_.id as id1_0_0_, person0_.age as age2_0_0_, person0_.first_name as first_na3_0_0_, person0_.last_name as last_nam4_0_0_, person0_.pesel as pesel5_0_0_ from person person0_ where person0_.id=?
Hibernate: select person0_.id as id1_0_0_, person0_.age as age2_0_0_, person0_.first_name as first_na3_0_0_, person0_.last_name as last_nam4_0_0_, person0_.pesel as pesel5_0_0_ from person person0_ where person0_.id=?
2017-05-05 17:07:27.362 DEBUG 9164 --- [nio-2222-exec-9] o.h.l.p.e.p.i.ResultSetProcessorImpl     : Starting ResultSet row #0
2017-05-05 17:07:27.362 DEBUG 9164 --- [nio-2222-exec-9] l.p.e.p.i.EntityReferenceInitializerImpl : On call to EntityIdentifierReaderImpl#resolve, EntityKey was already known; should only happen on root returns with an optional identifier specified
2017-05-05 17:07:27.363 DEBUG 9164 --- [nio-2222-exec-9] o.h.engine.internal.TwoPhaseLoad         : Resolving associations for [pl.piomin.services.datagrid.person.model.Person#444]
2017-05-05 17:07:27.364 DEBUG 9164 --- [nio-2222-exec-9] o.h.engine.internal.TwoPhaseLoad         : Adding entity to second-level cache: [pl.piomin.services.datagrid.person.model.Person#444]
2017-05-05 17:07:27.373 DEBUG 9164 --- [nio-2222-exec-9] o.h.engine.internal.TwoPhaseLoad         : Done materializing entity [pl.piomin.services.datagrid.person.model.Person#444]
2017-05-05 17:07:27.373 DEBUG 9164 --- [nio-2222-exec-9] o.h.r.j.i.ResourceRegistryStandardImpl   : HHH000387: ResultSet's statement was not registered
2017-05-05 17:07:27.374 DEBUG 9164 --- [nio-2222-exec-9] .l.e.p.AbstractLoadPlanBasedEntityLoader : Done entity load : pl.piomin.services.datagrid.person.model.Person#444
2017-05-05 17:07:27.374 DEBUG 9164 --- [nio-2222-exec-9] o.h.e.t.internal.TransactionImpl         : committing
2017-05-05 17:07:30.168 DEBUG 9164 --- [nio-2222-exec-6] o.h.e.t.internal.TransactionImpl         : begin
2017-05-05 17:07:30.171 DEBUG 9164 --- [nio-2222-exec-6] o.h.e.t.internal.TransactionImpl         : committing

Query Cache

We can enable caching of query results by marking the repository method with the @Cacheable annotation and adding @EnableCaching to the main class definition.

public interface PersonRepository extends CrudRepository<Person, Integer> {

	@Cacheable("findByPesel")
	public List<Person> findByPesel(String pesel);

}
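Conceptually, @Cacheable("findByPesel") means that the method argument is looked up in a named cache before the method itself is invoked. The sketch below models that idea with a ConcurrentHashMap in plain Java. It is not Spring’s implementation; CacheableSketch and the query function passed to it are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of a cached query method: results are stored per argument value.
class CacheableSketch {

	// Plays the role of the "findByPesel" cache: pesel -> query result.
	private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
	// The real query; in the article this would be the repository method.
	private final Function<String, List<String>> query;

	CacheableSketch(Function<String, List<String>> query) {
		this.query = query;
	}

	// Runs the query only when there is no cached result for this pesel yet.
	List<String> findByPesel(String pesel) {
		return cache.computeIfAbsent(pesel, query);
	}

	public static void main(String[] args) {
		AtomicInteger executions = new AtomicInteger();
		CacheableSketch repository = new CacheableSketch(pesel -> {
			executions.incrementAndGet();
			return Arrays.asList("Person with pesel " + pesel);
		});
		repository.findByPesel("12345678901");
		repository.findByPesel("12345678901");
		System.out.println("query executions: " + executions.get());
	}
}
```

Only the first call for a given PESEL runs the query; repeated calls with the same argument are answered from the map.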

In addition to the @EnableCaching annotation, we should declare HazelcastInstance and CacheManager beans. The cache manager is HazelcastCacheManager from the hazelcast-spring library.

@SpringBootApplication
@EnableCaching
public class PersonApplication {

	public static void main(String[] args) {
		SpringApplication.run(PersonApplication.class, args);
	}

	@Bean
	HazelcastInstance hazelcastInstance() {
		ClientConfig config = new ClientConfig();
		config.getGroupConfig().setName("dev").setPassword("dev-pass");
		config.getNetworkConfig().addAddress("192.168.99.100");
		config.setInstanceName("cache-1");
		HazelcastInstance instance = HazelcastClient.newHazelcastClient(config);
		return instance;
	}

	@Bean
	CacheManager cacheManager() {
		return new HazelcastCacheManager(hazelcastInstance());
	}

}

Now we should try to find a person by PESEL number by calling the endpoint http://localhost:2222/persons/pesel/{pesel}. The cached query result is stored as a map, as you can see in the picture below.

hazelcast-3

Clustering

Before the final words, let me say a little about clustering, which is the key functionality of the Hazelcast in-memory data grid. In the previous chapters we relied on a single Hazelcast instance. Let’s begin by running a second container with Hazelcast exposed on a different port.

docker run -d --name hazelcast2 -p 5702:5701 hazelcast/hazelcast

Now we should make one change in the hazelcast.xml configuration file. Because the data grid runs inside a Docker container, the public address has to be set. For the first container it is 192.168.99.100:5701, and for the second 192.168.99.100:5702, because it is exposed on port 5702.

<network>
	...
	<public-address>192.168.99.100:5701</public-address>
	...
</network>

When starting the person-service application, you should see log output similar to the following, showing a connection with two cluster members.

Members [2] {
Member [192.168.99.100]:5702 - 04f790bc-6c2d-4c21-ba8f-7761a4a7422c
Member [192.168.99.100]:5701 - 2ca6e30d-a8a7-46f7-b1fa-37921aaa0e6b
}

All running Hazelcast instances are visible in Management Center.

hazelcast-2

Conclusion

Caching and clustering with Hazelcast are simple and fast. We can cache JPA entities and queries. Monitoring is provided by the Hazelcast Management Center dashboard. One problem for me is that I’m able to cache entities only by primary key. If I want to find an entity by another index, such as the PESEL number, I have to cache the findByPesel query. Even if the entity was cached earlier by id, the query will not find it in the cache and will run SQL against the database. Only the next call of the query is served from the cache. I’ll show you a smart solution to that problem in my next article on this subject, In memory data grid with Hazelcast.

RabbitMQ in cluster

RabbitMQ has grown into the most popular message broker software. It is written in Erlang and implements the Advanced Message Queuing Protocol (AMQP). It is easy to use and configure, even when it comes to mechanisms such as clustering or high availability. In this post I’m going to show you how to run several instances of RabbitMQ in Docker containers as a cluster with highly available (HA) queues. Using a sample Java application, we’ll see how to send and receive messages from the RabbitMQ cluster and check how this message broker handles a large number of incoming messages. The sample Spring Boot application is available on GitHub. Here is a picture illustrating the architecture of the presented solution.

rabbitmq-cluster

We use the official Docker repository of RabbitMQ. Here are the commands for running three RabbitMQ nodes. The first node is the master of the cluster, and the two other nodes will join it. We use the rabbitmq:management image to enable a UI administration console on each node. Every node has the default connection and UI management ports exposed. It is important to link the rabbit2 and rabbit3 containers to rabbit1, which is necessary for joining the cluster mastered by rabbit1.

docker run -d --hostname rabbit1 --name rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30000:5672 -p 30001:15672 rabbitmq:management
docker run -d --hostname rabbit2 --name rabbit2 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30002:5672 -p 30003:15672 rabbitmq:management
docker run -d --hostname rabbit3 --name rabbit3 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30004:5672 -p 30005:15672 rabbitmq:management

OK, now there are three running RabbitMQ instances. We can go to the UI management console of each instance running as a Docker container, for example http://192.168.99.100:30001 for rabbit1. Each instance currently forms its own independent cluster, as we see in the pictures below. We would like all instances to work in the same cluster, rabbit@rabbit1.

rabbit_cluster

rabbit_cluster2

Here’s the set of commands run on the rabbit2 instance to join the cluster rabbit@rabbit1. The same set should be run on the rabbit3 node. First we have to connect to the Docker container and run the bash command. Before running the rabbitmqctl join_cluster command we have to stop the broker application.

docker exec -i -t rabbit2 bash
root@rabbit2:/# rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...
root@rabbit2:/# rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
root@rabbit2:/# rabbitmqctl start_app
Starting node rabbit@rabbit2 ...

If everything was successful, we should see the cluster name rabbit@rabbit1 in the upper right corner of the rabbit2 management console. You should also see the list of running nodes in the Nodes section. You can also check the cluster status by running the command rabbitmqctl cluster_status on any node, which should also display the list of all cluster nodes.

rabbit_cluster3

After starting all nodes, go to the UI management console on one of the nodes. Now we are going to configure High Availability for a selected queue. It does not matter which node you choose, because they are all in one cluster. In the Queues tab, create a queue named q.example. Then go to the Admin tab, select the Policies section and create a new policy. In the picture below you can see the policy I have created. I selected ha-mode=all, which means that the queue is mirrored across all nodes in the cluster, and when a new node is added to the cluster, the queue will be mirrored to that node. The exactly and nodes modes are also available; you can find more about RabbitMQ High Availability here. In the pattern field enter your queue name, and in apply to select Queues. If everything succeeded, you should see the ha-all feature in the queue row.

rabbit_cluster5.png
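The pattern field of a policy is a regular expression that is matched against queue names. The following plain-Java sketch shows which names a given pattern would select. It uses java.util.regex as a stand-in for the broker’s own regex engine and assumes an unanchored match (the pattern may match anywhere in the name); PolicyPatternSketch is a hypothetical helper, not part of any RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: shows which queue names a policy pattern would select.
class PolicyPatternSketch {

	// Returns the queue names in which the pattern matches somewhere (unanchored search, an assumption).
	static List<String> select(String policyPattern, List<String> queueNames) {
		Pattern pattern = Pattern.compile(policyPattern);
		List<String> selected = new ArrayList<>();
		for (String name : queueNames) {
			if (pattern.matcher(name).find()) {
				selected.add(name);
			}
		}
		return selected;
	}

	public static void main(String[] args) {
		// Example queue names; only q.example comes from the article, the others are made up.
		List<String> queues = Arrays.asList("q.example", "q.example.dlq", "orders");
		// Plain queue name as pattern: "." matches any character and nothing is anchored.
		System.out.println(select("q.example", queues));
		// Escaped and anchored pattern: intended to select q.example only.
		System.out.println(select("^q\\.example$", queues));
	}
}
```

If the policy should apply to exactly one queue, an anchored and escaped pattern such as ^q\.example$ is the safer choice under this assumption.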

One of the greatest advantages of RabbitMQ is monitoring. You can see many statistics, such as memory and disk usage, I/O statistics, detailed message rates and graphs. Some of them are shown below.

rabbit_cluster6

rabbit_cluster7

RabbitMQ has great support in the Spring framework. Many projects use the RabbitMQ implementation by default, for example Spring Cloud Stream and Spring Cloud Sleuth. I’m going to show you a sample Spring Boot application that sends messages to the RabbitMQ cluster and receives them from the HA queue. The application source code is available on GitHub. Here’s the main class of the application. We enable the RabbitMQ listener by declaring @EnableRabbit on the class and @RabbitListener on the receiving method. We also have to declare the listened queue, the broker connection factory and the listener container factory to allow listener concurrency. Inside CachingConnectionFactory we set all three addresses of the RabbitMQ cluster instances: 192.168.99.100:30000, 192.168.99.100:30002, 192.168.99.100:30004.

@SpringBootApplication
@EnableRabbit
public class Listener {

	private static Logger logger = Logger.getLogger("Listener");

	public static void main(String[] args) {
		SpringApplication.run(Listener.class, args);
	}

	@RabbitListener(queues = "q.example")
	public void onMessage(Order order) {
		logger.info(order.toString());
	}

	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		connectionFactory.setAddresses("192.168.99.100:30000,192.168.99.100:30002,192.168.99.100:30004");
		connectionFactory.setChannelCacheSize(10);
		return connectionFactory;
	}

	@Bean
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		factory.setConcurrentConsumers(10);
		factory.setMaxConcurrentConsumers(20);
		return factory;
	}

	@Bean
	public Queue queue() {
		return new Queue("q.example");
	}

}
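The value passed to setAddresses is a comma-separated list of host:port pairs, one per cluster node. As a rough illustration of what such a list contains, the sketch below splits the string from the example into host and port pairs in plain Java. It does not reproduce Spring AMQP’s parsing or its connection logic; AddressListSketch and its nested Address class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a "host:port,host:port" string into its parts.
class AddressListSketch {

	// Simple value holder for one broker address.
	static final class Address {
		final String host;
		final int port;

		Address(String host, int port) {
			this.host = host;
			this.port = port;
		}

		@Override
		public String toString() {
			return host + ":" + port;
		}
	}

	// Parses the list; entries without a port fall back to 5672, the default AMQP port.
	static List<Address> parse(String addresses) {
		List<Address> result = new ArrayList<>();
		for (String entry : addresses.split(",")) {
			String trimmed = entry.trim();
			if (trimmed.isEmpty()) {
				continue;
			}
			int colon = trimmed.lastIndexOf(':');
			if (colon < 0) {
				result.add(new Address(trimmed, 5672));
			} else {
				result.add(new Address(trimmed.substring(0, colon),
						Integer.parseInt(trimmed.substring(colon + 1))));
			}
		}
		return result;
	}

	public static void main(String[] args) {
		String addresses = "192.168.99.100:30000,192.168.99.100:30002,192.168.99.100:30004";
		for (Address address : parse(addresses)) {
			System.out.println(address);
		}
	}
}
```

Listing all three nodes gives the client somewhere else to connect when one node is unavailable, which is the point of configuring the whole cluster instead of a single address.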

Conclusion

Clustering and High Availability configuration with RabbitMQ is pretty simple. I like RabbitMQ for its support for cluster monitoring through the UI management console. In my opinion it is user-friendly and intuitive. In the sample application I sent 100k messages to the sample queue. With 20 concurrent consumers they were processed in 65 seconds (~80/s per consumer thread), and memory usage at its peak was about 400MB on each node. Of course, our application just receives the message object and logs it to the console.
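The per-consumer figure follows from simple arithmetic on the numbers above. Here is a small sketch that recomputes it; ThroughputSketch is a hypothetical name and the inputs are the figures quoted in this section.

```java
// Hypothetical helper: recomputes the throughput figures quoted above.
class ThroughputSketch {

	// Messages per second across all consumers.
	static double totalRate(long messages, double seconds) {
		return messages / seconds;
	}

	// Messages per second handled by a single consumer thread.
	static double perConsumerRate(long messages, double seconds, int consumers) {
		return totalRate(messages, seconds) / consumers;
	}

	public static void main(String[] args) {
		long messages = 100_000; // messages sent to the queue
		double seconds = 65;     // total processing time
		int consumers = 20;      // concurrent consumers
		System.out.printf("total: %.0f msg/s%n", totalRate(messages, seconds));
		System.out.printf("per consumer: %.0f msg/s%n", perConsumerRate(messages, seconds, consumers));
	}
}
```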