Docker部属Nsq集群
用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!
nsq官网: http://nsq.io/
还有我写的: 剖析nsq消息队列目录
为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。
部署完后我会用go和c#写一些代码方便大家学习。
准备工作:
》两台服务器:192.168.0.49; 192.168.0.105.
》需要在两台机器上安装好Docker
》两台机器上镜像的拉取
docker pull nsqio/nsq
我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
在105和49上启动nsqd, lookup的地址要写105
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=192.168.0.105 --lookupd-tcp-address=192.168.0.105:4160
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=192.168.0.49 --lookupd-tcp-address=192.168.0.105:4160
到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin
docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.0.105:4161
用浏览器看一下管理端:http://192.168.0.105:4171/nodes。找开Nodes标签里面有两个节点。192.168.0.105 和 192.168.0.49。其他的你可以点开看看。
我用go语言 简单写一个发送信息的例子:
go使用的库是 go-nsq 地址 : github.com/nsqio/go-nsq
func main() {config := nsq.NewConfig()// 随便给哪个ip发都可以//w1, _ := nsq.NewProducer("192.168.0.105:4150", config)w1, _ := nsq.NewProducer("192.168.0.49:4150", config)err1 := w1.Ping()if err1 != nil {log.Fatal("should not be able to ping after Stop()")retu}defer w1.Stop()topicName := "publishtest"msgCount := 2for i := 1; i < msgCount; i++ {err1 := w1.Publish(topicName, []byte("测试测试publis test case"))if err1 != nil {log.Fatal("error")}}}
可以尝试给49和105都发送一次试试。再看一下我们的管理页面:
publishtest被ip105和49都发送过。但是还没有channel:
客户端golang代码
package mainimport ("fmt""github.com/nsqio/go-nsq""log""os""os/signal""strconv""time""sync")func main() {topicName := "publishtest"msgCount := 2for i := 0; i < msgCount; i++ {//time.Sleep(time.Millisecond * 20)go readMessage(topicName, i)}//cleanup := make(chan os.Signal, 1)cleanup := make(chan os.Signal)signal.Notify(cleanup, os.Interrupt)fmt.Println("server is running....")quit := make(chan bool)go func() {select {case <- cleanup:fmt.Println("Received an interrupt , stoping service ...")for _, ele := range consumers {ele.StopChan <- 1ele.Stop()}quit <- true}}()<-quitfmt.Println("Shutdown server....")}type ConsumerHandle struct {q*nsq.ConsumermsgGood int}var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0)var mux *sync.Mutex = &sync.Mutex{}func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {msg := string(message.Body) + " " + strconv.Itoa(h.msgGood)fmt.Println(msg)retu nil}func readMessage(topicName string, msgCount int) {defer func() {if err := recover(); err != nil {fmt.Println("error: ", err)}}()config := nsq.NewConfig()config.MaxInFlight = 1000config.MaxBackoffDuration = 500 * time.Second//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount) + "#ephemeral", config)q, _ := nsq.NewConsumer(topicName, "ch"+strconv.Itoa(msgCount), config)h := &ConsumerHandle{q: q, msgGood: msgCount}q.AddHandler(h)err := q.ConnectToNSQLookupd("192.168.0.105:4161")//err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})//err := q.ConnectToNSQD("192.168.0.49:4150")//err := q.ConnectToNSQD("192.168.0.105:4415")if err != nil {fmt.Println("conect nsqd error")log.Println(err)}mux.Lock()consumers = append(consumers, q)mux.Unlock()<-q.StopChanfmt.Println("end....")}
运行一下,会启动两个终端:
用我们的发送代码发送信息,再看我们的客户端
c# 使用的库为NsqSharp.Core 地址为:
https://github.com/tonyredondo/NsqSharp
简单客户端代码为:
class Program{static void Main(){// Create a new Consumer for each topic/channelvar consumerCount = 2;var listC = new List<Consumer>();for (var i = 0; i < consumerCount; i++){var consumer = new Consumer("publishtest", $"channel{i}" );consumer.ChangeMaxInFlight(2500);consumer.AddHandler(new MessageHandler());consumer.ConnectToNsqLookupd("192.168.0.105:4161");listC.Add(consumer);}var exitEvent = new ManualResetEvent(false);Console.CancelKeyPress += (sender, eventArgs) => {eventArgs.Cancel = true;listC.ForEach(x => x.Stop());exitEvent.Set();};exitEvent.WaitOne();}}public class MessageHandler : IHandler{/// <summary>Handles a message.</summary>public void HandleMessage(IMessage message){string msg = Encoding.UTF8.GetString(message.Body);Console.WriteLine(msg);}/// <summary>/// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>./// </summary>/// <param name="message">The failed message.</param>public void LogFailedMessage(IMessage message){// Log failed messages}}
作者:li-peng
来源链接:https://www.cnblogs.com/li-peng/p/7729174.html
版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。
2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。