This tutorial introduces the Java Message Service (JMS) and its messaging models, with source code: the Point-to-Point (PTP) model, the Publish / Subscribe model, and the Request – Reply model. It is aimed at Java programmers who want to learn and explore distributed communication.

Message-Oriented-Middleware

In Message-Oriented Middleware (MOM), applications communicate with each other by exchanging messages. A MOM client can send messages to, and receive messages from, any other client. Each client connects to a messaging agent that provides facilities for creating, sending, receiving, and reading messages. The agent thus provides a loosely coupled, distributed communication environment between two or more applications.


JMS

Sun Microsystems provides a standard API for writing applications that work with the available standard MOM products. It is called the Java Message Service (JMS), and it supports both synchronous and asynchronous modes of communication. It provides the following models:

Point-to-Point (PTP) model
Publish / Subscribe model
Request – Reply model

Point-to-Point (PTP) model

In the Point-to-Point model, queues are the means of communication between sender and receiver. The model is simple: each message sent by a sender is consumed by one receiver, and the receiver can get the message whether or not it was running when the sender sent it. This works because messages are stored on the queue; whenever the receiver comes online, it can fetch the messages stored there.

[Figure: Point-to-Point (PTP) model]
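
To make the store-and-forward idea concrete, here is a minimal in-memory sketch. It is only an analogy: a plain java.util.Queue stands in for a JMS queue, and the class name PtpSketch and the message strings are made up for illustration. No JMS provider is involved.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-memory analogy only: a java.util.Queue stands in for a JMS queue.
class PtpSketch {

    // The sender stores two messages while no receiver is running;
    // the receiver "comes online" afterwards and still gets both.
    static String demo() {
        Queue<String> queue = new ArrayDeque<>();

        // Sender runs first; nothing is listening yet.
        queue.add("order-1");
        queue.add("order-2");

        // Receiver starts later and fetches the stored messages in order.
        String first = queue.poll();
        String second = queue.poll();
        return first + "," + second;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that the sender and the receiver never need to be active at the same time; the queue holds the messages in between.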

Publish / Subscribe model

This model differs from the previous one. Here the sender is called the publisher and the receiver is called the subscriber. A publisher publishes a message, and each published message can be delivered to more than one subscriber. A topic holds the messages and is responsible for distributing the messages arriving from its multiple publishers to its multiple subscribers.

[Figure: Publish / Subscribe model]
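
The fan-out behaviour of a topic can be sketched in a few lines of plain Java. This is an in-memory analogy under stated assumptions: the nested Topic class below is hypothetical and is not javax.jms.Topic, and the stock strings are made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory analogy only: this Topic class is hypothetical, not javax.jms.Topic.
class PubSubSketch {

    static class Topic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        // Every current subscriber receives its own copy of the message.
        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    static List<String> demo() {
        Topic topic = new Topic();
        List<String> received = new ArrayList<>();

        // Two subscribers on the same topic.
        topic.subscribe(m -> received.add("A:" + m));
        topic.subscribe(m -> received.add("B:" + m));

        // Two publications; each one reaches both subscribers.
        topic.publish("IBM 101.5");
        topic.publish("SUNW 4.2");
        return received;
    }
}
```

Unlike the queue case, a published message is not removed by the first consumer that sees it; each subscriber gets a copy.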

Request – Reply Model

This model is used when a real-time response is required. JMS provides two helper classes for implementing the Request – Reply model.

[Figure: Request – Reply model]

QueueRequestor
Use this class if you wish to use a queue. It creates a TemporaryQueue for the responses and provides a request() method that sends the request message and waits for its reply.

TopicRequestor
Use this class if you wish to use a topic. It creates a TemporaryTopic for the responses and provides a request() method that sends the request message and waits for its reply.

Let's see how this all works together. A requestor sends a request and waits for the response using the request() method of TopicRequestor or QueueRequestor. On the other side, the receiver receives the request, processes it, and sends the response back to the originator, whose reply destination it obtains from the Message's getJMSReplyTo() method. I will now present the requestor and receiver applications.
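
Before turning to the JMS code, the flow just described can be sketched with plain Java queues. This is only an analogy, not the JMS API: the Request class, its replyTo field, and the 5-second timeout are assumptions made for illustration (the real request() method blocks until a reply arrives).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory analogy only: BlockingQueues stand in for JMS destinations.
class RequestReplySketch {

    // Hypothetical message type: a body plus the destination for the reply.
    static class Request {
        final String name;
        final BlockingQueue<String> replyTo;

        Request(String name, BlockingQueue<String> replyTo) {
            this.name = name;
            this.replyTo = replyTo;
        }
    }

    // Requestor side: create a temporary reply destination, attach it to
    // the request, send the request, then wait for the reply.
    static String request(BlockingQueue<Request> destination, String name) {
        BlockingQueue<String> temporaryReplyQueue = new LinkedBlockingQueue<>();
        try {
            destination.put(new Request(name, temporaryReplyQueue));
            // Assumption: time out after 5 seconds and return null.
            return temporaryReplyQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    static String demo() {
        BlockingQueue<Request> destination = new LinkedBlockingQueue<>();

        // Receiver side: take one request, process it, and reply to the
        // destination carried in the request itself.
        Thread receiver = new Thread(() -> {
            try {
                Request req = destination.take();
                req.replyTo.put("Welcome " + req.name + " to the world of JMS");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.setDaemon(true);
        receiver.start();

        return request(destination, "Laiq");
    }
}
```

The receiver never needs to know who the requestor is; it simply replies to whatever destination the request carries, which is the role getJMSReplyTo() plays in JMS.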

Requestor application (Topic)

Let me show you how to write a requestor client application for the request-reply model. It shows how the requestor sends a request and receives the reply from the receiver.

The first step is to create a Hashtable for holding the environment settings.

Java Code:
Hashtable env = new Hashtable ();
env.put (Context.SECURITY_PRINCIPAL, "user");
env.put (Context.SECURITY_CREDENTIALS, "user");
env.put (Context.PROVIDER_URL,"http://localhost:1856");
env.put (Context.INITIAL_CONTEXT_FACTORY,
"fiorano.jms.runtime.naming.FioranoInitialContextFactory");
Second step involves performing a JNDI API lookup of the TopicConnectionFactory and Topic.

Java Code:
InitialContext ic = new InitialContext (env);
TopicConnectionFactory tcf = (TopicConnectionFactory) ic.lookup ("REQ_REPLY_TCF");
Topic topic = (Topic) ic.lookup ("REQ_REPLY_TOPIC");
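After the lookup, we create a connection and a session, since these are required to create the TopicRequestor. The connection must also be started; otherwise the reply is never delivered to the requestor. Review the code below:

Java Code:
TopicConnection tc = tcf.createTopicConnection("user", "pwd");
TopicSession ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicRequestor tRequestor = new TopicRequestor(ts, topic);
// Start the connection so that the reply can be delivered to the requestor
tc.start();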
Now it is time to create the Message to be sent to the Topic. The message can be a TextMessage, MapMessage, StreamMessage, ObjectMessage or BytesMessage.

Java Code:
MapMessage mapmsg = ts.createMapMessage();
mapmsg.setString("NAME", "Laiq");

All set. Now we can send the request and receive the response. The request() method sends a request and waits for a reply from the receiver. Only one response is expected for each request.

Java Code:
MapMessage replyMsg = (MapMessage) tRequestor.request (mapmsg);
Response can be retrieved as follows:

Java Code:
String replyMsgString = replyMsg.getString("MSG");
System.out.println("Response received: " + replyMsgString);
All is done. Now we can close the connection, which also closes the session and the TopicRequestor.

Java Code:
tc.close();
Receiver application (Topic)

I will now present a receiver client application that responds to the requestor's request. It shows how to process the received request, how to get a handle to the originator, and how to send the reply back to the request originator.

The first step is the same as for the requestor application: create a Hashtable for holding the environment settings.

Java Code:
Hashtable env = new Hashtable ();
env.put (Context.SECURITY_PRINCIPAL, "user");
env.put (Context.SECURITY_CREDENTIALS, "user");
env.put (Context.PROVIDER_URL,"http://localhost:1856");
env.put (Context.INITIAL_CONTEXT_FACTORY,
"fiorano.jms.runtime.naming.FioranoInitialContextFactory");
The next task is to perform a JNDI API lookup of the TopicConnectionFactory and Topic, as shown below:

Java Code:
InitialContext ic = new InitialContext (env);
TopicConnectionFactory tcf = (TopicConnectionFactory) ic.lookup ("REQ_REPLY_TCF");
Topic topic = (Topic) ic.lookup ("REQ_REPLY_TOPIC");
The next step is to create a connection and a session. Once that is done, we create a TopicSubscriber using the session.

Java Code:
TopicConnection topicConnection = tcf.createTopicConnection();
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

Now it is time to create an instance of the message listener class. Once the instance is created, we register it as the message listener for the TopicSubscriber. Then we start the connection, which begins message delivery.

Java Code:
topicSubscriber.setMessageListener (new RequestListener(topicSession,topic));
topicConnection.start ();

Now all is set, and we listen for the messages published to the topic. The RequestListener class must implement MessageListener and override the onMessage method, which is called each time a message arrives.

Java Code:
class RequestListener implements MessageListener
{
    Topic topic = null;
    TopicSession topicSession = null;
    TopicPublisher topicReplier = null;

    public RequestListener(TopicSession ts, Topic topic) throws JMSException
    {
        this.topicSession = ts;
        this.topic = topic;
        // Unidentified publisher (null topic): the reply destination is
        // supplied per message in publish(Topic, Message)
        topicReplier = ts.createPublisher(null);
    }

    public void onMessage(Message mapMessage)
    {
        // code goes here
    }
    …
}
A request is retrieved in onMessage() method as shown below:

Java Code:
String name = ((MapMessage)mapMessage).getString("NAME");
The response message is also created in the onMessage() method. Once the response message is ready, we need a handle to the request originator so that we can send the response back to it.

Java Code:
MapMessage replyMsg = topicSession.createMapMessage();
String replyString = "Welcome " + name + " to the world of JMS";
replyMsg.setString("MSG", replyString);

// The reply destination is the temporary topic set by the TopicRequestor
Topic topicreply = (Topic) mapMessage.getJMSReplyTo();
// publish(Topic, Message) requires a publisher created without a topic
TopicPublisher topicReplier = topicSession.createPublisher(null);
topicReplier.publish(topicreply, replyMsg);
The last step is to close the connection, which also closes the session and the TopicSubscriber.

Java Code:
topicConnection.close();
CLIENT_ACKNOWLEDGE mode

We can use CLIENT_ACKNOWLEDGE mode with the request-reply model. In this mode, the receiver acknowledges a message by calling the message's acknowledge() method, which acknowledges all messages the session has consumed so far. Note also that each message sent through a JMS provider is assigned a message ID, a provider-specific value carried in the JMSMessageID header field. A receiver can copy this ID into the JMSCorrelationID header field of its reply so that the response can be linked with its request.
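
The way a correlation ID links a response to its request can be sketched without a provider. The Msg class and the ID strings below are hypothetical stand-ins for the JMSMessageID and JMSCorrelationID headers; in real JMS code the receiver would typically call reply.setJMSCorrelationID(request.getJMSMessageID()).

```java
import java.util.HashMap;
import java.util.Map;

// In-memory analogy only: Msg is a hypothetical stand-in for a JMS message.
class CorrelationSketch {

    static class Msg {
        final String messageId;      // plays the role of JMSMessageID
        final String correlationId;  // plays the role of JMSCorrelationID
        final String body;

        Msg(String messageId, String correlationId, String body) {
            this.messageId = messageId;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Receiver side: copy the request's message ID into the reply's
    // correlation ID so the requestor can match them up.
    static Msg reply(Msg request, String replyId) {
        return new Msg(replyId, request.messageId, "done: " + request.body);
    }

    static String demo() {
        // Requestor side: remember outstanding requests by message ID.
        Map<String, String> pending = new HashMap<>();
        Msg r1 = new Msg("ID:1", null, "buy");
        Msg r2 = new Msg("ID:2", null, "sell");
        pending.put(r1.messageId, r1.body);
        pending.put(r2.messageId, r2.body);

        // The reply to the second request arrives first; its correlation ID
        // still identifies which request it answers.
        Msg replyTo2 = reply(r2, "ID:3");
        return pending.remove(replyTo2.correlationId) + " -> " + replyTo2.body;
    }
}
```

This matters mostly when several requests are outstanding at once or replies share a destination; with one temporary destination per requestor, as the requestor classes use, each reply already arrives at the right place.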


Example

This example will present Publish-Subscribe and Point-to-Point messaging. Let me present a scenario. We have a server (StockServer class) which just keeps sending Stock Quotes to a topic (NASDAQ_Topic). A Subscriber (Subscriber class) wishes to get the stock details published by StockServer. It then subscribes to this Topic and receives notification of the stock prices.

The Subscriber is also connected to his stock-broker's buy queue (Buy_Queue) and sell queue (Sell_Queue) through a Sender class. The Subscriber uses the Sender class to send Buy and Sell messages to the appropriate queues. The stock agents responsible for buying stocks (represented by the BuyAgent class) and selling stocks (represented by the SellAgent class) receive messages from the Buy_Queue and the Sell_Queue respectively.

We are using Publish-Subscribe messaging between the StockServer and the Subscriber and Point-to-Point or Queue based messaging between the Sender and the BuyAgent and SellAgent class.

Point-to-Point Messaging (Queueing)

In Point-to-Point messaging, there may be multiple message senders, but each message is consumed by only a single receiver. A Queue represents the destination. Queues are created by an administrator and registered with a JNDI context. Clients can send messages to a Queue and receive messages from a Queue. A Queue typically has many senders but only a single receiver.
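
The "consumed only once" property can be illustrated with a small in-memory sketch. It is an analogy only: a java.util.Queue stands in for Buy_Queue, and the agent names and order strings are made up. Even if two receivers were to take turns on the same queue, each message would go to exactly one of them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory analogy only: a java.util.Queue stands in for a JMS queue.
class SingleConsumerSketch {

    static List<String> demo() {
        Queue<String> buyQueue = new ArrayDeque<>();

        // Two independent senders write to the same queue.
        buyQueue.add("customer-1: buy 10 XYZ");
        buyQueue.add("customer-2: buy 5 XYZ");

        // Two hypothetical receivers take turns. poll() removes the message,
        // so the other receiver can never see it again.
        String[] receivers = {"agentA", "agentB"};
        List<String> log = new ArrayList<>();
        int turn = 0;
        String message;
        while ((message = buyQueue.poll()) != null) {
            log.add(receivers[turn % 2] + " got " + message);
            turn++;
        }
        return log;
    }
}
```

Compare this with a topic, where every subscriber would have received both orders.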

Queue Sender

The source code for our Queue Sender class (Sender.java) which sends buy or sell messages to the Buy_Queue or Sell_Queue is shown below. Code is commented for better understanding.

Java Code:
import javax.jms.*;
import fiorano.jms.rtl.FioranoInitialContext;

public class Sender {

String m_customerID; // Account ID of this customer
QueueSession m_queueSession;
QueueSender m_buySender;
QueueSender m_sellSender;

public Sender (String customerID) {
m_customerID = customerID;
}

public void finalize () {
try {
// Close the senders before the session that created them
m_buySender.close();
m_sellSender.close();
m_queueSession.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public void TalkToYourStockBroker () {
try {
// ==============================================================
// 1. Create the InitialContext Object used for looking up
// JMS administered objects on the Fiorano/EMS
// located on the default host.
// ==============================================================
FioranoInitialContext initialCtx = null;
initialCtx = new FioranoInitialContext ();
initialCtx.bind ();
// ==============================================================
// 1.1 Lookup Connection Factory and Queue names
// ==============================================================
QueueConnectionFactory queueConnFactory = (QueueConnectionFactory)
initialCtx.lookup ("primaryQCF");
Queue buyQueue = (Queue)initialCtx.lookup("Buy_Queue");
Queue sellQueue = (Queue)initialCtx.lookup("Sell_Queue");
// ==============================================================
// 1.2 Dispose the InitialContext resources
// ==============================================================
initialCtx.dispose();
// ==============================================================
// 2. create and start a queue connection
// ==============================================================
System.out.println("Creating Queue connections");
QueueConnection queueConnection = queueConnFactory.createQueueConnection();
queueConnection.start ();
// ==============================================================
// 3. create queue session on the connection just created
// ==============================================================
System.out.println
("Creating queue session: not transacted, auto ack");
m_queueSession = queueConnection.createQueueSession
(false, Session.AUTO_ACKNOWLEDGE);
// ==============================================================
// 4. create senders for the Queue
// ==============================================================
System.out.println("Creating Queue, senders");
m_buySender = m_queueSession.createSender(buyQueue);
m_sellSender = m_queueSession.createSender(sellQueue);
}
catch (Exception e) {
e.printStackTrace ();
}
}

void SellStocks (String symbol, int sellNumStocks, double price) {
try {
// ==============================================================
// 5. Create a text message
// ==============================================================
TextMessage textmsg = m_queueSession.createTextMessage();
textmsg.setText ("Customer ID :"+ m_customerID + ". Sell "+
sellNumStocks + " stocks of " + symbol+
" at $"+ price);
// ==============================================================
// Send the message: persistent delivery, priority 5, no expiry
// ==============================================================
m_sellSender.send(textmsg, DeliveryMode.PERSISTENT, 5, 0);
} catch (Exception e) {
e.printStackTrace();
}
}

public void BuyStocks(String symbol, int buyNumStocks, double price) {
try {
// ==============================================================
// 5. Create a text message
// ==============================================================
TextMessage textmsg = m_queueSession.createTextMessage();
textmsg.setText ("Customer ID :"+ m_customerID + ". Buy "+
buyNumStocks + " stocks of " +symbol+
" at $"+ price);
// ==============================================================
// Send the message: persistent delivery, priority 5, no expiry
// ==============================================================
m_buySender.send(textmsg, DeliveryMode.PERSISTENT, 5, 0);
} catch (Exception e) {
e.printStackTrace();
}
}
}


Queue receiver

The Buy Agent (BuyAgent class) is used to receive buy messages from the Buy_Queue for buying stock. The source code for our buyer class (BuyAgent.java), which receives stock buy messages from the Buy_Queue, is shown below.

Receiving messages from a queue involves the following steps:
1. Create the FioranoInitialContext object used for looking up JMS administered objects on the host, and bind to it.
2. Look up the connection factory and the queue by name.
3. Once you have those, dispose of the InitialContext resources.
4. Create and start a queue connection.
5. Create a queue session on this connection.
6. Create a receiver for the queue and, whenever a message is received, print it to standard output.

Java Code:
import javax.jms.*;
import fiorano.jms.rtl.*;

public class BuyAgent {

public static void main (String args[]) {
// ==================================================================
// 1. Create the initial context string to lookup the Queue
// connection factory.
// ==================================================================
FioranoInitialContext initialCtx = null;
try {
initialCtx = new FioranoInitialContext ();
initialCtx.bind ();
// ==============================================================
// 1.1 Lookup Connection Factory and Queue names
// ==============================================================
QueueConnectionFactory queueConnFactory = (QueueConnectionFactory)
initialCtx.lookup ("primaryQCF");
Queue queue = (Queue)initialCtx.lookup("Buy_Queue");
// ==============================================================
// 1.2 Dispose the InitialContext resources
// ==============================================================
initialCtx.dispose();
// ==============================================================
// 2. Create and start a Queue connection
// ==============================================================
System.out.println("Creating Queue connection");
QueueConnection queueConnection = queueConnFactory.createQueueConnection();
queueConnection.start ();
// ==============================================================
// 3. Create a Queue session on this connection
// ==============================================================
System.out.println("Creating Queue session: not trans, auto ack");
QueueSession queueSession = queueConnection.createQueueSession
(false, Session.AUTO_ACKNOWLEDGE);
// ==============================================================
// 4. Create a Receiver for the Queue
// ==============================================================
System.out.println("Creating Queue Receiver");
QueueReceiver receiver = queueSession.createReceiver (queue);
while (true) {
// ==============================================================
// 5. Wait for a request
// ==============================================================
TextMessage request = (TextMessage) receiver.receive ();
System.out.println ("Execute Job : " + request.getText ());
}
}
catch (JMSException e) {
e.printStackTrace ();
}
} // main
}

I hope you now have a working understanding of JMS and will start experimenting with it. Happy coding!