Message Oriented Middleware: Java JMS

Message Oriented Middleware: Java JMS
LSUB
GSYC
29 de abril de 2015
(cc) 2013 Laboratorio de Sistemas,
Algunos derechos reservados. Este trabajo se entrega bajo la licencia Creative Commons Reconocimiento NoComercial - SinObraDerivada (by-nc-nd). Para obtener la licencia completa, véase
http://creativecommons.org/licenses/. También puede solicitarse a Creative Commons, 559 Nathan Abbott Way,
Stanford, California 94305, USA.
Las imágenes de terceros conservan su licencia original.
MOM
• MOM: Message Oriented Middleware. JMS (Java Message
Service) es el MOM de Java.
• Permite la comunicación débilmente acoplada, fiable, sı́ncrona
y ası́ncrona entre clientes mediante el paso de mensajes.
• Es una especificación de API que forma parte de Java EE, hay
múltiples implementaciones.
• El MOM depende de un servicio central o broker para
gestionar los mensajes.
• Usaremos GlassFish como servidor de aplicaciones Java EE.
OOM/RPC vs MOM
OOM/RPC vs MOM
• RPC: el objetivo es ejecutar un procedimiento: ”haz esto”.
• MOM: el objetivo es notificar un evento: ”ha pasado esto”.
JMS
¿Cuándo se usa una comunicación débilmente acoplada?
• Si el emisor quiere no saber nada del receptor y viceversa.
Sólo se debe saber el formato de los mensajes.
• No se quiere depender de las interfaces del resto de
componentes, como en RCP/RMI.
• No se quiere depender de la ejecución de los otros
componentes: los mensajes pueden quedar almacenados en el
sistema y ser atendidos más tarde.
JMS
Una aplicación JMS se compone de:
• Clientes: componentes que envı́an y reciben mensajes. Se
garantiza que el mensaje se entrega justo una vez.
• Mensajes: objetos de comunicación entre componentes.
• Proveedor JMS: el sistema que implementa el API de JMS. La
plataforma Java EE proporciona un proveedor de JMS. Hay
distintas implementaciones.
• Los objetos administrados (administered objects) necesarios
para interactuar con el proveedor. Se administran mediante
una herramienta (p. ej. asadmin).
JMS
Hay tipos de comunicación en JMS:
• Point-to-point.
• Publish/subscribe.
Point-to-Point
• Un consumidor por mensaje.
• Colas de mensajes persistentes: emisor y receptor no tienen
dependencia temporal.
• Los mensajes se entregan ordenados.
• El receptor confirma el procesamiento de un mensaje.
Imagen: (c) Oracle
Publish/Subscribe
• Múltiples consumidores por mensaje.
• Un consumidor se subscribe a un tema (topic).
• El receptor debe estar activo para recibir: emisor y receptor
tienen dependencia temporal.
Imagen: (c) Oracle
JMS
Tipos de mensajes:
• Text: Strings.
• Object: Un objeto Serializable.
• Bytes: Un array de bytes.
• Map: Un diccionario.
• Stream: Un stream de valores primitivos.
Arquitectura
Imagen: (c) Oracle
Administered Objects
La herramienta de configuración de Java EE nos permite
administrar dos tipos de objetos:
• ConnectionFactory: crea conexiones con el proveedor de
JMS.
• Queue connection factory: para Point-To-Point.
• Topic connection factory: para Publish/Subscribe.
• Connection factory: para ambas.
• Destination: representa emisores y destinatarios.
• Queue: para Point-To-Point.
• Topic: para Publish/Subscribe.
Administración: GlassFish
• Crear un dominio (una o más instancias del servidor de apliaciones) que tiene
asociados estos puertos TCP entre otros:
•
•
•
•
Un
Un
Un
Un
puerto
puerto
puerto
puerto
para el servidor de aplicaciones (8080 por omisión).
de administración (4848 por omisión).
de JMS (7070 por omisión).
de IIOP para RMI/CORBA (3700 por omisión).
asadmin create-domain --adminport 5000
asadmin list-domains
mydomain
• Arrancar el dominio y la base de datos con la herramienta asadmin:
asadmin start-domain mydomain
asadmin start-database
• Arrancar un navegador: http://localhost:4848
Administración: GlassFish
• Configurar JMS Default Host con la dirección de la máquina que ejecuta el
broker de JMS (en este caso la máquina en la que ejecuta GlassFish).
Configurations → JavaMessageService → default JMS host
• Crear los objetos administrados de JMS (JMS Resources):
Resources → JMSResources → ConnectionFactories
Resources → JMSResources → DestinationResources
• Nombre (JNDI).
• Tipo de recurso.
Administración: GlassFish
Administración: GlassFish
Administración: GlassFish
JNDI
• Los recursos se registran en un espacio de nombres de JNDI
(Java Naming and Directory Interface).
• JNDI es una interfaz homogenea para distintos sistemas de
nombrado (LDAP, CORBA, NDS, etc.).
• En JMS se usa para localizar los objetos administrados.
• Por omisión se conecta a localhost:3700. Si el servidor
JNDI está en otra parte, hay que definirlo en las propiedades.
• Para resolver los nombres:
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;
QueueConnectionFactory qFactory =
( QueueConnectionFactory ) j n d i . lookup ( ” Factoria1 ” ) ;
// Lookup q ue u e
Queue que ue = ( Queue ) j n d i . l o o k u p ( ” C o l a 1 ” ) ;
Modelo
• Conexión: objeto que representa la conexión con el proveedor
de JMS.
• Sesión: contexto para enviar y recibir mensajes desde un único
thread. La sesión crea mensajes, productores y consumidores
para su thread (no se deben usar desde otros threads).
Modelo
Imagen: (c) Oracle
Interfaces
Conexión Point-to-Point
• El método createQueueConnection de la factorı́a crea una
conexión con el proveedor de JMS.
• Dentro de una conexión podemos crear una o más sesiones
para enviar, recibir, etc.
• El método close() cierra la conexión y todo lo que depende
de ella (sesiones, etc.).
QueueConnection c o n n e c t i o n = f a c t o r y . createQueueConnection ( ) ;
QueueSession s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , Q u e u e S e s s i o n . AUTO ACKNOWLEDGE ) ;
Sesión Point-to-Point
• El primer argumento de createQueueSession inica si la
sesión es transaccional.
• Si no es transaccional, hay que indicar un modo de
asentimiento del mensaje (si es transaccional se ignora el
segundo argumento):
• AUTO ACKNOWLEDGE: se hace automáticamente.
• CLIENT ACKNOWLEDGE: el receptor debe invocar el método
acknowledge() manualmente. Si una sesión termina sin
confirmar la recepción de un mensaje, el mensaje vuelve a
estar disponible en la cola. La confirmación de un mensaje
confirma todos los mensajes anteriores recibidos en la sesión.
Sesión
La sesión nos permite crear objetos de tipo:
• QueueReceiver para recibir mensajes. Aunque es posible
tener dos sesiones distintas con QueueReceivers para la
misma cola, el estándar de JMS no define como se reparten
los mensajes. Sólo un consumidor recibe el mensaje.
• QueueSender para enviar mensajes.
• QueueBrowser para inspeccionar mensajes en la cola sin
sacarlos.
• TemporaryQueue para crear una cola temporal que solo
sobrevive a la conexión en la que se crea.
Sesión Transaccional
• La transacción es entre un extremo y el proveedor de JMS, no
entre los extremos.
• Semántica: los mensajes se ponen/quitan de la colas todos o
ninguno.
• Forman parte de la transacción todos los mensajes enviados y
recibidos en la sesión entre dos invocaciones de
commit()/rollback(), potencialmente de distintas colas.
• Transaction Commit: todos los mensajes producidos están
enviados y todos los mensajes consumidos entán asentidos.
• Transaction Rollback: todos los mensajes producidos se han
destruido y todos los mensajes consumidos se han devuelto.
Sesión Transaccional
Envı́o:
• Después de enviar, los mensajes no están disponibles en la cola
hasta que no se invoque el método commit() de la sesión.
• Si se invoca rollback() en su lugar, los mensajes de la
transacción no llegan nunca a estar disponibles en la cola.
Sesión Transaccional
Recepción:
• Los mensajes recibidos en la transacción no se eliminan de la
cola mientras se van recibiendo; se eliminan de la cola cuando
el receptor invoca commit(), después de recibirlos.
• Si se invoca rollback() en su lugar, todos los mensajes de la
transacción vuelven a estar disponibles en la cola.
Cierre
• Cuando ya no se van a utilizar, hay que cerrar los
receptores/emisores, sesiones y conexiones, llamando a
close().
• El cierre de una sesión cierra los receptores/emisores que la
pertenecen.
• El cierre de una conexión cierra las sesiones correspondientes.
Ejemplo Point-To-Point: Sender
// e x c e p t i o n s a r e i g n o r e d i n t h i s e x a m p l e
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;
QueueConnectionFactory f a c t o r y =
( QueueConnectionFactory ) j n d i . lookup ( ” Factoria1 ” ) ;
Queue q ue ue = ( Queue ) j n d i . l o o k u p ( ” C o l a 1 ” ) ;
QueueConnection c o n n e c t i o n = f a c t o r y . createQueueConnection ( ) ;
QueueSession s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , Q u e u e S e s s i o n . AUTO ACKNOWLEDGE ) ;
Qu eue Sender s e n d e r = s e s s i o n . c r e a t e S e n d e r ( queue ) ;
f o r ( i n t i = 0 ; i < 1 0 ; i ++){
T e x t M e s s a g e msg = s e s s i o n . c r e a t e T e x t M e s s a g e ( ) ;
msg . s e t T e x t ( ” Message ” + i + ” t o C o l a 1 ! ” ) ;
s e n d e r . s e n d ( msg ) ;
System . e r r . p r i n t ( ” . ” ) ;
Thread . s l e e p ( 1 0 0 0 ) ;
}
connection . close ( ) ;
// c l o s e s t h e c o n n e c t i o n , t h e s e s s i o n and t h e r e c e i v e r
Ejemplo Point-To-Point: Synchronous Receiver
// e x c e p t i o n s a r e i g n o r e d i n t h i s e x a m p l e
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;
QueueConnectionFactory f a c t o r y =
( QueueConnectionFactory ) j n d i . lookup ( ” Factoria1 ” ) ;
Queue q ue ue = ( Queue ) j n d i . l o o k u p ( ” C o l a 1 ” ) ;
QueueConnection c o n n e c t i o n = f a c t o r y . createQueueConnection ( ) ;
QueueSession s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , Q u e u e S e s s i o n . AUTO ACKNOWLEDGE ) ;
Q u e u e R e c e i v e r r e c e i v e r = s e s s i o n . c r e a t e R e c e i v e r ( queue ) ;
connection . s t ar t ( ) ;
System . e r r . p r i n t l n ( ” L i s t e n i n g . . . ” ) ;
for ( ; ; ) {
Message msg = r e c e i v e r . r e c e i v e ( ) ;
i f ( msg == n u l l ){
System . o u t . p r i n t l n ( ” no more m e s s a g e s ! ” ) ;
break ;
}
i f ( msg i n s t a n c e o f T e x t M e s s a g e ){
T e x t M e s s a g e m = ( T e x t M e s s a g e ) msg ;
System . o u t . p r i n t l n ( ” Message r e c e i v e d : ” + m. g e t T e x t ( ) ) ;
}
}
connection . close ( ) ;
// c l o s e s t h e c o n n e c t i o n , t h e s e s s i o n and t h e r e c e i v e r
Ejemplo Point-To-Point: Asynchronous Receiver
// e x c e p t i o n s and c l o s e ( ) c a l l s a r e i g n o r e d i n t h i s e x a m p l e
p r i v a t e c l a s s A s y n c R e c e i v e r i m p l e m e n t s Runnable , M e s s a g e L i s t e n e r {
p r i v a t e QueueConnection c o n n e c t i o n ;
p r i v a t e Queue queue ;
p u b l i c A s y n c R e c e i v e r ( Q u e u e C o n n e c t i o n con , Queue queue ){
t h i s . c o n n e c t i o n = con ;
t h i s . queue = queue ;
}
@Override
p u b l i c void run (){
try {
QueueSession s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , Q u e u e S e s s i o n . AUTO ACKNOWLEDGE ) ;
Q u e u e R e c e i v e r r e c e i v e r = s e s s i o n . c r e a t e R e c e i v e r ( queue ) ;
receiver . setMessageListener ( this );
System . o u t . p r i n t l n ( Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” l i s t e n i n g ! ” ) ;
//
The t h r e a d w i l l be d o i n g i t s j o b
doJob ( ) ;
} c a t c h ( E x c e p t i o n e ){ . . . } f i n a l l y { . . . }
}
@Override
p u b l i c v o i d onMessage ( Message msg ) {
try {
T e x t M e s s a g e m = ( T e x t M e s s a g e ) msg ;
System . o u t . p r i n t l n ( ” L i s t e n e r , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” m e s s a g e r e c e i v e d : ” + m. g e t T e x t ( ) ) ;
} c a t c h ( JMSException e ) { . . . } f i n a l l y { . . . }
}
}
Publish/Subscribe
• La conexión y la sesión se crea de forma similar, pero usando
las interfaces de Topic en lugar de las de Queue.
• El emisor se crea con el método createPublisher() de la
sesión. Recibe como parámetro el Topic al que se quiere
enviar.
• Para recepción sı́ncrona, debemos instanciar un objeto
MessageConsumer invocando createConsumer().
• Para recepción ası́ncrona, debemos instanciar un objeto
TopicSubscriber invocando createSubscriber() e
instalar un manejador: un objeto que implemente la interfaz
MessageListener.
Ejemplo Pub/Sub: Sender
try {
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;
TopicConnectionFactory factory =
( TopicConnectionFactory ) j n d i . lookup (” Factoria2 ” ) ;
Topic t o p i c = ( Topic ) j n d i . lookup ( ” Topic1 ” ) ;
TopicConnection connection = f a c t o r y . createTopicConnection ( ) ;
TopicSession session =
c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n . AUTO ACKNOWLEDGE ) ;
TopicPublisher publisher = session . createPublisher ( topic );
f o r ( i n t i = 0 ; i < 1 0 ; i ++){
T e x t M e s s a g e msg = s e s s i o n . c r e a t e T e x t M e s s a g e ( ) ;
msg . s e t T e x t ( ” Message ” + i + ” t o C o l a 1 ! ” ) ;
p u b l i s h e r . p u b l i s h ( msg ) ;
System . e r r . p r i n t ( ” . ” ) ;
Thread . s l e e p ( 1 0 0 0 ) ;
}
connection . close ( ) ;
} c a t c h ( E x c e p t i o n e ){
e . printStackTrace ( ) ;
}
Ejemplo Pub/Sub: Synchronous Receiver
try {
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;
TopicConnectionFactory factory =
( TopicConnectionFactory ) j n d i . lookup (” Factoria2 ” ) ;
Topic t o p i c = ( Topic ) j n d i . lookup ( ” Topic1 ” ) ;
TopicSession session =
c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n . AUTO ACKNOWLEDGE ) ;
MessageConsumer c o n s u m e r = s e s s i o n . c r e a t e C o n s u m e r ( t o p i c ) ;
System . o u t . p r i n t l n ( ” Thread ” + Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” l i s t e n i n g ! ” ) ;
for (;;){
for ( ; ; ) {
Message msg = c o n s u m e r . r e c e i v e ( ) ;
i f ( msg == n u l l ){
System . o u t . p r i n t l n ( ” no more m e s s a g e s ! ” ) ;
break ;
}
i f ( msg i n s t a n c e o f T e x t M e s s a g e ){
T e x t M e s s a g e m = ( T e x t M e s s a g e ) msg ;
System . o u t . p r i n t l n ( ” Consumer , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) +
” m e s s a g e r e c e i v e d : ” + m. g e t T e x t ( ) ) ;
}
}
}catch ( Exception e ) { . . . } f i n a l l y { . . . }
Ejemplo Pub/Sub: Asynchronous Receiver
// e x c e p t i o n s and c l o s e ( ) c a l l s a r e i g n o r e d i n t h i s e x a m p l e
p r i v a t e s t a t i c c l a s s A s y n c R e c e i v e r i m p l e m e n t s Runnable , M e s s a g e L i s t e n e r {
p ri v at e TopicConnection connection ;
p r i v a t e Topic t o p i c ;
p u b l i c A s y n c R e c e i v e r ( T o p i c C o n n e c t i o n con , T o p i c t o p i c ){
t h i s . c o n n e c t i o n = con ;
this . topic = topic ;
}
@Override
p u b l i c void run (){
try {
TopicSession session =
c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n . AUTO ACKNOWLEDGE ) ;
TopicSubscriber subscriber = session . createSubscriber ( topic );
subscriber . setMessageListener ( this );
System . o u t . p r i n t l n ( ” Thread ” + Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” s u b s c r i b e d ! ” ) ;
//
The t h r e a d w i l l be d o i n g i t s j o b
doJob ( ) ;
}catch ( Exception e ) { . . . } f i n a l l y { . . . }
}
@Override
p u b l i c v o i d onMessage ( Message msg ) {
try {
T e x t M e s s a g e m = ( T e x t M e s s a g e ) msg ;
System . o u t . p r i n t l n ( ” S u b s c r i b e r , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” m e s s a g e r e c e i v e d : ” + m. g e t T e x t ( ) ) ;
} c a t c h ( JMSException e ) { . . . } f i n a l l y { . . . }
}
}
Ejecución
En la máquina en la que ejecuta GlassFish:
# $GLASSFISH i s t h e p a t h t o t h e G l a s s F i s h d i r e c t o r y :
j a v a −cp j m s e x a m p l e . j a r : $GLASSFISH/ g l a s s f i s h / l i b / g f − c l i e n t . j a r
org . l s u b . jmsexample . P2PReceiver
Ejecución
En otras máquinas:
• No hace falta ejecutar GlassFish en ellas.
• Es necesario tener todas las clases en el classpath. Podemos
instalar GlassFish o copiar todos los JAR necesarios a mano.
• Hay que indicar la máquina en la que ejecuta el proveedor de
JMS.
# $GLASSFISH i s t h e p a t h t o t h e G l a s s F i s h d i r e c t o r y :
j a v a −Dorg . omg . CORBA . O R B I n i t i a l H o s t=omac . l s u b . o r g
−cp j m s e x a m p l e . j a r : $GLASSFISH/ g l a s s f i s h / l i b / g f − c l i e n t . j a r
org . l s u b . jmsexample . P2PReceiver
asadmin
Otros comandos útiles:
• flush-jmsdest --desttype queue cola : drena la cola
indicada.
• flush-jmsdest --desttype topic topic : drena el topic.
• stop-domain domain : para la ejecución del dominio.
• delete-domain domain : borra el dominio.
• ...