Java EE 7: JMS with ActiveMQ

Another interesting topic in a Java EE 7 application is messaging. In this article I’ll take a peek at ActiveMQ and the Java Message Service (JMS) to see how to tie them together into a working application.

In this article I’ll use the Java EE 7 project set up in the last article.

The Java Message Service is a good fit if you want to communicate with a remote system but do not want REST, SOAP or any other extra protocol in between your systems. Another scenario: another application needs your data (or sends you some), and both applications are located on the same server. In this case a web service would be overhead. With JMS you keep the boundaries between the applications but remove some overhead from the communication.

Messaging models

There are two models in messaging: Point-to-Point and Publisher-Subscriber.

Point-to-Point

In this model a producer sends a message to a specific destination: a queue. There can be multiple senders, but each message is delivered to only one consumer, which works off the data in the queue. The messages remain in the queue until they are consumed or expire.

Publisher-Subscriber

In this model a publisher pushes its messages to a topic. A consumer can subscribe to this topic and receive the broadcast messages. There can be multiple publishers who send messages and multiple subscribers who consume them. A subscriber receives only the messages that are published while it has an active connection. However, it is possible to create a durable subscription; in that case the messages published while the subscriber was disconnected are delivered once it reconnects.
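The difference between the two models can be sketched in a few lines of plain Java. This is only an in-memory illustration, not JMS code: SketchQueue and SketchTopic are hypothetical names, and a real broker adds persistence, expiry and durable subscriptions on top.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Point-to-point: messages wait in the queue; each one is handed to exactly one consumer. */
final class SketchQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.addLast(message);
    }

    /** Returns the oldest pending message, or null if the queue is empty. */
    String receive() {
        return pending.pollFirst();
    }
}

/** Publish-subscribe: each message is copied to every subscriber registered at publish time. */
final class SketchTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    /** Registers a subscriber and returns the inbox that collects its messages. */
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    void publish(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }
}
```

A message sent to the SketchQueue can be received once and is then gone; a message published to the SketchTopic ends up in every inbox that existed at that moment, and a subscriber that registers later does not see it.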

Synchronous and Asynchronous messaging

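Like the two models, there are two types of messaging: synchronous and asynchronous.

In synchronous messaging the producer sends a message and waits until a consumer confirms that the message arrived. In asynchronous messaging the producer does not wait for this confirmation. On the consumer side, JMS uses the same terms for how messages are received: synchronously through a blocking receive() call, or asynchronously through a MessageListener.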
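On the consumer side, JMS uses these terms for a blocking receive() call versus a MessageListener callback. The following is a rough JDK-only sketch of the two styles; ReceiveStyles is a hypothetical name and a BlockingQueue stands in for the destination, so none of this is JMS API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ReceiveStyles {

    /** Synchronous style: the caller blocks until a message arrives; returns null on timeout. */
    static String receiveBlocking(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Asynchronous style: returns immediately; the listener is called on
     * another thread once the next message arrives (or never, after 5 seconds).
     */
    static CompletableFuture<Void> onNextMessage(BlockingQueue<String> queue,
                                                 Consumer<String> listener) {
        return CompletableFuture.runAsync(() -> {
            String message = receiveBlocking(queue, 5_000);
            if (message != null) {
                listener.accept(message);
            }
        });
    }
}
```

With receiveBlocking the calling thread "sits and waits"; with onNextMessage it can go on with other work while the callback handles the message.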

But this is enough of an introduction to messaging. If you are interested in getting deeper into this topic, search the web or ask me to write a longer article on it, and I’ll do it within a week. I promise.

Requirements

To follow this article I assume the following system architecture: a Java EE 7 project (perhaps running with JDK 8), a WildFly application server (I use 8.1.0) and ActiveMQ (I use 5.10.0). I will not go into installation details for this system landscape.

I assume you can compile, deploy and run your Java application. This article is not about creating and building a project but about extending its functionality with JMS. I’ll prepare another article on how to create a Java EE project in the near future.

Configuring ActiveMQ

What you need to configure in ActiveMQ are the Queues and the Topics you want messages sent to / consumed from.

You can do this in the administration GUI (http://localhost:8161 by default; username and password are both admin). Here you can create queues and topics by simply giving them a name and clicking the Create button. This is the easiest way to configure them.

However, if you co-develop a project with other developers this is a bit problematic, because everyone needs to create the same queues and topics by hand, which is error-prone (mostly typos). In this case you should use a configuration file where you define your queues and topics.

First we need to add a management-context tag to the already existing management-context tag. This defines the context in which the connection port and the JMS domain are set.

The second thing we have to add are the destinations in ActiveMQ: they go into the broker tag as follows:

<destinations>
   <queue physicalName="example.queue"/>
   <topic physicalName="example.topic"/>
</destinations>

With this we are done with ActiveMQ.

You can find the configuration in the activemq.xml file in the config folder of the repository.

Configuring WildFly

To be able to use the configured ActiveMQ we need to add some configuration to our WildFly too.

For this we need to add some entries to the standalone.xml file: we have to provide values for the resource-adapters subsystem tag. Here we define the resource adapter and configure the JNDI names for our connection factory, queue and topic. You can see the configuration in the standalone.xml file, again in the config folder.

As you can see, the archive of the resource adapter is the activemq-rar.rar file. We have to place this file in the deployments folder in WildFly’s standalone folder to deploy the resource adapter to our runtime. The file is provided in the config folder too.

Tying it all together in Java

After everything is set let’s tie all those configurations together in Java.

Sending to a queue asynchronously

Point-to-Point messaging is mostly considered synchronous. For me this is a bit too much overhead, because I do not want to “sit and wait” until some other system acknowledges my message. So for this example I’ll take the asynchronous way. For this I have to create the session with the AUTO_ACKNOWLEDGE acknowledge mode. Once your messaging is running you can send messages, and when they arrive at the queue (for this article in ActiveMQ) you are done; you do not have to consider whether there is a consumer at the other side.

But first we need to get our ConnectionFactory and Queue as resources. For this we have to define two variables:

@Resource(lookup = "ExampleConnectionFactory")
private ConnectionFactory connectionFactory;

@Resource(lookup = "example.queue")
private Queue queue;

Here the lookup is the JNDI name under which the resource is bound; these are the names from the configuration above. Java looks up the connection factory and the queue by their JNDI names.

To get a Session we need to create a connection from the connection factory and then a session on that connection.

Note: a connection needs to be closed explicitly.

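// Open a connection to the broker; it has to be closed explicitly when it is no longer needed.
Connection connection = connectionFactory.createConnection();

// Non-transacted session (false) that acknowledges received messages automatically.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);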

To be able to send a message, we need a message producer. A MessageProducer is simply created from the session:

MessageProducer messageProducer = session.createProducer(queue);

Sending is done with the messageProducer created above:

messageProducer.send(session.createTextMessage("Hello JMS example!"));
messageProducer.send(session.createObjectMessage(new QueueObject("This is the test message to a queue", 1L)));

In the example above we send two different kinds of messages: the first one is a simple text message, the second one is an object message containing a custom object, created by us or by a third party.
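One detail worth knowing about object messages: Session.createObjectMessage accepts a java.io.Serializable, so the payload class has to be serializable. Below is a minimal sketch of what such a class might look like. The field names and the SerializationCheck helper are assumptions for illustration and not the code from the repository; the round trip only imitates with plain JDK serialization what happens to the payload between sender and receiver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical payload class; the field names are assumptions. */
final class QueueObject implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String text;
    private final long id;

    QueueObject(String text, long id) {
        this.text = text;
        this.id = id;
    }

    String getText() { return text; }

    long getId() { return id; }
}

final class SerializationCheck {
    /** Serializes and deserializes the payload and returns the resulting copy. */
    static QueueObject roundTrip(QueueObject original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (QueueObject) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload could not be serialized", e);
        }
    }
}
```

If a field of the payload is not serializable itself, the round trip fails; a quick check like this in a unit test can catch that before the object ever reaches the broker.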

Receiving the messages asynchronously

Because the messages are sent asynchronously, we will receive them in that manner too. For this we again need a session with AUTO_ACKNOWLEDGE.

This mode is the default, so we do not need to worry about setting it. But how do we get the message? The solution is simple: we’ll use a message-driven bean (MDB) which reads the contents of the queue.

A message-driven bean has to fulfill two requirements:

  • it has to implement the javax.jms.MessageListener interface
  • it needs the javax.ejb.MessageDriven annotation with the configuration of the queue it should listen to

Let’s see what those requirements mean.

Implementing the interface brings the onMessage method, which needs to be implemented. This method is called every time the queue gets a message and the consumer is active. It gets a Message as a parameter, which can be cast to a particular type for further use. A plain Message does not offer much of value, so it is better to cast it to its particular type. In this example I’ll show you how to use TextMessages (this is far too easy) and ObjectMessages.

@Override
public void onMessage(Message message) {
    try {
        if (message instanceof TextMessage) {
            System.out.println("TextMessage received: " + ((TextMessage) message).getText());
        } else if (message instanceof ObjectMessage) {
            System.out.println("ObjectMessage received.");
        } else {
            System.out.println("Unknown message type:" + message.getClass());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

The configuration is done via the MessageDriven annotation. It takes a bunch of javax.ejb.ActivationConfigProperty entries. The two minimum required properties are destination and destinationType. The first is the name of the queue, the second is in this case javax.jms.Queue as a plain string. Later with topics we’ll see another case, where we listen to a javax.jms.Topic.

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "example.queue"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})

Additionally you can configure other properties, such as the acknowledge mode.

Multiple listeners

It can happen that multiple message-driven beans (MDBs) listen to the same queue. What happens in this case?

The answer seems simple: there is no exception. Every bean can register itself to the queue; however, the messages are distributed round-robin: every time a new message arrives it is delivered to the next bean registered to the queue. I will not demonstrate this in the example, but you can try it yourself: copy the QueueReader class into the same package, rename it accordingly, start the server and send messages into the queue.
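The round-robin behaviour described above can be imitated with a small JDK-only sketch. RoundRobinQueue is a hypothetical stand-in, not how ActiveMQ is implemented; it only shows the effect that each message reaches exactly one of the registered listeners, in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a queue with several registered listeners. */
final class RoundRobinQueue {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private int next = 0; // index of the listener that gets the next message

    void register(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Delivers the message to exactly one listener, then moves on to the next one. */
    void send(String message) {
        if (listeners.isEmpty()) {
            // A real broker would keep the message until a consumer shows up;
            // this sketch simply rejects it.
            throw new IllegalStateException("no listener registered");
        }
        listeners.get(next).accept(message);
        next = (next + 1) % listeners.size();
    }
}
```

With two listeners registered, the first, third, fifth message and so on go to the first listener and the rest to the second one; no message is seen by both.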

Broadcasting to a topic

Broadcasting to a topic is almost the same as sending messages to a queue: you send the same messages (through a MessageProducer and a Session) but to a Topic instead of a Queue.

@Resource(lookup = "example.topic")
private Topic topic;

Creating a session and a message producer works the same way, but you have to replace the queue with the topic.

Sending messages is the same as above:

messageProducer.send(session.createTextMessage("Hello JMS example!"));
messageProducer.send(session.createObjectMessage(new TopicObject("Message for broadcasted topic", 2L)));

Receiving broadcasted messages

Receiving uses the same mechanism as with queues: we need a message-driven bean, but now the destinationType is set to javax.jms.Topic and the destination points to a topic instead of a queue.

The main difference between queues and topics is that topics broadcast messages to the subscribers: every active subscriber gets the message sent to the topic. So if you have two consumers, both will get the message.

@Override
public void onMessage(Message message) {
    try {
        if (message instanceof TextMessage) {
            print("TextMessage received: " + ((TextMessage) message).getText());
        } else if (message instanceof ObjectMessage) {
            print("ObjectMessage received.");
        } else {
            print("Unknown message type:" + message.getClass());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

The print method prints the name of the current subscriber and the received message. If every subscriber handles messages the same way, you can create an abstract class and write the onMessage method only once; this follows the DRY principle and makes your code easier to maintain.

Testing the application

After configuring the application and starting the server, you can access and test the contents of the article at http://localhost:8080/example/test.jsf

If you click the buttons, you get something similar to this:

send text message to queue

09:15:32,608 INFO  [stdout] (default-threads - 4) TextMessage received: Hello JMS example!

send object message to queue

09:15:34,989 INFO  [stdout] (default-threads - 5) ObjectMessage received of typeclass biz.hahamo.dev.enterprise.example.QueueObject

broadcast text message

09:15:38,800 INFO  [stdout] (default-threads - 6) Subscriber 2 TextMessage received: Hello JMS example!

09:15:38,800 INFO  [stdout] (default-threads - 7) Subscriber1 TextMessage received: Hello JMS example!

broadcast object message

09:15:39,811 INFO  [stdout] (default-threads - 8) Subscriber 2 ObjectMessage received.

09:15:39,811 INFO  [stdout] (default-threads - 9) Subscriber1 ObjectMessage received.

Source Code

As usual, the source code of this article is available in my GitHub repository, in this case in the JMS branch of the Java-EE repo. If you have any questions or find problems, feel free to open an issue on GitHub or write me a mail.
