本文主要是介绍NSQ的golang客户端简单使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
NSQ的golang客户端简单使用
NSQ 是由国外的一个短链服务商bitly使用golang开发的一个消息队列系统,正好使用到了这个东西,在这里简单的记录下。
获取客户端
nsq的golang客户端是官方版本的
go get github.com/nsqio/go-nsq
即可
简单的消费者和生产者使用
该客户端有原始的command函数用于一些基础操作,也有consumer和producer的封装,我这里是直接使用了封装了。
- consumer
消费者比较简单,只要监听队列消息,并处理就可以了,下面是一个简单的例子。
type NSQHandler struct {
}func (this *NSQHandler) HandleMessage(message *nsq.Message) error {log.Println("recv:", string(message.Body))return nil
}func testNSQ() {waiter := sync.WaitGroup{}waiter.Add(1)go func() {defer waiter.Done()consumer, err := nsq.NewConsumer("test", "ch1", nsq.NewConfig())if nil != err {log.Println(err)return}consumer.AddHandler(&NSQHandler{})err = consumer.ConnectToNSQD("10.100.156.207:4150")if nil != err {log.Println(err)return}select {}}()waiter.Wait()
}
创建好consumer后,只需要自己创建一个struct并实现HandleMessage方法即可,当有消息时候,再去处理消息。
需要注意的是,AddHandler的回调是在别的routine中执行的,并且可以添加多个handler用于处理消息,这里可能需要注意下线程的同步问题。
- producer
生产者也和消费者差不多,首先需要创建一个producer
func (this *MsgQueue) Init(addr string) error {var err errorthis.addr = addr// try to connectcfg := nsq.NewConfig()this.producer, err = nsq.NewProducer(addr, cfg)if nil != err {return err}// try to pingerr = this.producer.Ping()if nil != err {this.producer.Stop()this.producer = nilreturn err}return nil
}
producer封装了较多的方法,分为同步和异步两种。带Async后缀的,都是异步的。
同步是收到了nsq的回应后再返回的函数,所以可能会堵塞,而异步的操作,则调用方需要传入一个chan用于接收结果,当有结果返回或者是超时的情况下,相应的内容会写到该chan中。
在这里我用了同步的api,毕竟消息队列假如出了什么问题,那么整个服务就不可用了,而且同步改异步也不会太麻烦,以后可以做下修改。
publish的方法也很简单,提供一个topic和数据就行了。
这篇关于NSQ的golang客户端简单使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!