当前位置: 首页 >服务端 > ActiveMQ的介绍及使用实例.

ActiveMQ的介绍及使用实例.

今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思. 


一: 使用场景: 

消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式:
我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中.  那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.

消息队列可以接收消息和 发送消息

消息队列类型:

队列:一对一聊天  私聊  QQ

主题(订阅模式):一对多聊天  群聊  QQ

名词解释: 
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记

 

 二, 代码原型
ActiveMQ需要部署到Linux系统下, 这里就不再做概述.
这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记

上架代码原型:
项目构件图:
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
未使用ActiveMQ前ProductServiceImpl.cs:

 1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.11 //TODO 保存商品信息到Solr服务器12 SolrInputDocument doc = new SolrInputDocument();13 //ID14 doc.setField("id", id);15 //名称16 Product p = productDao.selectByPrimaryKey(id);17 doc.setField("name_ik", p.getName());18 //图片URL19 doc.setField("url", p.getImgUrls()[0]);20 //品牌 ID21 doc.setField("brandId", p.getBrandId());22 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 123 SkuQuery skuQuery = new SkuQuery();24 skuQuery.createCriteria().andProductIdEqualTo(id);25 skuQuery.setOrderByClause("price asc");26 skuQuery.setPageNo(1);27 skuQuery.setPageSize(1);28 List<Sku> skus = skuDao.selectByExample(skuQuery);29 doc.setField("price", skus.get(0).getPrice());30 //...时间等 剩下的省略31 32 try {33 solrServer.add(doc);34 solrServer.commit();35 } catch (Exception e) {36 // TODO Auto-generated catch block37 e.printStackTrace();38 }39 40 41 42 43 //TODO 静态化44 }45 }

上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造: 
使用ActiveMQ后的ProductServiceImpl.cs:

 1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //发送商品ID到ActiveMQ即可.11 jmsTemplate.send(new MessageCreator() {12 13 @Override14 public Message createMessage(Session session) throws JMSException {15 16 retu session.createTextMessage(String.valueOf(id));17 }18 });19 20 //TODO 静态化21 }22 }

接着就是配置消息发送方(JMS生产者) mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop"  5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans  9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd18 http://www.springframework.org/schema/task19 http://www.springframework.org/schema/task/spring-task-4.0.xsd20 http://code.alibabatech.com/schema/dubbo21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->25 <!-- 连接工厂, 此工厂由Apache提供 -->26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是6161629 -->30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>31 <!-- 设置用户名及密码 -->32 <property name="userName" value="admin"></property>33 <property name="password" value="admin"></property>34 </bean>35 36 <!-- 配置连接池管理工厂 -->37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">38 <!-- 注入工厂 -->39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>40 <!-- 设置最大连接数 -->41 <property name="maxConnections" value="5"></property>42 </bean>43 44 <!-- 把上面的工厂交给Spring管理  -->45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">46 <!-- 注入上面的工厂 -->47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>48 </bean>49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">52 <!-- 注入Spring单例工厂 -->53 <property name="connectionFactory" ref="singleConnectionFactory"></property>54 <!-- 设置默认的目标管道 -->55 <property name="defaultDestinationName" value="pId"/>56 </bean>57 </beans>

配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.
接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 
接着就是再将连接池交由Spring管理. 
最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应  就是依赖于这个管道.


接下来我们就看下消息的接收方(JMS消费者)的一些东西:
消费者的目录结构:(Solr)
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记

Solr项目中的ActiveMQ配置文件mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop"  5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans  9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd18 http://www.springframework.org/schema/task19 http://www.springframework.org/schema/task/spring-task-4.0.xsd20 http://code.alibabatech.com/schema/dubbo21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->25 <!-- 连接工厂, 此工厂由Apache提供 -->26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是6161629 -->30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>31 <!-- 设置用户名及密码 -->32 <property name="userName" value="admin"></property>33 <property name="password" value="admin"></property>34 </bean>35 36 <!-- 配置连接池管理工厂 ,由Apache提供.-->37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">38 <!-- 注入工厂 -->39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>40 <!-- 设置最大连接数 -->41 <property name="maxConnections" value="5"></property>42 </bean>43 44 <!-- 把上面的工厂交给Spring管理  -->45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">46 <!-- 注入上面的工厂 -->47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>48 </bean>49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">52 <!-- 注入Spring单例工厂 -->53 <property name="connectionFactory" ref="singleConnectionFactory"></property>54 <!-- 设置默认的目标管道 -->55 <property name="defaultDestinationName" value="pId"/>56 </bean>57 58 <!-- 实例化一个监听到消息后 处理此消息的类 -->59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>60 61 <!-- 配置实时监听器 -->62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">63 <!-- 配置工厂, 需要配置spig的工厂 -->64 <property name="connectionFactory" ref="singleConnectionFactory"/>65 <!-- 设置监听的目标 -->66 <property name="destinationName" value="pId"/>67 <!-- 监听后获取消息的类, 接收监听到的消息 -->68 <property name="messageListener" ref="customMessageListener"></property>69 </bean>70 </beans>

我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.
接下来看看监听器的处理方法做了些什么事情: 
CustomMessageListener.java:

 1 /* 2  * 接收MQ中的消息 3  */ 4 public class CustomMessageListener implements MessageListener{ 5 @Autowired 6 private SearchService searchService; 7  8 @Override 9 public void onMessage(Message message) {10 //先将接收到的消息强转为ActiveMQ类型的消息11 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage12 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;13 try {14 String id = amtm.getText();15 System.out.println("接收到的ID:"+id);16 searchService.insertProductToSolr(Long.parseLong(id));17 } catch (JMSException e) {18 // TODO Auto-generated catch block19 e.printStackTrace();20 }21 }

因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了. 

接下来就看保存商品信息到Solr服务器的逻辑:
SearchServiceImpl.java:

 1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服务器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名称 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName());10 //图片URL11 doc.setField("url", p.getImgUrls()[0]);12 //品牌 ID13 doc.setField("brandId", p.getBrandId());14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115 SkuQuery skuQuery = new SkuQuery();16 skuQuery.createCriteria().andProductIdEqualTo(productId);17 skuQuery.setOrderByClause("price asc");18 skuQuery.setPageNo(1);19 skuQuery.setPageSize(1);20 List<Sku> skus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }

这样就比较明朗了, ActiveMQ 队列就是这样来实现的. 

====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.

 2016/09/04 20:32 更新
上面已经说了 消息的队列模式, 及点对点发送消息, 那么接下来就来说下 消息的一对多模式, 也就是 发布/订阅模式.
项目原型: 当商品上架后(babasport-product), 发送消息id给solr(babasport-solr)来将商品信息保存到solr服务器和cms(babasport-cms)来对商品详情页面做页面静态化.

===================
babasport-product:
结构图:
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
babasport-product下的项目结构图:
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记

ProductServiceImpl.java中的上架:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 @Autowired 2 private JmsTemplate jmsTemplate; 3  4 //上架 5 public void isShow(Long[] ids){ 6 Product product = new Product(); 7 product.setIsShow(true); 8 for (final Long id : ids) { 9 //上下架状态10 product.setId(id);11 productDao.updateByPrimaryKeySelective(product);12 13 //发送商品ID到ActiveMQ即可.14 jmsTemplate.send(new MessageCreator() {15 16 @Override17 public Message createMessage(Session session) throws JMSException {18 19 retu session.createTextMessage(String.valueOf(id));20 }21 });22 }23 }
View Code

mq.xml:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop"  5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans  9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd18 http://www.springframework.org/schema/task19 http://www.springframework.org/schema/task/spring-task-4.0.xsd20 http://code.alibabatech.com/schema/dubbo21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->25 <!-- 连接工厂, 此工厂由Apache提供 -->26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是6161629 -->30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>31 <!-- 设置用户名及密码 -->32 <property name="userName" value="admin"></property>33 <property name="password" value="admin"></property>34 </bean>35 36 <!-- 配置连接池管理工厂 -->37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">38 <!-- 注入工厂 -->39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>40 <!-- 设置最大连接数 -->41 <property name="maxConnections" value="5"></property>42 </bean>43 44 <!-- 把上面的工厂交给Spring管理  -->45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">46 <!-- 注入上面的工厂 -->47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>48 </bean>49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">52 <!-- 注入Spring单例工厂 -->53 <property name="connectionFactory" ref="singleConnectionFactory"></property>54 <!-- 设置默认的目标管道 -->55 <property name="defaultDestinationName" value="pId"/>56 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->57 <property name="pubSubDomain" value="true"/>58 </bean>59 </beans>
View Code

这里面的最大的变化就是将消息发布模式改为了: publish subject.

============================================
babasport-solr:
ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
mq.xml配置文件:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop"  5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans  9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd18 http://www.springframework.org/schema/task19 http://www.springframework.org/schema/task/spring-task-4.0.xsd20 http://code.alibabatech.com/schema/dubbo21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->25 <!-- 连接工厂, 此工厂由Apache提供 -->26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是6161629 -->30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>31 <!-- 设置用户名及密码 -->32 <property name="userName" value="admin"></property>33 <property name="password" value="admin"></property>34 </bean>35 36 <!-- 配置连接池管理工厂 ,由Apache提供.-->37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">38 <!-- 注入工厂 -->39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>40 <!-- 设置最大连接数 -->41 <property name="maxConnections" value="5"></property>42 </bean>43 44 <!-- 把上面的工厂交给Spring管理  -->45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">46 <!-- 注入上面的工厂 -->47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>48 </bean>49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">52 <!-- 注入Spring单例工厂 -->53 <property name="connectionFactory" ref="singleConnectionFactory"></property>54 <!-- 设置默认的目标管道 -->55 <property name="defaultDestinationName" value="pId"/>56 </bean>57 58 <!-- 实例化一个监听到消息后 处理此消息的类 -->59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>60 61 <!-- 配置实时监听器 -->62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">63 <!-- 配置工厂, 需要配置spig的工厂 -->64 <property name="connectionFactory" ref="singleConnectionFactory"/>65 <!-- 设置监听的目标 -->66 <property name="destinationName" value="pId"/>67 <!-- 监听后获取消息的类, 接收监听到的消息 -->68 <property name="messageListener" ref="customMessageListener"></property>69 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->70 <property name="pubSubDomain" value="true"/>71 </bean>72 </beans>
View Code

SearchServiceImpl.java: 保存商品信息到Solr服务器中, 通过ActiveMQ

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服务器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名称 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName());10 //图片URL11 doc.setField("url", p.getImgUrls()[0]);12 //品牌 ID13 doc.setField("brandId", p.getBrandId());14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115 SkuQuery skuQuery = new SkuQuery();16 skuQuery.createCriteria().andProductIdEqualTo(productId);17 skuQuery.setOrderByClause("price asc");18 skuQuery.setPageNo(1);19 skuQuery.setPageSize(1);20 List<Sku> skus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }
View Code

CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private SearchService searchService; 4  5 @Override 6 public void onMessage(Message message) { 7 //先将接收到的消息强转为ActiveMQ类型的消息 8 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage 9 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;10 try {11 String id = amtm.getText();12 System.out.println("接收到的ID:"+id);13 searchService.insertProductToSolr(Long.parseLong(id));14 } catch (JMSException e) {15 // TODO Auto-generated catch block16 e.printStackTrace();17 }18 }19 }
View Code


===============================
babasport-cms:
mq.xml:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ--> 2 <!-- 连接工厂, 此工厂由Apache提供 --> 3 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <!-- 连接地址  5 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616 6 --> 7 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 8 <!-- 设置用户名及密码 --> 9 <property name="userName" value="admin"></property>10 <property name="password" value="admin"></property>11 </bean>12 13 <!-- 配置连接池管理工厂 ,由Apache提供.-->14 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">15 <!-- 注入工厂 -->16 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>17 <!-- 设置最大连接数 -->18 <property name="maxConnections" value="5"></property>19 </bean>20 21 <!-- 把上面的工厂交给Spring管理  -->22 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">23 <!-- 注入上面的工厂 -->24 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>25 </bean>26 27 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->28 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">29 <!-- 注入Spring单例工厂 -->30 <property name="connectionFactory" ref="singleConnectionFactory"></property>31 <!-- 设置默认的目标管道 -->32 <property name="defaultDestinationName" value="pId"/>33 </bean>34 35 <!-- 实例化一个监听到消息后 处理此消息的类 -->36 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>37 38 <!-- 配置实时监听器 -->39 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">40 <!-- 配置工厂, 需要配置spig的工厂 -->41 <property name="connectionFactory" ref="singleConnectionFactory"/>42 <!-- 设置监听的目标 -->43 <property name="destinationName" value="pId"/>44 <!-- 监听后获取消息的类, 接收监听到的消息 -->45 <property name="messageListener" ref="customMessageListener"></property>46 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->47 <property name="pubSubDomain" value="true"/>48 </bean>
View Code

CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private StaticPageService staticPageService; 4 @Autowired 5 private CMSService cmsService; 6  7 @Override 8 public void onMessage(Message message) { 9 //先将接收到的消息强转为ActiveMQ类型的消息10 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage11 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;12 try {13 String id = amtm.getText();14 System.out.println("CMS接收到的ID:"+id);15 Map<String, Object> root = new HashMap<String, Object>();16 17 Product product = cmsService.selectProductById(Long.parseLong(id));18 List<Sku> skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));19 //去掉重复的颜色20 Set<Color> colors = new HashSet<Color>();21 for (Sku sku : skus) {22 colors.add(sku.getColor());23 }24 root.put("colors", colors);25 root.put("product", product);26 root.put("skus", skus);27 28 staticPageService.index(root, id);29 } catch (JMSException e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }33 }34 }
View Code

StaticPageServiceImpl.java: 静态化页面的核心类:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{ 2 //SpringMvc 管理 conf 3 private Configuration conf; 4 public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) { 5 this.conf = freeMarkerConfig.getConfiguration(); 6 } 7  8 //静态化页面的方法 9 public void index(Map<String, Object> root, String id){10 //输出目录: 通过getPath方法获取的是绝对路径11 String path = getPath("/html/product/" + id +".html");12 File f = new File(path);13 File parentFile = f.getParentFile();14 if(!parentFile.exists()){15 parentFile.mkdirs();16 }17 18 //spring中已经设置了模板路径:<property name="templateLoaderPath" value="/WEB-INF/ftl/" />19 Writer out = null;20 21 try {22 //23 Template template = conf.getTemplate("product.html");24 25 //设置输出的位置26 //27 out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");28 template.process(root, out);29 } catch (Exception e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }finally {33 if (out != null)34 {35 try {36 out.close();37 } catch (IOException e) {38 // TODO Auto-generated catch block39 e.printStackTrace();40 }41 }42 43 }44 45 }46 47 //获取webapp下的html文件夹所在的位置48 //将相对路径转换为绝对路径49 public String getPath(String path){50 retu servletContext.getRealPath(path);51 }52 53 private ServletContext servletContext;54 @Override55 public void setServletContext(ServletContext servletContext) {56 this.servletContext = servletContext;57 }58 }
View Code

Spring管理 freemarkerConfig配置类:

ActiveMQ的介绍及使用实例. _ JavaClub全栈架构师技术笔记
 1 <!-- 配置freemarker 实现类 --> 2 <bean class="cn.itcast.core.service.StaticPageServiceImpl"> 3 <property name="freeMarkerConfig"> 4 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer"> 5 <!-- 设置模板所在目录或文件夹的位置, 相对路径  --> 6 <property name="templateLoaderPath" value="/WEB-INF/ftl/" /> 7 <!-- 设置默认编码集 --> 8 <property name="defaultEncoding" value="UTF-8"></property> 9 </bean>10 </property>11 </bean>
View Code

更多关于freemarker的讲解请关注我以后的博客...
关于ActiveMQ的内容就更新到这么多.



作者:一枝花算不算浪漫
来源链接:https://www.cnblogs.com/wang-meng/p/5831538.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/server/112095.html

标签:ActiveMQ
分享给朋友:

“ActiveMQ的介绍及使用实例.” 的相关文章