当前位置: 首页 >服务端 > Docker部属Nsq集群

Docker部属Nsq集群

  用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!

  nsq官网: http://nsq.io/

      还有我写的: 剖析nsq消息队列目录

  Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

  为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。

  部署完后我会用go和c#写一些代码方便大家学习。

 

  准备工作:

  》两台服务器:192.168.0.49; 192.168.0.105.

  》需要在两台机器上安装好Docker

  》两台机器上镜像的拉取 

docker pull nsqio/nsq

  我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。  

docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

  

  在105和49上启动nsqd, lookup的地址要写105

docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=192.168.0.105 --lookupd-tcp-address=192.168.0.105:4160
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=192.168.0.49 --lookupd-tcp-address=192.168.0.105:4160

 

 Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

 

  到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin 

docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.0.105:4161

Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

 

   用浏览器看一下管理端:http://192.168.0.105:4171/nodes。找开Nodes标签里面有两个节点。192.168.0.105  和 192.168.0.49。其他的你可以点开看看。

Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

  我用go语言 简单写一个发送信息的例子:

  go使用的库是 go-nsq 地址  : github.com/nsqio/go-nsq

  

func main() {config := nsq.NewConfig()// 随便给哪个ip发都可以//w1, _ := nsq.NewProducer("192.168.0.105:4150", config)w1, _ := nsq.NewProducer("192.168.0.49:4150", config)err1 := w1.Ping()if err1 != nil {log.Fatal("should not be able to ping after Stop()")retu}defer w1.Stop()topicName := "publishtest"msgCount := 2for i := 1; i < msgCount; i++ {err1 := w1.Publish(topicName, []byte("测试测试publis test case"))if err1 != nil {log.Fatal("error")}}}

  可以尝试给49和105都发送一次试试。再看一下我们的管理页面:

  publishtest被ip105和49都发送过。但是还没有channel:

Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

 

 客户端golang代码

package mainimport ("fmt""github.com/nsqio/go-nsq""log""os""os/signal""strconv""time""sync")func main() {topicName := "publishtest"msgCount := 2for i := 0; i < msgCount; i++ {//time.Sleep(time.Millisecond * 20)go readMessage(topicName, i)}//cleanup := make(chan os.Signal, 1)cleanup := make(chan os.Signal)signal.Notify(cleanup, os.Interrupt)fmt.Println("server is running....")quit := make(chan bool)go func() {select {case <- cleanup:fmt.Println("Received an interrupt , stoping service ...")for _, ele := range consumers {ele.StopChan <- 1ele.Stop()}quit <- true}}()<-quitfmt.Println("Shutdown server....")}type ConsumerHandle struct {q*nsq.ConsumermsgGood int}var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0)var mux *sync.Mutex = &sync.Mutex{}func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {msg := string(message.Body) + "  " + strconv.Itoa(h.msgGood)fmt.Println(msg)retu nil}func readMessage(topicName string, msgCount int) {defer func() {if err := recover(); err != nil {fmt.Println("error: ", err)}}()config := nsq.NewConfig()config.MaxInFlight = 1000config.MaxBackoffDuration = 500 * time.Second//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)//q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount) + "#ephemeral", config)q, _ := nsq.NewConsumer(topicName, "ch"+strconv.Itoa(msgCount), config)h := &ConsumerHandle{q: q, msgGood: msgCount}q.AddHandler(h)err := q.ConnectToNSQLookupd("192.168.0.105:4161")//err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})//err := q.ConnectToNSQD("192.168.0.49:4150")//err := q.ConnectToNSQD("192.168.0.105:4415")if err != nil {fmt.Println("conect nsqd error")log.Println(err)}mux.Lock()consumers = append(consumers, q)mux.Unlock()<-q.StopChanfmt.Println("end....")}

 

 

 

  运行一下,会启动两个终端:

  用我们的发送代码发送信息,再看我们的客户端

  Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

 

  c# 使用的库为NsqSharp.Core 地址为:

  https://github.com/tonyredondo/NsqSharp

Docker部属Nsq集群 _ JavaClub全栈架构师技术笔记

 

  简单客户端代码为:

 

class Program{static void Main(){// Create a new Consumer for each topic/channelvar consumerCount = 2;var listC = new  List<Consumer>();for (var i = 0; i < consumerCount; i++){var consumer = new Consumer("publishtest", $"channel{i}" );consumer.ChangeMaxInFlight(2500);consumer.AddHandler(new MessageHandler());consumer.ConnectToNsqLookupd("192.168.0.105:4161");listC.Add(consumer);}var exitEvent = new ManualResetEvent(false);Console.CancelKeyPress += (sender, eventArgs) => {eventArgs.Cancel = true;listC.ForEach(x => x.Stop());exitEvent.Set();};exitEvent.WaitOne();}}public class MessageHandler : IHandler{/// <summary>Handles a message.</summary>public void HandleMessage(IMessage message){string msg = Encoding.UTF8.GetString(message.Body);Console.WriteLine(msg);}/// <summary>/// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>./// </summary>/// <param name="message">The failed message.</param>public void LogFailedMessage(IMessage message){// Log failed messages}}

 

作者:li-peng
来源链接:https://www.cnblogs.com/li-peng/p/7729174.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/server/112422.html

标签:Docker
分享给朋友:

“Docker部属Nsq集群” 的相关文章