Messaging with JMS and RabbitMQ

RabbitMQ + JMSPivotal has a new connector available that lets you send and consume messages using Spring’s battle tested JmsTemplate and the JMS API with RabbitMQ as your broker.

In case you’re not familiar, RabbitMQ is an AMQP broker. You can download it on just about any platform. Some even have installers (like brew install rabbitmq on the Mac, and sudo apt-get install rabbitmq-server on Ubuntu Linux).

If you like the JMS API, this connector lets you shift to using RabbitMQ’s rock solid message broker without having to alter the rest of your application.

But this connector also affords you the option to seamlessly migrate towards AMQP messaging with Rabbit by first switching to RabbitMQ as a broker, and later updating your code using Spring AMQP.

If you’re new to Spring JMS and JmsTemplate, please read the rest of this blog to see how simple it makes JMS messaging.

Note: The RabbitMQ JMS connector is focused on supporting the JMS 1.1. spec, and is compliant with the most important elements of that specification. Click to read the complete details regarding the RabbitMQ JMS connector implementation.

This example of RabbitMQ JMS starts with building a stock trade listener, as shown below:

package com.rabbitmq.jms.sample;

import javax.jms.ConnectionFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;

@Configuration
public class StockConsumer {

    private static final Log log = LogFactory.getLog(StockConsumer.class);

    @Bean
    public DefaultMessageListenerContainer jmsListener(ConnectionFactory connectionFactory) {
        DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer();
        jmsListener.setConnectionFactory(connectionFactory);
        jmsListener.setDestinationName("rabbit-trader-channel");
        jmsListener.setPubSubDomain(true);

        MessageListenerAdapter adapter = new MessageListenerAdapter(new Receiver());
        adapter.setDefaultListenerMethod("receive");

        jmsListener.setMessageListener(adapter);
        return jmsListener;
    }

    protected static class Receiver {
        public void receive(String message) {
            log.info("Received " + message);
        }
    }
}

cta-download-rabbitmq

The StockConsumer class contains Spring bean definitions that will be added to the application context at startup. The jmsListener() method creates an object that subscribes to JMS messages and responds accordingly.

It includes:

  • DefaultMessageListenerContainer – this container will respond asynchronously to messages. Observe how the method signature defines it’s expectation to be injected with a javax.jms.ConnectionFactory. RabbitMQ’s JMS connector implements this spec interface.
  • The destination name is rabbit-trader-channel.
  • pubSubDomain has been switched to true. That means it will consume messages in a topic-oriented fashion, allowing other consumers to receive copies as well.
  • The Receiver class doesn’t implement any JMS spec interfaces. But that’s okay! It is wrapped with Spring’s MessageListenerAdapter. This lets you create message driven POJOs.

We just looked at the consumption of message. Now let’s proceed to writing a stock quote producer. You can find a simple chunk of code here:

package com.rabbitmq.jms.sample;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

@EnableAutoConfiguration
@EnableScheduling
@Configuration
@ComponentScan
public class StockQuoter {

    private static final Log log = LogFactory.getLog(StockQuoter.class);

    private List<String> stocks = new ArrayList<String>();
    private Map<String, Double> lastPrice = new HashMap<String, Double>();

    {
        stocks.add("AAPL");
        stocks.add("GD");
        stocks.add("BRK.B");

        lastPrice.put("AAPL", 494.64);
        lastPrice.put("GD", 86.74);
        lastPrice.put("BRK.B", 113.59);
    }

    @Autowired
    JmsTemplate jmsTemplate;

    @Bean
    ConnectionFactory connectionFactory() {
        return new RMQConnectionFactory();
    }

    @Scheduled(fixedRate = 5000L) // every 5 seconds
    public void publishQuote() {
        // Pick a random stock symbol
        Collections.shuffle(stocks);
        final String symbol = stocks.get(0);

        // Toss a coin and decide if the price goes...
        if (RandomUtils.nextBoolean()) {
            // ...up by a random 0-10%
            lastPrice.put(symbol, new Double(Math.round(lastPrice.get(symbol) * (1 + RandomUtils.nextInt(10)/100.0) * 100) / 100));
        } else {
            // ...or down by a similar random amount
            lastPrice.put(symbol, new Double(Math.round(lastPrice.get(symbol) * (1 - RandomUtils.nextInt(10)/100.0) * 100) / 100));
        }

        // Log new price locally
        log.info("Quote..." + symbol + " is now " + lastPrice.get(symbol));

        MessageCreator messageCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage("Quote..." + symbol + " is now " + lastPrice.get(symbol));
            }
        };

        jmsTemplate.send("rabbit-trader-channel", messageCreator);
    }

    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(StockQuoter.class, args);
        log.info("connectionFactory => " + ctx.getBean("connectionFactory"));
    }

}

This stock quoter has the following key parts.

  • JmsTemplate is a key component used to publish JMS messages. It removes the need for you to deal with JMS’s busy API and instead focus on sending messages.
  • By using the @EnableAutoConfiguration annotation, Spring Boot is signaled to automatically create a JmsTemplate when it detects spring-jms on your classpath.
  • The central piece of this demo is the RMQConnectionFactory. This connection factory empowers JmsTemplate to connect to RabbitMQ and use it as a JMS broker. Spring Boot automatically plugs it into the JmsTemplate so you don’t have to lift a finger.
@Bean
ConnectionFactory connectionFactory() {
    return new RMQConnectionFactory();
}
  • This stock quoting demonstration app uses Spring Scheduling to generate a new quote every five seconds. It starts with a list of stocks and an initial price, then randomly raises or lowers the price by anywhere from 0 – 10% (at random). The key method which publishes the quote can be found in this line:
jmsTemplate.send("rabbit-trader-channel", messageCreator);

Note: For demonstration purposes, the produce and the consumer run inside the same JVM as one application. This is not a requirement. You can easily have each part running in a separate application.

You launch the application by typing:

$ mvn clean package && java -jar target/trader-1.0.4-SNAPSHOT.jar

You should then expect output similar to this:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v0.5.0.BUILD-SNAPSHOT)

2013-09-16 15:19:59.158  INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter      : Starting StockQuoter on retina with PID 68774 (/Users/gturnquist/src/trader/target/classes started by gturnquist)
2013-09-16 15:19:59.193  INFO 68774 --- [ckQuoter.main()] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@22df3d0f: startup date [Mon Sep 16 15:19:59 CDT 2013]; root of context hierarchy
2013-09-16 15:19:59.449  INFO 68774 --- [ckQuoter.main()] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.scheduling.annotation.SchedulingConfiguration' of type [class org.springframework.scheduling.annotation.SchedulingConfiguration$$EnhancerByCGLIB$$62f4010e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2013-09-16 15:19:59.602  INFO 68774 --- [ckQuoter.main()] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2013-09-16 15:19:59.679  INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter      : Started StockQuoter in 0.727 seconds
2013-09-16 15:19:59.680  INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter      : Quote...AAPL is now 464.0
2013-09-16 15:19:59.680  INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter      : connectionFactory => com.rabbitmq.jms.admin.RMQConnectionFactory@68d69cac
2013-09-16 15:19:59.801  INFO 68774 --- [  jmsListener-1] com.rabbitmq.jms.sample.StockConsumer    : Received Quote...AAPL is now 464.0
2013-09-16 15:20:04.678  INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter      : Quote...BRK.B is now 113.0
2013-09-16 15:20:04.779  INFO 68774 --- [  jmsListener-1] com.rabbitmq.jms.sample.StockConsumer    : Received Quote...BRK.B is now 113.0
2013-09-16 15:20:09.678  INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter      : Quote...GD is now 91.0
2013-09-16 15:20:09.753  INFO 68774 --- [  jmsListener-1] com.rabbitmq.jms.sample.StockConsumer    : Received Quote...GD is now 91.0

You can see that it prints out the details of connectionFactory, revealing it to be an RMQConnectionFactory. Then as it produces stock quotes in one part, they are transmitted through RabbitMQ and read from the other end.

Permalink

Tags: , ,

2 comments on “Messaging with JMS and RabbitMQ

  1. Siddhu on said:

    Do you have all of this code uploaded on Github perchance? I’m not familiar with Spring (or SpringBoot) or RabbitMQ, and would find it extremely useful to look at the code.

Leave a Reply

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>