:::: MENU ::::

JMS


JMS (Java Messaging Service) es una especificación incluída en Java Enterprise Edition para el intercambio de mensajes.

Funcionamiento de la mensajería

En JMS los mensajes se emiten por el emisor, se dejan en un componente intermedio y tiempo más tarde se recogen por el receptor.

Componentes intermedios: colas y tópicos

– Colas: sirven para el envío de mensajes entre un único emisor y un único receptor.
– Tópicos: se utilizan para que un emisor envíe el mismo mensaje a varios receptores. En este caso el emisor se llama productor y los receptores, subscriptores.

Implementaciones

Al ser una especificación, existen varias implementaciones (ActiveMQ, IBM MQ). En este ejemplo usaremos ActiveMQ.

Servidor JMS

Para poder realizar algún envío de mensajes, es necesario que exista y esté en funcionamiento un servidor de mensajería (también llamado broker). Después podrán usarse los programas que emitan y reciban mensajes conectándose a dicho servidor.

Instalación de servidor ActiveMQ

Descargar y extraer apache-activemq-5.15.2-bin de (http://activemq.apache.org/activemq-5152-release.html).

Programas emisor y receptor

Vamos a crear un emisor y un receptor (que a su vez necesitará un listener).

Emisor:

package app;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MySender {
	public static void main(String[] args) {
		
		try {
			Properties jndiParameters = new Properties();
			jndiParameters.put(Context.INITIAL_CONTEXT_FACTORY,
					"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
			// jndiParameters.put(Context.PROVIDER_URL, "tcp://localhost:61616");
			InitialContext initialContext = new InitialContext(jndiParameters);
			QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext
					.lookup("ConnectionFactory");
			QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue queue = (Queue) initialContext.lookup("dynamicQueues/Q1");
			QueueSender queueSender = queueSession.createSender(queue);
			TextMessage textMessage = queueSession.createTextMessage();

			BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
			
			while (true) {
				System.out.println("Send message (empty line to exit):");
				String line = bufferedReader.readLine();
				
				if (line.equals("")) {
					break;
				}
				
				textMessage.setText(line);
				queueSender.send(textMessage);
			}
			
			queueConnection.close();
		} catch (Exception exception) {
			System.out.println(exception);
		}
	}
}

 

Receptor:

package app;

import java.util.Properties;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MyReceiver {
	
	public static void main(String[] args) {
		
		try {
			Properties jndiParameters = new Properties();
			jndiParameters.put(Context.INITIAL_CONTEXT_FACTORY,
					"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
			// jndiParameters.put(Context.PROVIDER_URL, "tcp://localhost:61616");
			InitialContext initialContext = new InitialContext(jndiParameters);
			QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) initialContext
					.lookup("ConnectionFactory");
			QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue queue = (Queue) initialContext.lookup("dynamicQueues/Q1");
			QueueReceiver queueReceiver = queueSession.createReceiver(queue);

			MyListener myListener = new MyListener();

			queueReceiver.setMessageListener(myListener);

			System.out.println("MyListener waiting...");
			
			while (true) {
				Thread.sleep(1000);
			}
			
		} catch (Exception exception) {
			System.out.println(exception);
		}
	}

}

 

Listener:

package app;

import javax.jms.*;

public class MyListener implements MessageListener {

	public void onMessage(Message message) {
		
		try {
			TextMessage textMessage = (TextMessage) message;
			System.out.println("Received: " + textMessage.getText());
		} catch (JMSException jmsException) {
			System.out.println(jmsException);
		}
	}
}

 

Demostración de funcionamiento

Arrancar el servidor JMS lanzando el comando “activemq start” en la carpeta “bin”.

Iniciar la aplicación receptora de mensajes.

Iniciar la aplicación emisora de mensajes.

Al enviar un mensaje desde el emisor veremos cómo aparece en el receptor.

Repositorio

El código está disponible en https://github.com/luisgomezcaballero/jms.


So, what do you think ?