ActiveMQ
ActiveMQ消息特性:延迟和定时消息投递AMQ_SCHEDULED_DELAYlong延迟投递的时间AMQ_SCHEDULED_PERIODlong重复投递的时间间隔AMQ_SCHEDULED_REPEATint重复投递次数AMQ_SCHEDULED_CRONStringCron表达式应用场景 最经典的就是当用户注册时,我们就需要用activeMQ来做为中间件,当用户注册后,我门把用户的邮箱号和验证码等信息通过activeMQ的生产端发送到activeMQ的消息队列中,而一旦消息队列中出现了数据,我们的邮件模块通过实时的监控activeMQ的消息队列就能通过消费端获取到这个数据,染回邮件模块就会自行的去对数据进行解析,给用户发送邮件 cmis应用场景:定时的向ActiveMQ队列获取消息,主系统添加用户,子系统监听队列同步 添加用户、修改、激活、禁言ActiveMQ 消息发送尽量都是异步,系统解耦用消息中间件是什么通过高效可靠的消息传递机制进行与平台无关的数据交流通过提供的消息传递和消息队列模型在分布式环境下提供系统解耦发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的声明周期也没有必然关系(主题)尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者特点:发送者个接受者不必了解对方,只需要确认消息 发送者和接受者不必同时在线能干嘛异步通信应用解耦流量削峰数据同步冗余存储如: 订单系统 ->发送消息 MQ 消费消息 -> 仓储系统怎么玩控制消息的消费顺序配置集群容错的MQ集群生产端,负责发送消息到交换机;消费端,监听队列,获取队列的消息,进行消费;整体架构:连接工厂 获得连接 通过连接得到session 通过session创建消息目的地队列/主题 再通过session创建消息生产者/消费者,再通过session创建消息以生产者发送至MQ、监听接收消息receive()/receive(4000):区别不见不散一直等/过时不候时间到直接停止消费者消息监听时,System.in.read()禁止停止服务消息头:常用如 1.jmsdestination投到哪队列还是主题、2.jmsdeliverymode持久或非持久 3.jmsexpiration过期时间、4.jmspriority消息优先级、0-4普通消息,5-9加急消息5.jmsmessageid消息唯一标识id消息体:1.TextMessage文本消息 常用3.MapMessage映射消息常用4.ByteMessage字节消息5.StreamMessage流消息2.ObjectMessage对象消息消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性识别/去重/重点标注等操作非常有用的方法:如:setStringProperty("","");JMS消息可靠性:(分为持久性、事务、签收Acknowledge) ActiveMQ消息持久化:KahaDB、JDBC、LevelDBmessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)当MQ服务宕机消息不存在messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)当MQ服务宕机消息依然存在queue 队列默认是持久化的topic 将connection.start()放置topic后边,为启动持久化的主题Subscribers发布订阅:先启动订阅再e启动生产事务偏生产者/签收偏消费者事务时需commit()异常时回滚,场景:避免消息重复消费事务消息和非事务消息参数1:事务,2.签收connection.createSession(Boolean.true,Session.AUTO_ACKNOWLEDGE)connection.createSession(Boolean.false,Session.AUTO_ACKNOWLEDGE)签收Session.AUTO_ACKNOWLEDGE自动签收Session.CLIENT_ACKNOWLEDGE手动签收,客户端调用message.acknowledge()方法手动签收ActiveMq-嵌入式Broker:其实就是用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可靠性用ActiveMQ Broker作为独立的消息服务器来构建Java应用,相当于是一个ActiveMQ服务器的实例ActiveMQ也支持在vm中通信基于嵌入式的broker,能够无缝的集成其他java应用引入jar:com.fasterxml.jackson.coreBrokerService bs = new BrokerService();bs.setUseJmx(true);bs.addConnector("tcp://localhost:61616");bs.start();ActiveMQ整合Spring队列生产者队列消费者主题生产消费监听配置ActiveMQ整合SpringBoot队列生产者队列生产者间隔定投队列消费者主题生产者主题消费者ActiveMQ的传输协议查看/修改conf/activemq.xml activemq的web窗口传输协议分为:tcp/openwire、amqp、stomp(关于流)、mqtt(物联网)、ws(webservice)、Nio、网络协议:tcp、nio、udp(比tcp好、但不具备可靠性)、ssl(安全协议)、https、VMssl(安全协议)TCP连接的url形式如:tcp://hostname:port?key=value&key=valu后面的参数是可选Nio连接的url形式如:nio://hostname:port?key=value生产环境一般为:NIO,Nio比tcp更偏重于底层访问操作Nio协议的使用场景:可能有大量的Client去连接到Broker上,一般情况下大量的client去连接Broker(tcp)是被操作系统的线程所限制的,因此,Nio的实现比tcp需要更少的线程去运行,所以建议使用nio协议。演示:<transportConnector name="nio" url="nio://0.0.0.0:61618?trace=true">如何使端口即支持NIO网络Io模型,又支持多协议?使用auto关键字,auto+nio使用"+"符号来为端口设置多种特性ActiveMQ的消息存储和持久化MQ的高可用:事务、持久、签收(自身)、(物理持久化将MQ文件复制到其他机器磁盘存储)持久化分为:JDBC(基于第三方数据库)、5.3AMQ(基于日志)、5.4KahaDB(可汗DB) 5.9LeveDB(基于文件的本地数据库存储)5.9Replicated LevelDB Store(带复制功能)kahadb:基于日志文件,kahadb存储原理:消息存储使用一个事务日期和仅仅用一个索引文件来存储它所有的地址。KahaDB是一个专门针对消息持久化的解决方案,他对典型的消息使用模式进行了优化数据被追加到data logs中,当不再需要log文件中的数据的时候,log文件会被丢弃四类文件一把锁:db-1.log(每满如32M,增加)、db.data(包含索引),db.free(当前db.data文件那些页面是空闲的,所有空闲ID),db.redo(进行消息恢复)、lock(文件锁、表示获得kahadb读写权限的broker)JDBC存储原理:1.MQ+Mysql2.MQ lib添加mysql驱动包3.activemq.xml配置 <persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds" 数据库名称createTablesOnStartup="true">是否在启动的时候创建数据表</persistenceAdapter>4.数据库连接池配置(数据源)5.建仓sql和建表建名为activemq库表为:ACTIVEMQ_MSGS(对列表) ACTIVEMQ_ACKS (主题表) ACTIVEMQ_LOCK (锁) 表自动生成,配置过如:createTablesOnStartup="true"6.代码运行验证一定要开启:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENCE)队列:当DeliveryMode设置为NON_PERSISTENOE时,消息保存到内存中 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库 消息一旦被Consumer消费就从broker(数据库)中同步删除主题7.数据库情况queue:在运行消费看看表activemq_msgs数据变化topic:先启动消费者订阅再运行生产,activemq_acks8.小总结ACTIVEMQ_MSGS(对列表)ACTIVEMQ_ACKS (主题表)ACTIVEMQ_LOCK (锁) 在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,在可能成为下一个Master Broker,这个表用于记录那个Broker是当前的Master Broker9.常见坑1.数据库驱动jar包2.表自动生成,配置过如:createTablesOnStartup="true"3.下划线坑爹“Java.lang.IIlegalStateExcption:BeanFactory not initialized or already closed”这是因为你的操作系统的机器名中有“_”符号,更改机器名并重启解决JDBC高速缓存配置:ActiveMQ joual:克服JDBC Store的不足1.<joualPersistenceAdapterFactory joualLogFiles="4" joualLogFileSize="32768" useJoual="true" useQuickJoual="true" dataSource="#mysql-ds" dataDirectory="activemq-data"/>2.重启配置文件3.运行降低mysql压力,写入joual高速缓存ActiveMQ持久化总结MQ所在服务器宕机消息不会丢失ActiveMQ的多节点集群未学完zookeeper+replicated-leveldb-store的主从集群,避免单点故障ActiveMQ的高级特性:未学完异步投递延迟投递定时投递消费重试机制死信队列防止重复调用
作者:Bk小凯笔记
来源链接:https://www.cnblogs.com/Bkxk/p/11489845.html
版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。
2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。