原先 JBoss 自带的消息服务是 HornetQ
实现的,后来 HornetQ 合并到 Apache ActiveMQ
中了,因此新版的 WildFly 集成的默认消息服务实现就是 ActiveMQ 了。
本文将使用一个小示例从零开始演示如何在 WildFly 中使用 JMS. 本示例的大纲如下:
- 怎样在 WildFly 中启用 JMS
- 怎样在 Java 代码中使用 JMS
-
- 消息发送
- 消息同步接收
- 消息异步接收
- 消息驱动Bean
怎样在 WildFly 中启用 JMS
配置文件
要使用消息服务,我们需要完整的 JBoss 配置文件 standalone-full.xml
, 默认的 standalone.xml
并未提供 JMS. 因此建议将 standalone-full 中的全部内容复制到 standalone 中,并修改自己改过的部分(如数据源配置等)。然后正常启动 WildFly 即可。(或者你可以直接使用 standalone.bat -c standalone-full.xml
命令启动)
现在就启动服务器吧。
添加用户
要使用 JMS, 必须有一个用户角色是 guest
的 WildFly 用户。如果之前没有添加的话,那么需要先添加一个用户。
在 %WildFly_HOME%\bin\
目录下执行 add-user.bat
按提示添加用户
D:\Program Files (x86)\wildfly-10.1.0.Final\bin>add-user.bat What type of user do you wish to add? a) Management User (mgmt-users.properties) b) Application User (application-users.properties) (a): b Enter the details of the new user to add. Using realm 'ApplicationRealm' as discovered from the existing property files. Username : jmstest Password recommendations are listed below. To modify these restrictions edit the add-user.properties configuration file. - The password should be different from the username - The password should not be one of the following restricted values {root, admin, administrator} - The password should contain at least 8 characters, 1 alphabetic character(s), 1 digit(s), 1 non-alphanumeric symbol(s) Password : WFLYDM0098: The password should be different from the username Are you sure you want to use the password entered yes/no? yes Re-enter Password : What groups do you want this user to belong to? (Please enter a comma separated list, or leave blank for none)[ ]: guest About to add user 'jmstest' for realm 'ApplicationRealm' Is this correct yes/no? yes Added user 'jmstest' to file 'D:\Program Files (x86)\wildfly-10.1.0.Final\standalone\configuration\application-users.properties' Added user 'jmstest' to file 'D:\Program Files (x86)\wildfly-10.1.0.Final\domain\configuration\application-users.properties' Added user 'jmstest' with groups guest to file 'D:\Program Files (x86)\wildfly-10.1.0.Final\standalone\configuration\application-roles.properties' Added user 'jmstest' with groups guest to file 'D:\Program Files (x86)\wildfly-10.1.0.Final\domain\configuration\application-roles.properties' Is this new user going to be used for one AS process to connect to another AS process? e.g. for a slave host controller connecting to the master or for a Remoting connection for server to server EJB calls. yes/no? yes To represent the user add the following to the server-identities definition <secret value="am1zdGVzdA=="></secret> 请按任意键继续. . .
添加消息队列或消息话题。
进行本步骤之前请确保服务器已正常启动。
再打开一个命令行。进入命令行的管理界面:
$ cd /PATH/TO/wildfly-10.0.0.CR3-SNAPSHOT/bin $ ./jboss-cli.sh -c [standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=TestQ/:add(entries=["java:/jboss/exported/jms/queue/TestQ"]) {"outcome" => "success"} [standalone@localhost:9990 /] :reload { "outcome" => "success", "result" => undefined }
上述命令将会创建一个名为 TestQ 的消息队列。若要创建一个 Topic, 只需将 queue 换为 topic 即可。
/subsystem=messaging-activemq/server=default/jms-topic=TestTopic/:add(entries=["java:/jboss/exported/jms/queue/TestTopic"])
需要注意的是, entries 中内容开头必须为 java:/jboss/exported/xxx
这样在之后的 Java 中才能顺利找到 JNDI 名称。
该命令实际上是对配置的修改,你可以打开 standalone.xml 看看前后对比。
本部分参考:
Writing java based JMS Client for WildFly10 default ActiveMQ Artemis broker.
怎样在 Java 代码中使用 JMS
首先随便建个新项目我们演示一下。
(注意:需要%WildFly_HOME%\bin\client\jboss-client.jar
)
消息发送
package com.youthlin.demo.jmsdemo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import java.util.Properties; /** * Created by lin on 2016-09-17-017. * 发送方。无论是消息队列的Queue还是Pub/Sub模式的 Topic, * 编程方式均一致,Java 中不需要管细节,至于究竟是队列还是主题,那是服务器的事, * 因为我们客户端只根据 JNDI 名称编码,不对消息的处理方式负责。 * <p> * 一般步骤: * 1. 获得一个 JBoss 上下文引用 * 2. 创建连接工厂 * 3. 使用连接工厂创建连接 * 4. 使用连接创建会话 * 5. 通过 JNDI 查找 Queue 或 Topic 作为目的地 * 6. 使用会话和目的地创建消息生产者 * 7. 使用会话创建消息 * 8. 发送消息 */ public class JMSSender { /*发送消息,返回发送成功发送的条数,代码来源:课本*/ private static int sendTextMessage(String... message) { int count = 0; InitialContext context = null; Connection connection = null; try { context = JMSUtil.getInitialContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup(JMSUtil.JMS_CONNECTION_FACTORY_JNDI); connection = factory.createConnection(JMSUtil.JMS_USERNAME, JMSUtil.JMS_PASSWORD); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) context.lookup(JMSUtil.JMS_TOPIC_JNDI); //6. 使用会话和目的地创建消息生产者 MessageProducer producer = session.createProducer(destination); //连接开始 connection.start(); TextMessage textMessage; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.SIMPLIFIED_CHINESE); for (String msg : message) { //7. 使用会话创建消息 textMessage = session.createTextMessage(msg); textMessage.setStringProperty("time", sdf.format(new Date())); //8. 发送消息 producer.send(textMessage); count++; } System.out.println("已发送" + count + "条消息"); } catch (NamingException | JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } if (context != null) { try { context.close(); } catch (NamingException e) { e.printStackTrace(); } } } return count; } public static void main(String[] args) { String[] msgs = new String[5]; for (int i = 0; i < 5; i++) { msgs[i] = "TestMessage" + i; } JMSSender.sendTextMessage(msgs); JMSSender.sendTextMessage("quit"); } }
其中,用到的 JMSUtil 类如下:
package com.youthlin.demo.jmsdemo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; /** * Created by lin on 2016-09-17-017. * 相同的代码提取出来放在工具类中 */ public class JMSUtil { final static String JMS_CONNECTION_FACTORY_JNDI = "jms/RemoteConnectionFactory"; final static String JMS_TOPIC_JNDI = "jms/topic/MyTopic";//新建的消息队列/Topic JNDI 发布名称 final static String JMS_USERNAME = "jmsuser";//添加的用户名,角色必须是"guest"/ApplicationRealm final static String JMS_PASSWORD = "jmsuser@123";//添加的用户的密码 private final static String WILDFLY_REMOTING_URL = "http-remoting://localhost:8080";//注意前缀格式和端口,不是4447哦 static InitialContext getInitialContext() throws NamingException { InitialContext context = null; try { Properties props = new Properties(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory"); props.put(Context.PROVIDER_URL, WILDFLY_REMOTING_URL);// NOTICE: "http-remoting" and port "8080" props.put(Context.SECURITY_PRINCIPAL, JMS_USERNAME); props.put(Context.SECURITY_CREDENTIALS, JMS_PASSWORD); //props.put("jboss.naming.client.ejb.context", true); context = new InitialContext(props); System.out.println("\n\tGot initial Context: " + context); } catch (Exception e) { e.printStackTrace(); } return context; } }
消息同步接收
package com.youthlin.demo.jmsdemo; import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by lin on 2016-09-17-017. * 消息同步接收 * <p> * 一般步骤: * 0. 确保 JBoss 服务器开启,因为我们需要分局服务器发布的 JNDI 名称获取相关对象 * 1. 获得一个 JBoss 上下文引用 * 2. 创建连接工厂 * 3. 使用连接工厂创建连接 * 4. 使用连接创建会话 * 5. 通过 JNDI 查找 Queue 或 Topic 作为目的地 * 6. 使用会话和目的地创建消息消费者 * 7. 接收消息 * 8. 处理消息 */ public class JMSSyncReceiver { public static void main(String[] args) throws NamingException, JMSException, InterruptedException { InitialContext context = JMSUtil.getInitialContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup(JMSUtil.JMS_CONNECTION_FACTORY_JNDI); Connection connection = factory.createConnection(JMSUtil.JMS_USERNAME, JMSUtil.JMS_PASSWORD); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) context.lookup(JMSUtil.JMS_TOPIC_JNDI); //6. 创建消费者 MessageConsumer consumer = session.createConsumer(destination); //连接开始 connection.start(); TextMessage msg = null; //CountDownLatch latch = new CountDownLatch(1);//同步计数器 while (msg == null) { System.out.println("等待接收消息"); //7. 等待接收消息 // 参数时超时毫秒,为0或不带参数则表示一直阻塞在这里。超时无消息将返回null msg = (TextMessage) consumer.receive(5000); //latch.await(1, TimeUnit.SECONDS);//等待(相当于休眠)特定时间 if (msg != null) { //8. 处理消息 System.out.println("接收到的消息内容:" + msg.getText() + ",Time:" + msg.getStringProperty("time")); if (!msg.getText().equals("quit")) { msg = null;//继续等待 } } } connection.close(); context.close(); } }
消息异步接收
package com.youthlin.demo.jmsdemo; import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by lin on 2016-09-20-020. * 消息异步接收 */ public class JMSAsyncReceiver { static boolean quit = false; public static void main(String[] args) throws NamingException, JMSException, InterruptedException { InitialContext context = JMSUtil.getInitialContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup(JMSUtil.JMS_CONNECTION_FACTORY_JNDI); Connection connection = factory.createConnection(JMSUtil.JMS_USERNAME, JMSUtil.JMS_PASSWORD); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) context.lookup(JMSUtil.JMS_TOPIC_JNDI); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() {//设置监听器 @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println("接收到的消息内容:" + msg.getText() + ",Time:" + msg.getStringProperty("time")); if (msg.getText().equals("quit")) { quit = true; } } catch (JMSException e) { e.printStackTrace(); } } }); connection.start(); CountDownLatch latch = new CountDownLatch(1); while (!quit) { System.out.println("等待接收消息"); latch.await(5, TimeUnit.SECONDS); } connection.close(); context.close(); } }
运行效果
消息驱动Bean
如果要在容器中使用的话,那么会更简单,只需使用注解修饰 消息驱动 Bean 即可,不需要像在 Java SE 中那样手动获取上下文获取连接之类的。
不过处理 Queue 和 Topic 的方式有点略微不同。
若是 Queue, 只需使用注解:
@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/queue/TestQ"), })
若是 Topic, 还需要指明是否为持久化的 Topic, 以及 ClientId(需要持久化时,用于标识客户端,这样消息服务器才能知道这个消息你有没有消费过)
@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/topic/MyTopic"), @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"), @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "myTopic"), @ActivationConfigProperty(propertyName = "clientID", propertyValue = "consumer0"), })
本文代码可在 GitHub 上找到。
MDB 需要部署到容器中,因此这里没有特意单独做一个可部署的工程示例,不过有例子可在 GitHub 上找到。
声明
- 本作品采用署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。除非特别注明, 霖博客文章均为原创。
- 转载请保留本文(《在 Wildfly 10.x 中使用自带 ActiveMQ 提供的 Java 消息服务(JMS)》)链接地址: https://youthlin.com/?p=1302
- 订阅本站:https://youthlin.com/feed/