本文主要是介绍ZMQ管道模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
案例一
生产者Producer
#include <zmq.hpp>
#include <iostream>
#include <string>
#include<chrono>
#include<thread>using namespace std;
using namespace zmq;int main() {context_t context(1);// 创建 PUSH 套接字,用于发送消息到代理socket_t push_socket(context, ZMQ_PUSH);push_socket.connect("tcp://localhost:8888");cout << "connect success" << endl;int i = 0;for (; ;) {string message = "Message " + to_string(++i);push_socket.send(buffer(message), send_flags::none);cout << "Sending: " << message << endl;this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒}return 0;
}
队列queue()
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>// 休眠指定的时间长度using namespace std;
using namespace zmq;int main() {context_t context(2);// 创建 PULL 套接字,用于接收生产者的消息socket_t pull_socket(context, ZMQ_PULL);pull_socket.bind("tcp://*:8888");// 创建 PUSH 套接字,用于发送消息给消费者socket_t push_socket(context, ZMQ_PUSH);push_socket.bind("tcp://*:8889");cout << "bind success" << endl;while (true) {message_t message;pull_socket.recv(message, recv_flags::none);cout << "broken recv :" << message.to_string() << endl;// 睡眠一段时间模拟处理时间this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒cout << "broken send :" << message.to_string() << endl;push_socket.send(message, send_flags::none);}return 0;
}```## 消费者(Consumer)```cpp
// client.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>using namespace std;
using namespace zmq;int main() {context_t context(1);// 创建 PULL 套接字,用于接收代理发送的消息socket_t pull_socket(context, ZMQ_PULL);pull_socket.connect("tcp://localhost:8889");cout << "connect success" << endl;for (; ; ) {message_t message;pull_socket.recv(message, recv_flags::none);cout << "Received: " << message.to_string() << endl;}return 0;
}
编译
g++ -o server server.cpp `pkg-config --cflags --libs libzmq`
g++ -o client client.cpp `pkg-config --cflags --libs libzmq`
g++ -o queue queue.cpp `pkg-config --cflags --libs libzmq`
队列程序逻辑
队列作为管道需要接受来只上游生产者的消息,又需要发送消息给下游的消费者。
绑定端点
context_t context(1);// 创建 PULL 套接字,用于接收生产者的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.bind("tcp://*:8888");// 创建 PUSH 套接字,用于发送消息给消费者
socket_t push_socket(context, ZMQ_PUSH);
push_socket.bind("tcp://*:8889");
收发消息
message_t message;pull_socket.recv(message, recv_flags::none);cout << "broken recv :" << message.to_string() << endl;// 睡眠一段时间模拟处理时间this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒cout << "broken send :" << message.to_string() << endl;push_socket.send(message, send_flags::none);
生产者程序逻辑
连接管道
context_t context(1);// 创建 PUSH 套接字,用于发送消息到代理socket_t push_socket(context, ZMQ_PUSH);push_socket.connect("tcp://localhost:8888");
发送消息
string message = "Message " + to_string(++i);push_socket.send(buffer(message), send_flags::none);
消费者程序逻辑
连接管道
context_t context(1);// 创建 PULL 套接字,用于接收代理发送的消息socket_t pull_socket(context, ZMQ_PULL);pull_socket.connect("tcp://localhost:8889");
接受消息
message_t message;pull_socket.recv(message, recv_flags::none);
总结
在管道两边,可以有n个生产者,m个消费者,生产者的消息会以先进先出的顺序经过管道到达消费者,管道发给m个消费者的消息是负载均衡的。
这篇关于ZMQ管道模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!