:::: MENU ::::

JMS


JMS (Java Messaging Service) is a specification included in Java Enterprise Edition for the exchange of messages.

Operation of the messaging

In JMS messages are emitted by the sender, they are left in an intermediate component and, later, they are retrieved by the receiver.

Intermediate components: queues and topics

– Queues: used to send messages between a single emitter and a single receiver.
– Topics: used for an issuer to send the same message to several receivers. In this case, the emitter is called producer and the receivers, subscribers.

Implementations

As it is a specification, there are several implementations (ActiveMQ, IBM MQ). In this example we will use ActiveMQ.

JMS server

To send messages, it is necessary that a messaging server exists and is running. After, we can use using programs that issue and receive messages by connecting to that server.

ActiveMQ server installation

Download and extract apache-activemq-5.15.2-bin from (http://activemq.apache.org/activemq-5152-release.html).

Issuer and receiver programs

We are going to create an emitter and a receiver (that needs a listener).

Emitter:

package app;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MySender {
	public static void main(String[] args) {
		
		try {
			Properties jndiParameters = new Properties();
			jndiParameters.put(Context.INITIAL_CONTEXT_FACTORY,
					"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
			// jndiParameters.put(Context.PROVIDER_URL, "tcp://localhost:61616");
			InitialContext initialContext = new InitialContext(jndiParameters);
			QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext
					.lookup("ConnectionFactory");
			QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue queue = (Queue) initialContext.lookup("dynamicQueues/Q1");
			QueueSender queueSender = queueSession.createSender(queue);
			TextMessage textMessage = queueSession.createTextMessage();

			BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
			
			while (true) {
				System.out.println("Send message (empty line to exit):");
				String line = bufferedReader.readLine();
				
				if (line.equals("")) {
					break;
				}
				
				textMessage.setText(line);
				queueSender.send(textMessage);
			}
			
			queueConnection.close();
		} catch (Exception exception) {
			System.out.println(exception);
		}
	}
}

 

Receiver:

package app;

import java.util.Properties;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MyReceiver {
	
	public static void main(String[] args) {
		
		try {
			Properties jndiParameters = new Properties();
			jndiParameters.put(Context.INITIAL_CONTEXT_FACTORY,
					"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
			// jndiParameters.put(Context.PROVIDER_URL, "tcp://localhost:61616");
			InitialContext initialContext = new InitialContext(jndiParameters);
			QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext
					.lookup("ConnectionFactory");
			QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue queue = (Queue) initialContext.lookup("dynamicQueues/Q1");
			QueueReceiver queueReceiver = queueSession.createReceiver(queue);

			MyListener myListener = new MyListener();

			queueReceiver.setMessageListener(myListener);

			System.out.println("MyListener waiting...");
			
			while (true) {
				Thread.sleep(1000);
			}
			
		} catch (Exception exception) {
			System.out.println(exception);
		}
	}

}

 

Listener:

package app;

import javax.jms.*;

public class MyListener implements MessageListener {

	public void onMessage(Message message) {
		
		try {
			TextMessage textMessage = (TextMessage) message;
			System.out.println("Received: " + textMessage.getText());
		} catch (JMSException jmsException) {
			System.out.println(jmsException);
		}
	}
}

 

Operation demonstration

Start the JMS server by launching the “activemq start” command in the “bin” folder.

Start the application that receives messages.

Start the message emmiter application.

After sending a message from the issuer we will see how it appears on the receiver.

Repository

The code is available at https://github.com/luisgomezcaballero/jms.


So, what do you think ?