nsq 是什么东西,这里就不长篇大论啦
我在 Mac 上尝试 nsq.io 中提供的 QUICK START 遇到问题,这里作简要说明
当执行到第 6 步时,nsq_to_file 报如下错误: error connecting to nsqd - dial tcp: i/o timeout 同时当你访问 http://127.0.0.1:4171/ 点击某些菜单的时候也会报错
对于第一次部署的我,这种问题犹如晴天霹雳,因为那可是官方提供的教程啊
还好这种问题经不起推敲,大意就是连不上 nsqd 呗,但是为什么会连不上呢?我在 nsqd 命令行参数里找到了答案 -broadcast-address="": address that will be registered with lookupd (defaults to the OS hostname) 在 Mac 上应该显式指定 nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 我想正如上面说的那样,默认是操作系统主机名,这样 nsqd 虽然能绑定成功,但是客户端就是无法识别
关于 nsq 我还发现一个小小细节 nsqd 提供的 HTTP API /pub publish a message to a topic(发布消息到话题) 但真正在实际操作的时候却是这样的,两种方式都可以?
curl -d "<message>" http://127.0.0.1:4151/put?topic=name curl -d "<message>" http://127.0.0.1:4151/pub?topic=name关于 nsq 入门我也仅有一天多的功力,分享不了太多有价值的东西,但是通过文档的阅读和实践,这里我作一下总结: 官方文档(其实就是上面的 QUICK START) 中文翻译(拜读部分翻译,表示感谢) go-nsq Golang客户端库(官方客户端开发库)
这里给出我用客户端开发库写的测试代码
package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) // ConsumerHandler 消费者处理者 type ConsumerHandler struct{} // HandleMessage 处理消息 func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error { fmt.Println(string(msg.Body)) return nil } // Producer 生产者 func Producer() { producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) if err != nil { fmt.Println("NewProducer", err) panic(err) } i := 1 for { if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil { fmt.Println("Publish", err) panic(err) } time.Sleep(time.Second * 5) i++ } } // ConsumerA 消费者 func ConsumerA() { consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig()) if err != nil { fmt.Println("NewConsumer", err) panic(err) } consumer.AddHandler(&ConsumerHandler{}) if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil { fmt.Println("ConnectToNSQLookupd", err) panic(err) } } // ConsumerB 消费者 func ConsumerB() { consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig()) if err != nil { fmt.Println("NewConsumer", err) panic(err) } consumer.AddHandler(&ConsumerHandler{}) if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil { fmt.Println("ConnectToNSQLookupd", err) panic(err) } } func main() { ConsumerA() ConsumerB() Producer() }命令执行顺序如下
nsqlookupd nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 nsqadmin --lookupd-http-address=127.0.0.1:4161测试程序执行打印如下
banjakukutekiiMac:test panshiqu$ ./main 2016/11/24 09:52:49 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 09:52:49 ERR 1 [test/test-channel-a] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}" 2016/11/24 09:52:49 INF 2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 09:52:49 ERR 2 [test/test-channel-b] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}" 2016/11/24 09:52:49 INF 3 (127.0.0.1:4150) connecting to nsqd 2016/11/24 09:53:57 INF 2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 09:53:57 INF 2 [test/test-channel-b] (127.0.0.1:4150) connecting to nsqd Hello World 1 Hello World 2 Hello World 3 Hello World 4 Hello World 5 Hello World 6 Hello World 7 Hello World 8 Hello World 9 Hello World 10 Hello World 11 Hello World 12 Hello World 13 Hello World 14 Hello World 15 2016/11/24 09:54:01 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 09:54:01 INF 1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd Hello World 16 Hello World 16 Hello World 17 Hello World 17 Hello World 18 Hello World 18 ...对于输出我作如下理解,因为初次启动 nsq 相关程序,ConsumerA[test/test-channel-a] 查询 nsqlookupd 主题为 test,返回错误,主题不存在。ConsumerB[test/test-channel-b] 也执行上面的动作。这个时候应该不会创建两个 channel,test-channel-a 和 test-channel-b,也不会创建主题。接下来 Producer 成功连接 nsqd,这个时候会创建 test 主题。等待了一会后 ConsumerB 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-b,消费已被生产出的 15 条消息,因为 test-channel-a 还未被创建,所以目前已有的消息是不会被复制分发的。接着 ConsumerA 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-a,接下来的消息都是被复制分发的,两个消费者都能收到
两个 channel 都指定为 test-channel-a 将得到如下输出,可以确定的是多个消费者守在同一个 channel 中,同一条消息将只会被一个消费者处理
banjakukutekiiMac:test panshiqu$ go run main.go 2016/11/24 10:23:52 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 10:23:52 INF 1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd 2016/11/24 10:23:52 INF 2 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2016/11/24 10:23:52 INF 2 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd 2016/11/24 10:23:52 INF 3 (127.0.0.1:4150) connecting to nsqd Hello World 1 Hello World 2 Hello World 3提醒大家在执行上面流程的时候多去 http://127.0.0.1:4171/ 查看运行状态,将会在那里发现很多内部细节 还有很多情况建议大家也尝试尝试,多 nsqd 和 多 nsqlookupd,毕竟人家是支持集群的