ActiveMQ的静态网络链接
--------------------------------------------------------------------
(1)ActiveMQ的networkConnector是什么
在某些情况下,需要多个ActiveMQ的Broker做集群,那么就涉及到Broker到Broker的通信,这个就称为ActiveMQ的networkConnector.
ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息,这就是所谓的"桥接"。ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker不仅发送消息而且也能从相同的通道接收消息,通常作为duplex connector来映射,如下:
(2)有两种配置Client到Broker的链接方式
第一种: Client通过Staticlly配置的方式去连接Broker(静态链接)
第二种: Client通过discover agent来dynamically的发现Brokers(动态链接)
Static networks:
Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker,这种协议用于复合url,一个复合url包括多个url地址,格式如下:
static:(uri1,uri2,uri3, ...)?key=value
1 <networkConnectors>2 <networkConnector name="local network" uri="static://(tcp://ip:prot,tcp://ip:port)"/>3 </networkConnectors>
在activemq.xml配置如下:
下面启动两个Broker实例,一个端口:61616,另一个端口:61716,然后通过程序往61616端口的Broker发送数据,再从61716端口的Broker接收数据
JmsSend程序如下:
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.MessageProducer; 5 import javax.jms.Session; 6 import javax.jms.TextMessage; 7 8 import org.apache.activemq.ActiveMQConnectionFactory; 9 10 public class JmsSend {11 public static void main(String[] args) throws Exception {12 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");13 Connection connection = connectionFactory.createConnection();14 connection.start();15 16 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);17 Destination destination = session.createQueue("my-queue");18 19 MessageProducer producer = session.createProducer(destination);20 for(int i = 0;i < 10;i++){21 TextMessage message = session.createTextMessage("message,1212 --->" + i);22 Thread.sleep(1000);23 //通过生产者发出消息24 producer.send(message);25 }26 session.commit();27 session.close();28 connection.close();29 }30 }
JmsReceiver程序如下:
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.MessageConsumer; 5 import javax.jms.Session; 6 import javax.jms.TextMessage; 7 8 import org.apache.activemq.ActiveMQConnectionFactory; 9 10 public class JmsReceiver {11 public static void main(String[] args) throws Exception {12 ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61716");13 Connection connection = cf.createConnection();14 connection.start();15 16 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);17 Destination destination = session.createQueue("my-queue");18 MessageConsumer consumer = session.createConsumer(destination);19 int i = 0;20 while(i < 10){21 i++;22 TextMessage message = (TextMessage)consumer.receive();23 session.commit();24 System.out.println("接收到的消息是:"+message.getText());25 }26 session.close();27 connection.close();28 }29 }
运行结果:
(3)networkConnector配置的可用属性
1.name: 默认的bridge
2.dynamicOnly: 默认是false,如果为true,持久订阅被激活时才创建对应的网络持久订阅。默认是启动时激活
3.decreaseNetworkConsumerPriority: 默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0
4.networkTTL: 默认是1,网络中用于消息和订阅消费的broker数量
5.messageTTL: 默认是1,网络中用于消息的broker数量
6.consumerTTL: 默认是1,网络中用于消费的broker数量
7.conduitSubscriptions: 默认true,是否把同一个broker的多个consumer当做一个来处理(在做集群的时候如果有多个consumer,需要设置为false)
8.dynamicallyIncludedDestinations:默认为空,要包括的动态消息地址,类适于excludedDestinations,如:
1 <dynamicallyIncludedDestinations>2 <queue physicalName="include.test.foo"/>3 <topic physicalName="include.test.bar"/>4 </dynamicallyIncludedDestinations>
9.staticallyIncludedDestinations:默认为空,要包括的静态消息地址。类似于excludedDestinations,如:
1 <staticallyIncludedDestinations>2 <queue physicalName="always.include.queue"/>3 </staticallyIncludedDestinations>
10.excludedDestinations: 默认为空,指定排除的地址,示例如下:
1 <networkConnectors> 2 <networkConnector uri="static://(tcp://localhost:61617)" name="bridge" dynamicOnly="false" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false"> 5 <excludedDestinations> 6 <queue physicalName="exclude.test.foo"> 7 <topic physicalName="exclude.test.bar"> 8 </excludedDestinations> 9 <dynamicallyIncludedDestinations>10 <queue physicalName="include.test.foo"/>11 <topic physicalName="include.test.bar"/>12 </dynamicallyIncludedDestinations>13 <staticallyIncludedDestinations>14 <queue physicalName="always.include.queue"/>15 </staticallyIncludedDestinations>16 </networkConnector>17 </networkConnectors>
12. prefetchSize: 默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息
13. suppressDuplicateQueueSubscriptions: 默认false,如果为true,重复的订阅关系一产生即被阻止
14. bridgeTempDestinations: 默认true,是否广播advisory messages来创建临时的destination
15. alwaysSyncSend: 默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker
16. staticBridge: 默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理
作者:火爆泡菜
来源链接:https://www.cnblogs.com/xinhuaxuan/p/6139512.html
版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。
2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。