使用Zeromq和protobuf实现的socket通信

    xiaoxiao2021-03-25  58

    本文介绍使用ZeroMQ(下文简称ZMQ),结合protobuf序列化实现客户端和服务端的通信。在之前的一篇文章中(http://blog.csdn.net/cjf_wei/article/details/52894560)介绍了Google的protobuf序列化的使用,以及结合unix环境的socket编程实现简单的客户端到服务端的通信。在接触了zmq之后,尝试使用这个“极速消息通信库”来重构之前的实现。 ZMQ是iMatix开发的以消息为导向的开源中间件库,它类似于Berkeley套接字,它支持多种传输协议,它小巧、简单,但速度足够快,可以用作一个并发框架。它支持多种模式的传输,不管是客户端到服务端的1:1关系,还是M:N关系,亦或是订阅/发布它都能轻松应对。本文使用客户端到服务端的1:1的应答模式。

    Zmq应答模式的基本使用 使用ZMQ进行通信,首先要创建一个上下文环境,然后使用它创建套接字。

    void *context = zmq_ctx_new();//创建上下文

    客户端和服务端使用的socket类型并不一样。

    void *requester = zmq_socket(context, ZMQ_REQ); //for client void *responder = zmq_socket(context, ZMQ_REP); //for server

    随后服务端将socket绑定到一个周知的地址和端口

    zmq_bind(responder,"tcp://*:5555");

    而客户端则要尝试连接到服务端提供的地址

    zmq_connect(requester,"tcp://localhost:5555");

    要把数据写入消息需要使用zmq_msg_init_size()来初始化消息,而读取消息由于未知消息的长度只能使用zmq_msg_init()来创建一个空的消息。 消息初始化后,发送消息使用zmq_send_send(),接收消息则使用zmq_msg_recv(); 访问消息可以使用zmq_msg_data(),要想知道消息的大小可以使用zmq_msg_size();

    最后需要关闭套接字,并销毁上下文。

    zmq_close(&requester); //关闭套接字 zmq_ctx_destroy(context); //销毁上下文

    protobuf的使用请参考(http://blog.csdn.net/cjf_wei/article/details/52894560),在此不再赘述。

    代码实现

    客户端 //for client #include <iostream> #include <string> //for protobuf #include "Test.pb.h" //for zmq #include <zmq.h> using namespace std; using namespace Test::protobuf ; const int BUFFSIZE = 128; int main() { //socket通信所需的上下文环境 void *context = zmq_ctx_new(); //根据context建立的socket的链接,客户端使用ZMQ_REQ套接字 void *requester = zmq_socket(context, ZMQ_REQ); if( -1 == zmq_connect(requester,"tcp://localhost:5555")) { cout<<"Connect to server failed..."<<endl; zmq_ctx_destroy(context); return -1; } cout<<"Connect to server success..."<<endl; HeartInfo myprotobuf; while(1) { myprotobuf.set_type("client"); myprotobuf.set_ip("192.168.1.100"); myprotobuf.set_port(5555); char buff[BUFFSIZE]; myprotobuf.SerializeToArray(buff,BUFFSIZE); //客户端发送请求 int len = strlen(buff); zmq_msg_t req; if(0 != zmq_msg_init_size(&req,len)) { cout<<"zmq_msg_init failed..."<<endl; break; } memcpy(zmq_msg_data(&req),buff,len); if(len != zmq_msg_send(&req,requester,0)) { zmq_msg_close(&req); cout<<"send faliled..."<<endl; break; } //成功发送后,在控制台打印发送消息的内容 cout<<"Type:"<<myprotobuf.type()<<"\t" <<"IP:"<<myprotobuf.ip()<<"\t" <<"Port:"<<myprotobuf.port()<<"\n"; zmq_msg_close(&req); //清空发送缓存 memset(buff,0,BUFFSIZE*sizeof(char)); //客户端接收来自服务端的相应 zmq_msg_t reply; zmq_msg_init(&reply); int size = zmq_msg_recv(&reply,requester,0); memcpy(buff,zmq_msg_data(&reply),size); HeartInfo receive; receive.ParseFromArray(buff,BUFFSIZE); cout<<"Type:"<<receive.type()<<"\t" <<"IP:"<<receive.ip()<<"\t" <<"Port:"<<receive.port()<<"\n"; zmq_msg_close(&reply); } zmq_close(&requester); zmq_ctx_destroy(context); return 0; } 服务端 #include <iostream> #include <string> //for protobuf #include "Test.pb.h" //for zmq #include <zmq.h> using namespace std; using namespace Test::protobuf ; const int BUFFSIZE = 128; int main() { //socket通信所需的上下文环境 void *context = zmq_ctx_new(); //根据context建立的socket的链接,服务端使用ZMQ_REP套接字 void *responder = zmq_socket(context, ZMQ_REP); if( -1 == zmq_bind(responder,"tcp://*:5555")) { cout<<"bind socket to server failed..."<<endl; return -1; } HeartInfo myprotobuf; while(1) { char buff[BUFFSIZE]; //接收客户端请求 zmq_msg_t request; zmq_msg_init(&request); int size = zmq_msg_recv(&request,responder,0); memcpy(buff,zmq_msg_data(&request),size); HeartInfo receive; receive.ParseFromArray(buff,BUFFSIZE); cout<<"Type:"<<receive.type()<<"\t" <<"IP:"<<receive.ip()<<"\t" <<"Port:"<<receive.port()<<"\n"; zmq_msg_close(&request); //清空接收缓存 memset(buff,0,BUFFSIZE*sizeof(char)); sleep(2); myprotobuf.set_type("server"); myprotobuf.set_ip("192.168.1.100"); myprotobuf.set_port(5555); myprotobuf.SerializeToArray(buff,BUFFSIZE); //服务端发送响应 int len = strlen(buff); zmq_msg_t reply; if(0 != zmq_msg_init_size(&reply,len)) { cout<<"zmq_msg_init failed..."<<endl; break; } memcpy(zmq_msg_data(&reply),buff,len); if(len != zmq_msg_send(&reply,responder,0)) { zmq_msg_close(&reply); cout<<"send faliled..."<<endl; break; } //成功发送后,在控制台打印发送消息的内容 cout<<"Type:"<<myprotobuf.type()<<"\t" <<"IP:"<<myprotobuf.ip()<<"\t" <<"Port:"<<myprotobuf.port()<<"\n"; zmq_msg_close(&reply); } zmq_close(&responder); zmq_ctx_destroy(context); return 0; }

    在本文中使用的是protobuf来序列化要传输的内容,当然直接传输字符串也是可以的,但是 需要注意的是“除了字节大小外,zmq对你发送的数据一无所知”。这意味着在C/C++中,传输的字符串是否以’\0’结尾,你要自己决定并负责安全的处理。


    1.《ZeroMQ云时代极速消息通信库》.电子工业出版社,2015. 2. 使用protobuf和socket实现服务器间消息的传递.http://blog.csdn.net/cjf_wei/article/details/52894560

    转载请注明原文地址: https://ju.6miu.com/read-36098.html

    最新回复(0)