nsq

2024-02-11 14:38
文章标签 nsq

本文主要是介绍nsq,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

做一下保存,copy过来

转载自:地址

按照[官网(http://nsq.io/overview/quick_start.html)执行的时候

nsqd --lookupd-tcp-address=127.0.0.1:4160

要指定 broadcast-address 是localhost,否则的话会是你默认的主机名,比如(admin),最后导致连接不上

nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1

测试代码

package mainimport ("fmt""time""github.com/nsqio/go-nsq"
)// ConsumerHandler 消费者处理者
type ConsumerHandler struct{}// HandleMessage 处理消息
func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error {fmt.Println(string(msg.Body))return nil
}// Producer 生产者
func Producer() {producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())if err != nil {fmt.Println("NewProducer", err)panic(err)}i := 1for {if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil {fmt.Println("Publish", err)panic(err)}time.Sleep(time.Second * 5)i++}
}// ConsumerA 消费者
func ConsumerA() {consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig())if err != nil {fmt.Println("NewConsumer", err)panic(err)}consumer.AddHandler(&ConsumerHandler{})if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {fmt.Println("ConnectToNSQLookupd", err)panic(err)}
}// ConsumerB 消费者
func ConsumerB() {consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig())if err != nil {fmt.Println("NewConsumer", err)panic(err)}consumer.AddHandler(&ConsumerHandler{})if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {fmt.Println("ConnectToNSQLookupd", err)panic(err)}
}func main() {ConsumerA()ConsumerB()Producer()
}

命令如下

#cmd1
nsqlookupd
#cmd2
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1

运行结果

banjakukutekiiMac:test panshiqu$ ./main 
2016/11/24 09:52:49 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    1 [test/test-channel-a] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    2 [test/test-channel-b] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    3 (127.0.0.1:4150) connecting to nsqd
2016/11/24 09:53:57 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:53:57 INF    2 [test/test-channel-b] (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 7
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 14
Hello World 15
2016/11/24 09:54:01 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:54:01 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
Hello World 16
Hello World 16
Hello World 17
Hello World 17
Hello World 18
Hello World 18

对于输出我作如下理解,因为初次启动 nsq 相关程序,ConsumerA[test/test-channel-a] 查询 nsqlookupd 主题为 test,返回错误,主题不存在。ConsumerB[test/test-channel-b] 也执行上面的动作。这个时候应该不会创建两个 channel,test-channel-a 和 test-channel-b,也不会创建主题。接下来 Producer 成功连接 nsqd,这个时候会创建 test 主题。等待了一会后 ConsumerB 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-b,消费已被生产出的 15 条消息,因为 test-channel-a 还未被创建,所以目前已有的消息是不会被复制分发的。接着 ConsumerA 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-a,接下来的消息都是被复制分发的,两个消费者都能收到

两个 channel 都指定为 test-channel-a 将得到如下输出,可以确定的是多个消费者守在同一个 channel 中,同一条消息将只会被一个消费者处理

banjakukutekiiMac:test panshiqu$ go run main.go
2016/11/24 10:23:52 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    2 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    2 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    3 (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3

这里写图片描述

参考:地址

这篇关于nsq的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/700044

相关文章

NSQ:分布式消息队列

目录 NSQ 基本概念NSQ 安装与运行安装运行 NSQ 使用示例生产者发布消息消费者接收消息 生产者与消费者NSQ 的管理与监控总结 NSQ 是一个高可用、分布式、实时的消息队列系统,广泛用于分布式应用之间的消息传递。NSQ 以其简单、高效、易于扩展的特性深受开发者的喜爱。本文将介绍 NSQ 的基本概念、安装配置以及如何使用。 NSQ 基本概念 在 NSQ 中,有几个重要

nsq(消息队列)

nsq(消息队列) 搭建服务 启动 nsqd 指定 -broadcast-address=127.0.0.1 来配置广播地址:nsqd -broadcast-address=127.0.0.1启动 nsqd,如果是在搭配 nsqlookupd 使用的模式下,需要首先指定 nsqlookupd 地址:1.启动 nsqlookupd 命令:nsqlookupd2.启动nsqd命令:n

消息队列(kafka/nsq等)与任务队列(celery/ytask等)到底有什么不同?

原文链接:https://www.ikaze.cn/article/43 写这篇博文的起因是,我在论坛宣传我开源的新项目YTask(go语言异步任务队列)时,有小伙伴在下面回了一句“为什么不用nsq?”。这使我想起,我在和同事介绍celery时同事说了一句“这不就是kafka吗?”。 那么YTask和nsq,celery和kafka?他们之间到底有什么不同呢?下面我结合自己的理解。简单

NSQ消息队列---总结篇

架构 概念 nsqlookup:存储了nsqd的元数据和服务信息(endpoind),向消费者提供服务发现功能, 向nsqadmin提供数据查询功能。 nsqd: 是接收、队列和传送消息到客户端的守护进程。 nsqadmin:简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel。 消息可靠性 (1)生产者不保证

go实现NSQ消息队列的集群部署

1 安装 官方下载页面根据自己的平台下载并解压即可。 我安装的是windows版本的 2 NSQ的工作模式 每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。 topic和channel不是预先配置的。t

go操作nsq框架

http://www.coder55.com/article/4123 https://www.cnblogs.com/nickchen121/p/11517436.html#%E5%85%AB%E3%80%81go%E6%93%8D%E4%BD%9Cnsq

nsq源码阅读(二)nsqlookup-1

nsqlookupd 源码阅读(1) daemon的启动过程 1. Init 2. start func (p *program) Start() error {opts := nsqlookupd.NewOptions()flagSet := nsqlookupdFlagSet(opts)..............daemon := nsqlookupd.New(opts)daemon

nsq源码阅读(一)目录 结构

源码的目录结构 apps目录下生成相对应的exe文件,其他目录是一些相对应的工具文件。

nsq 的AddConcurrentHandlers函数注意事项

AddConcurrentHandlers 在使用这个函数的时候,可以指定n个线程来处理这个消息。 在handler函数内,如果不手动调用msg.FInsh函数,表现的结果是: 即使指定了多个协程来处理消息,若某一个hander堵塞了,其他的协程无法获取到消息。 所以在使用这个handler函数时,最好的使用方式是:进入handler函数后,就通知msg.finsh。这样handler的错

NSQ的golang客户端简单使用

NSQ的golang客户端简单使用 NSQ 是由国外的一个短链服务商bitly使用golang开发的一个消息队列系统,正好使用到了这个东西,在这里简单的记录下。 获取客户端 nsq的golang客户端是官方版本的 go get github.com/nsqio/go-nsq 即可 简单的消费者和生产者使用 该客户端有原始的command函数用于一些基础操作,也有consumer和p