可嵌入式JMS消息队列FFMQ

2019-07-12 19:08发布

 经过几个开源项目比较,最终发现合适的项目FFMQ:http://sourceforge.net/projects/ffmq/,项目大小才600KB,支持JMS1.1规范
以下代码仅用于供测试参考,不具备生产环境下的严谨,具体FFMQ配置请看说明文档(下载包中有) 启动: import java.io.FileInputStream; import java.util.Properties; import net.timewalker.ffmq3.listeners.ClientListener; import net.timewalker.ffmq3.listeners.tcp.io.TcpListener; import net.timewalker.ffmq3.local.FFMQEngine; import net.timewalker.ffmq3.management.destination.definition.QueueDefinition; import net.timewalker.ffmq3.management.destination.definition.TopicDefinition; import net.timewalker.ffmq3.utils.Settings; /** * Embedded FFMQ sample */ public class EmbeddedFFMQSample implements Runnable { private FFMQEngine engine; public void run() { try { // Create engine settings Settings settings = createEngineSettings(); // Create the engine itself engine = new FFMQEngine("myLocalEngineName", settings); // -> myLocalEngineName will be the engine name. // - It should be unique in a given JVM // - This is the name to be used by local clients to establish // an internal JVM connection (high performance) // Use the following URL for clients : vm://myLocalEngineName // // Deploy the engine System.out.println("Deploying engine : "+engine.getName()); engine.deploy(); // - The FFMQ engine is not functional until deployed. // - The deploy operation re-activates all persistent queues // and recovers them if the engine was not properly closed. // (May take some time for large queues) // Adding a TCP based client listener System.out.println("Starting listener ..."); ClientListener tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null); tcpListener.start(); // This is how you can programmatically define a new queue if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo1")) { QueueDefinition queueDef = new QueueDefinition(settings); queueDef.setName("foo2"); queueDef.setMaxNonPersistentMessages(0); queueDef.setOverflowToPersistent(false); queueDef.setPreAllocateFiles(true); queueDef.setTemporary(false); queueDef.setUseJournal(true); queueDef.setAutoExtendAmount(128); queueDef.setInitialBlockCount(32); queueDef.setMaxBlockCount(1024); queueDef.check(); engine.createQueue(queueDef); } // You could also define a queue using some java Properties if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2")) { Properties queueProps = new Properties(); queueProps.put("name", "foo2"); queueProps.put("persistentStore.useJournal", "false"); queueProps.put("memoryStore.maxMessages", "1000"); QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps)); engine.createQueue(queueDef2); } if(!engine.getDestinationDefinitionProvider().hasTopicDefinition("foox")) { TopicDefinition topicDef = new TopicDefinition(settings); topicDef.setName("foox"); topicDef.setMaxNonPersistentMessages(0); topicDef.setOverflowToPersistent(false); topicDef.setPreAllocateFiles(true); topicDef.setTemporary(false); topicDef.setUseJournal(true); topicDef.check(); engine.createTopic(topicDef); } // Run for some time System.out.println("Running ..."); Thread.sleep(60*1000); // Stopping the listener System.out.println("Stopping listener ..."); tcpListener.stop(); // Undeploy the engine System.out.println("Undeploying engine ..."); engine.undeploy(); // - It is important to properly shutdown the engine // before stopping the JVM to make sure current transactions // are nicely completed and storages properly closed. System.out.println("Done."); } catch (Exception e) { // Oops e.printStackTrace(); } } private Settings createEngineSettings() { // Various ways of creating engine settings // 1 - From a properties file Properties externalProperties = new Properties(); try { FileInputStream in = new FileInputStream("D:\ffmq3-distribution-3.0.5-dist\conf\ffmq-server.properties"); externalProperties.load(in); in.close(); } catch (Exception e) { throw new RuntimeException("Cannot load external properties",e); } Settings settings = new Settings(externalProperties); // 2 - Explicit Java code // Settings settings = new Settings(); // // settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, "."); // ... return settings; } public static void main(String[] args) { System.setProperty("FFMQ_BASE", "D:\ffmq3-distribution-3.0.5-dist"); new EmbeddedFFMQSample().run(); } }
模拟发送: import java.util.Hashtable; import java.util.Random; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import net.timewalker.ffmq3.FFMQConstants; public class Sender implements Runnable { public static void main(String[] args) throws Exception { new Thread(new Sender("queue/foo1", "1")).start(); new Thread(new Sender("queue/foo2", "2")).start(); Thread.sleep(10000); run = false; Thread.sleep(1000); } private static volatile boolean run = true; private String queueName; private String qtmId; private Sender(String queueName, String qtmId) { super(); this.queueName = queueName; this.qtmId = qtmId; } @Override public void run() { try { // Create and initialize a JNDI context Hashtable env = new Hashtable<>(); env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, "tcp://localhost:10002"); Context context = new InitialContext(env); // Lookup a connection factory in the context ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME); // Obtain a JMS connection from the factory Connection conn = connFactory.createConnection("test","test"); conn.start(); Destination dest1=(Queue) context.lookup(queueName); Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE); Random rnd = new Random(System.currentTimeMillis()); long ms = (long)rnd.nextFloat() * 10 * 1000; if(ms > 8000) { ms /= 2; } else if(ms < 1000) { ms = 1500; } int i = 1; MessageProducer p = session.createProducer(dest1); while (run) { TextMessage msg = session.createTextMessage(); String t = "[" + qtmId + "] Hello " + queueName + " " + i++; System.out.println("sended..." + t); msg.setStringProperty("QTMID", qtmId); msg.setText(t); p.send(msg); Thread.sleep(ms); } p.close(); session.close(); conn.close(); context.close(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } 模拟接收: import java.util.Hashtable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import net.timewalker.ffmq3.FFMQConstants; public class Receiver implements Runnable { private static volatile boolean run = true; public static void main(String[] args) throws Exception { new Thread(new Receiver()).start(); Thread.sleep(10000); run = false; Thread.sleep(2000); } private Connection conn; private Session session; private MessageConsumer consumer; private void init() throws Exception { // Create and initialize a JNDI context Hashtable env = new Hashtable<>(); env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, "tcp://localhost:10002"); Context context = new InitialContext(env); // Lookup a connection factory in the context ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME); Destination dest1=(Queue) context.lookup("queue/foo2"); context.close(); // Obtain a JMS connection from the factory conn = connFactory.createConnection("test", "test"); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest1); System.err.println("INIT........."); } private void destory() { try { consumer.close(); session.close(); conn.stop(); conn.close(); System.err.println("EXIT........REC"); } catch (JMSException e) { e.printStackTrace(); } } public void run() { try { init(); // consumer.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message m) { // try { // System.err.println("receive: " + ((TextMessage) m).getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); while(run) { // Thread.sleep(500); Message m = consumer.receive(500); if(m != null) { System.err.println("receive: " + ((TextMessage) m).getText()); } } } catch (Exception e) { e.printStackTrace(); } finally { destory(); } } } 主题订阅: package topic; import java.util.Hashtable; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import net.timewalker.ffmq3.FFMQConstants; public class SubClient implements Runnable { private String topicName; private String qtmId; private TopicConnection conn; private TopicSession session; private TopicSubscriber subscriber; public static void main(String[] args) throws Exception { for(int i = 1; i < 5; i++) { new Thread(new SubClient("topic/foox", String.valueOf(i))).start(); } System.out.println(Thread.currentThread() + " EEXIT"); } private SubClient(String topicName, String qtmId) { super(); this.topicName = topicName; this.qtmId = qtmId; } private void init() throws Exception { // Create and initialize a JNDI context Hashtable env = new Hashtable<>(); env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, "tcp://localhost:10002"); Context context = new InitialContext(env); // Lookup a connection factory in the context TopicConnectionFactory connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME); Topic topic = (Topic) context.lookup(topicName); context.close(); // Obtain a JMS connection from the factory conn = connFactory.createTopicConnection("test","test"); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); String selector = "(QTMID = '" + qtmId + "')"; System.out.println("Selector: " + selector); subscriber = session.createSubscriber(topic, selector, false); System.err.println("INIT........."); } private void destory() { try { subscriber.close(); session.close(); conn.stop(); conn.close(); System.err.println(Thread.currentThread() + " Client EXIT........REC"); } catch (JMSException e) { e.printStackTrace(); } } @SuppressWarnings("static-access") @Override public void run() { try { init(); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message m) { try { System.err.println(Thread.currentThread() + " Client " + qtmId + " Subscriber receive: " + ((TextMessage) m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); conn.start(); Thread.currentThread().sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { destory(); } } } 持久订阅: package topic; import java.util.Hashtable; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import net.timewalker.ffmq3.FFMQConstants; public class SubServer implements Runnable { private String topicName; /** * @param args */ public static void main(String[] args) throws Exception { new Thread(new SubServer("topic/foox")).start(); System.out.println(Thread.currentThread() + " main exit"); } private TopicConnection conn; private TopicSession session; private TopicSubscriber subscriber; private SubServer(String topicName) { super(); this.topicName = topicName; } private void init() throws Exception { // Create and initialize a JNDI context Hashtable env = new Hashtable<>(); env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, "tcp://localhost:10002"); Context context = new InitialContext(env); // Lookup a connection factory in the context TopicConnectionFactory connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME); Topic topic = (Topic) context.lookup(topicName); context.close(); // Obtain a JMS connection from the factory conn = connFactory.createTopicConnection("test","test"); conn.setClientID("SERVER"); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); subscriber = session.createDurableSubscriber(topic, "DB"); System.out.println("INIT........." + subscriber); } private void destory() { try { subscriber.close(); session.close(); conn.stop(); conn.close(); System.err.println(Thread.currentThread() + " EXIT........REC"); } catch (JMSException e) { e.printStackTrace(); } } @SuppressWarnings("static-access") @Override public void run() { try { init(); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message m) { try { System.err.println(Thread.currentThread() + " DurableSubscriber receive: " + ((TextMessage) m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); conn.start(); Thread.currentThread().sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { destory(); } } }