Java - JMS: consuming multiple topics
I am new to Java and am working on a project that consumes multiple (different) topics and sends them to a server. I was wondering what the best way of handling multiple topics is.
From what I understand, each consumer is tied to one topic, so if I had to consume multiple topics I would need one consumer for each different topic. Since a consumer makes a blocking call, I would need to invoke a thread per consumer to consume these topics in parallel.
And if I wanted to improve throughput further, is it good practice to have one boss thread per consumer (attached to a topic) and allow each boss thread to set up worker threads to improve performance accordingly?
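The boss/worker layout described above can be sketched without a broker. This is only an illustration under stated assumptions: each `BlockingQueue` stands in for a topic, `take()` stands in for the blocking `MessageConsumer.receive()` call, and the names `BossWorkerSketch` and `consumeAll` are hypothetical. With real JMS, each boss thread would also need its own `Session`, since a session is meant to be used by one thread at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BossWorkerSketch {

    /**
     * Runs one boss thread per "topic" (a BlockingQueue stand-in) and hands every
     * message to a shared worker pool. Returns once `expected` messages have been
     * handled or a 5 second timeout has elapsed.
     */
    static List<String> consumeAll(Map<String, BlockingQueue<String>> topics,
                                   int workers, int expected) {
        ExecutorService bosses = Executors.newFixedThreadPool(topics.size());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(expected);

        topics.forEach((name, queue) -> bosses.execute(() -> {
            try {
                while (true) {
                    // Blocking call, playing the role of MessageConsumer.receive()
                    String msg = queue.take();
                    // The boss only dispatches; the actual work runs on the pool
                    pool.execute(() -> {
                        handled.add(name + ":" + msg);
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the blocked take(); exit the loop
                Thread.currentThread().interrupt();
            }
        }));

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            bosses.shutdownNow();
            pool.shutdownNow();
        }
        return new ArrayList<>(handled);
    }
}
```

One trade-off of this layout: once messages are handed to a worker pool, the order in which messages from a single topic are processed is no longer guaranteed.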
Please advise whether this is good practice and, if not, what the alternative options are. Are there any known design patterns to handle this problem?
Why did I choose the consumer model instead of the listener model?
I have one more constraint: after the consumer receives a message, it needs to send the message to a receiving server. If the receiving server is down (during a new version push), I have to pause consuming messages until the receiving server is up again. In that case a message listener wouldn't help, because I wouldn't be able to pause the listener when the receiving server is down. Am I right, or is there a way to pause the listener and stop consuming messages until the receiving server is up?
The way to go is to use the listener feature.
Your object implements the MessageListener interface, and you add it to the consumer as its message listener. In this case the client library handles the threading involved in reading messages from the queue and dispatching them to the listeners.
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main(String[] args) {
        try {
            MyMessageConsumer myMessageConsumer = new MyMessageConsumer();

            // Example using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // createQueue() is kept from the original example;
            // for publish/subscribe topics use session.createTopic(...) instead
            Destination destination1 = session.createQueue("mytopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("mytopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);

            // Start delivery once both listeners are registered
            connection.start();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        // Handle messages here
    }
}
Session transactions
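In this option you use a transacted session, and the message is delivered again if session.rollback() is called. Call commit() when the operation is successful, or rollback() when it is not. In a transacted session, commit() takes the place of acknowledge().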
package io.bessel.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main(String... arguments) {
        try {
            // Example using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: received messages are only settled by commit()
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MyMessageConsumer myMessageConsumer = new MyMessageConsumer(session);

            Destination destination1 = session.createQueue("mytopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("mytopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    private final Session session;

    public MyMessageConsumer(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println(String.format("Received message: %s", text));
                // Demonstration only: roll back so the message is redelivered.
                // Call this.session.commit() instead once processing has succeeded.
                this.session.rollback();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
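For the constraint in the question (stop consuming while the receiving server is down), the control flow can be sketched independently of JMS. This is a minimal illustration under stated assumptions: the `BlockingQueue` stands in for the message source, the `send` predicate is a hypothetical call to the receiving server that returns false while it is down, and `PausingForwarder` is an invented name. With a real listener, `Connection.stop()` and `Connection.start()` are the JMS calls that pause and resume delivery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class PausingForwarder {

    /**
     * Forwards up to `count` messages from `source` to the receiving server.
     * While `send` reports failure, the current message is retried after a
     * backoff and no further message is taken from the source.
     * Returns the total number of send attempts.
     */
    static int forward(BlockingQueue<String> source, Predicate<String> send,
                       int count, long backoffMillis) {
        int attempts = 0;
        try {
            for (int i = 0; i < count; i++) {
                // Wait briefly for the next message; stop if none arrives
                String msg = source.poll(1, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                // Consumption is effectively paused inside this loop
                while (true) {
                    attempts++;
                    if (send.test(msg)) {
                        break; // delivered, move on to the next message
                    }
                    Thread.sleep(backoffMillis); // server down, wait and retry
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts;
    }
}
```

Because the loop does not take the next message until the current one has been sent, nothing is consumed while the server is unavailable. The transacted-session approach shown above achieves a similar effect on the broker side by rolling back instead of retrying locally.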