重头戏!ZeroMQ的管道模式详解:ZMQ_PUSH、ZMQ_PULL

2024-02-16 03:30

本文主要是介绍重头戏!ZeroMQ的管道模式详解:ZMQ_PUSH、ZMQ_PULL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、ØMQ模式总览

  • ØMQ支持多种模式,具体可以参阅:https://blog.csdn.net/qq_41453285/article/details/106865539
  • 本文介绍ØMQ的管道模式

二、管道模式

  • 管道模式在有的地方也称为“流水线”模式
  • 管道模式用于将数据分发到布置在流水线中的节点。数据始终沿流水线向下流动,流水线的每一级都连接到至少一个节点。当流水线级连接到多个节点时,数据在所有连接的节点之间进行轮询
  • 管道模式由http://rfc.zeromq.org/spec:30正式定义
  • 管道模式支持的套接字类型有4种:
    • ZMQ_PUSH
    • ZMQ_PULL

三、“PUSH-PULL”套接字类型

ZMQ_PUSH

  • 管道节点使用类型为ZMQ_PUSH的套接字将消息发送到下游流水线节点。消息循环到所有连接的下游节点
  • 该套接字类型不支持zmq_msg_recv()等接收数据的函数
  • 当ZMQ_PUSH套接字由于已达到所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或处于至少一个下游节点可用于发送;消息不会被丢弃
                                                                                                         ZMQ_PUSH特性摘要 
兼容的对等套接字ZMQ_PULL
方向单向
发送/接收模式仅发送
入网路由策略不适用(N/A)

外发路由策略

轮询
静音状态下的操作阻塞

MQ_PULL

  • 管道节点使用ZMQ_PULL类型的套接字从上游管道节点接收消息
  • 消息从所有连接的上游节点中公平排队
  • 该套接字类型不支持zmq_msg_send()等发送数据的函数
                                                                                                       ZMQ_PULL特性摘要 
兼容的对等套接字ZMQ_PUSH
方向单向
发送/接收模式仅接收
入网路由策略公平排队

外发路由策略

不适用(N/A)
静音状态下的操作阻塞

四、演示案例

  • 本文介绍的一个例子,是一个典型的并行处理模式。内容有:
    • 一台发生器(taskvent.c),它产生可以并行执行的任务
    • 一组工人(taskwork.c),用于处理任务。在现实中,工人在超快的电脑上运行,例如使用GPU(图形处理单元)来完成这个艰难的数学运算
    • 一个接收器(tasksink.c),用于收集工作进程返回的结果

  • 发生器代码如下所示:发生器产生100个任务,每个任务包含一个消息,用来告诉工人某个休眠所需的毫秒数

// taskvent.c
// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/taskvent.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <time.h>
#include <zmq.h>#define randof(num)  (int)((float)(num) * random() / (RAND_MAX + 1.0))// 向套接字socket发送消息string
static int s_send(void *socket, char *string);int main()
{int rc;// 1.创建新的上下文void *context = zmq_ctx_new();assert(context != NULL);// 2.创建PUSH套接字、绑定套接字,//   工人会连接这个套接字, 用来给工人发送消息的void *sender = zmq_socket(context, ZMQ_PUSH);assert(sender != NULL);rc = zmq_bind(sender, "tcp://*:5557");assert(rc != -1);// 3.创建PUSH套接字、并连接到接收器,//   该套接字给接收器发送一个消息, 告诉接收器开始工作, 只使用一次void *sink = zmq_socket(context, ZMQ_PUSH);assert(sink != NULL);rc = zmq_connect(sink, "tcp://localhost:5558");assert(rc != -1);// 4.输入回车, 回车之后发生器开始产生任务, 并且向接收器发送一条消息, 用于指示当前发生器要开始工作了printf("Press Enter when the workers are ready:");getchar();printf("Sending tasks to workers...\n");// 随意发什么, 此处我们发送字符0// 如果接收器未工作, 则s_send()阻塞rc = s_send(sink, "0");assert(rc != -1);// 5.初始化随机数发生器srandom((unsigned)time(NULL));// 6.生成100个任务, 然后将任务发送给工人int task_nbr;int total_msec = 0;int workload;for(task_nbr = 0; task_nbr < 100; ++task_nbr){workload = randof(100) + 1;      //生成一个毫秒数, 1-100之间total_msec += workload;          //加到总的毫秒数上char string[10];sprintf(string, "%d", workload);s_send(sender, string);          //将毫秒发送给工人}//打印总的毫秒数printf("Total expected cost: %d msec\n", total_msec);// 7.休眠1秒, 给ZeroMQ时间来传递消息sleep(1);// 8.关闭套接字、销毁上下文zmq_close(sender);zmq_close(sink);zmq_ctx_destroy(context);return 0;
}static int s_send(void *socket, char *string)
{int rc;zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));rc = zmq_msg_send(&msg, socket, 0);zmq_msg_close(&msg);return rc;
}
  • 工人代码如下所示:工人从发生器接收到消息,该消息是一个毫秒数,接收到该毫秒数之后,工人按照指定的毫秒数进行休眠,休眠完成之后发出它完成任务的信号
// taskwork.c
// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/taskwork.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <assert.h>
#include <zmq.h>// 从socket上接收数据并返回
static char *s_recv(void *socket);// 向套接字socket发送消息string
static int s_send(void *socket, char *string);// 休眠msecs毫秒
static int s_sleep(int msecs);int main()
{int rc;// 1.初始化新的上下文void *context = zmq_ctx_new();assert(context != NULL);// 2.创建套接字、连接发生器//   该套接字用来从发生器接收数据void *reciver = zmq_socket(context, ZMQ_PULL);assert(reciver != NULL);rc = zmq_connect(reciver, "tcp://localhost:5557");assert(rc != -1);// 3.创建套接字、连接接收器//   该套接字用来向接收器发送数据void *sender = zmq_socket(context, ZMQ_PUSH);assert(sender != NULL);rc = zmq_connect(sender, "tcp://localhost:5558");assert(rc != -1);// 4.永久循环处理任务while(1){// 5.从发生器接收数据, 这里接收到的将是一个毫秒数char *string = s_recv(reciver);assert(string != NULL);//打印接收到的毫秒数fflush(stdout);printf("%s.", string);// 然后工人休眠指定的毫秒继续工作s_sleep(atoi(string));free(string);// 6.工人在处理完任务之后, 发送一条消息给接收器, 表示完成了一条任务rc = s_send(sender, "");}// 7.关闭套接字、销毁上下文zmq_close(reciver);zmq_close(sender);zmq_ctx_destroy(context);return 0;
}static char *s_recv(void *socket)
{int rc;zmq_msg_t msg;zmq_msg_init(&msg);rc = zmq_msg_recv(&msg, socket, 0);if(rc == -1)return NULL;char *string = (char*)malloc(rc + 1);if(string == NULL)return NULL;memcpy(string, zmq_msg_data(&msg), rc);string[rc] = 0;return string;
}static int s_send(void *socket, char *string)
{int rc;zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));rc = zmq_msg_send(&msg, socket, 0);zmq_msg_close(&msg);return rc;
}static int s_sleep(int msecs)
{ 
#if (defined (WIN32))Sleep (msecs);
#elsestruct timespec t;t.tv_sec  =  msecs / 1000;t.tv_nsec = (msecs % 1000) * 1000000;nanosleep (&t, NULL);
#endif
}
  •  接收器代码如下所示:下面是接收器的代码,它收集100条消息,然后计算整体处理用了多长时间,所以我们可以证实,如果有一个以上的工人,它们确实是并行运行的
// tasksink.c
// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/tasksink.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <zmq.h>// 从socket中接收数据
static char* s_recv(void* socket);// 返回当前系统时钟,以毫秒返回
static int64_t s_clock (void);int main()
{// 1.创建新的上下文void *context = zmq_ctx_new();// 2.创建、绑定套接字//   发生器、工人都会连接这个套接字, 并向这个套接字发送数据void *receiver = zmq_socket(context, ZMQ_PULL);zmq_bind(receiver, "tcp://*:5558");// 3.这是接收的第一条消息, 从发生器那里接收的, 是发生器用来告诉当前接收器, 表示开始工作了char *string = s_recv(receiver);free(string);// 4.启动时钟int64_t start_time = s_clock();// 5.从工人那里接收100个确认, 因为发生器只分配了100个任务给工人int task_nbr;for(task_nbr = 0; task_nbr < 100; task_nbr++){// 6.接收数据char *string = s_recv(receiver);free(string);// 7.每接收10个任务打印一次:号, 其余打印.if((task_nbr / 10) * 10 == task_nbr)printf(":");elseprintf(".");fflush(stdout);}// 8.处理完成之后打印一下总共执行了多长时间, 也就是工人一共工作了多长时间printf("Total elapsed time: %d msec\n",(int)(s_clock() - start_time));// 9.关闭套接字、销毁上下文zmq_close(receiver);zmq_ctx_destroy(context);return 0;
}static char* s_recv(void* socket)
{zmq_msg_t msg;zmq_msg_init(&msg);int rc = zmq_msg_recv(&msg, socket, 0);if(rc == -1)return NULL;char *string = (char*)malloc(rc + 1);memcpy(string, zmq_msg_data(&msg), rc);zmq_msg_close(&msg);string[rc] = 0;return string;
}static int64_t s_clock (void)
{
#if (defined (WIN32))SYSTEMTIME st;GetSystemTime (&st);return (int64_t) st.wSecond * 1000 + st.wMilliseconds;
#elsestruct timeval tv;gettimeofday (&tv, NULL);return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000);
#endif
}
  • 代码总的结构如下:

  • 演示效果①,当只启用1个工作者时,效果如下:
    • 最左侧为发生器:其给工作者发送的时间时间为5032毫秒
    • 中间为工作者:从发生器那里拉取任务,打印从发生器那里处理的任务(打印秒数)
    • 最右侧为接收器:用来在工作者完成工作时打印工作的总时间,可以看到只有1个工作者时显示的是5050毫秒,与发生器指定的差不多

  • 演示效果②,当启用2个工作者时,效果如下:
    • 最左侧为发生器:其给工作者发送的时间时间为5098毫秒
    • 中间的2个为工作者:从发生器那里拉取任务,打印从发生器那里处理的任务(打印秒数)
    • 最右侧为接收器:用来在工作者完成工作时打印工作的总时间,可以看到有2个工作者时显示的是2687毫秒,说明有两个工作者并行工作时,时间被平分

  • 演示效果③,当启用3个工作者时,效果如下:
    • 最左侧为发生器:其给工作者发送的时间时间为5598毫秒
    • 中间的3个为工作者:从发生器那里拉取任务,打印从发生器那里处理的任务(打印秒数)
    • 最右侧为接收器:用来在工作者完成工作时打印工作的总时间,可以看到有3个工作者时显示的是1945毫秒,说明工作者越多,任务处理的越快

五、模式总结

  • 工人向上连接到发生器,并且向下连接到接收器。这意味着你可以随意添加工人。 因此,发生器和接收器是架构的固定部分,而工人是动态部分
  • 我们必须同步开始同批次的所有工人的启动和运行。这是ZeroMQ中存在的一个相当普遍的疑难杂症,并没有简单的解决办法。connect方法需要一定的时间,所以当一组工人连接到发生器时,第一个成功连接的工人会在这短短的时间得到消息的整个负载,而其他工人仍在进行连接。如果不知何故批次的开始不同步,那么系统就将无法并行运行
  • 发生器的PUSH套接字将任务均匀地分配给工人。这就是所谓的负载均衡,以后还会详细介绍
  • 接收器的PULL套接字均匀地收集来自工人的结果。这就是所谓的公平排队(如下图所示)

管道模式也有类似“慢木匠”的现象

  • 它导致了对PUSH套接字不能正确地负载均衡的指责。如果你使用的是PUSH和PULL,并且你的某个工人得到比其他工人更多的信息,这是因为PULL套接字已比别人更快地连接,并在其他工人试图连接之前抓取了很多消息。

  • 我是小董,V公众点击"笔记白嫖"解锁更多【ZeroMQ】资料内容。

这篇关于重头戏!ZeroMQ的管道模式详解:ZMQ_PUSH、ZMQ_PULL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/713362

相关文章

Mysql 中的多表连接和连接类型详解

《Mysql中的多表连接和连接类型详解》这篇文章详细介绍了MySQL中的多表连接及其各种类型,包括内连接、左连接、右连接、全外连接、自连接和交叉连接,通过这些连接方式,可以将分散在不同表中的相关数据... 目录什么是多表连接?1. 内连接(INNER JOIN)2. 左连接(LEFT JOIN 或 LEFT

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

详解Java中的敏感信息处理

《详解Java中的敏感信息处理》平时开发中常常会遇到像用户的手机号、姓名、身份证等敏感信息需要处理,这篇文章主要为大家整理了一些常用的方法,希望对大家有所帮助... 目录前后端传输AES 对称加密RSA 非对称加密混合加密数据库加密MD5 + Salt/SHA + SaltAES 加密平时开发中遇到像用户的

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

C语言线程池的常见实现方式详解

《C语言线程池的常见实现方式详解》本文介绍了如何使用C语言实现一个基本的线程池,线程池的实现包括工作线程、任务队列、任务调度、线程池的初始化、任务添加、销毁等步骤,感兴趣的朋友跟随小编一起看看吧... 目录1. 线程池的基本结构2. 线程池的实现步骤3. 线程池的核心数据结构4. 线程池的详细实现4.1 初

Python绘制土地利用和土地覆盖类型图示例详解

《Python绘制土地利用和土地覆盖类型图示例详解》本文介绍了如何使用Python绘制土地利用和土地覆盖类型图,并提供了详细的代码示例,通过安装所需的库,准备地理数据,使用geopandas和matp... 目录一、所需库的安装二、数据准备三、绘制土地利用和土地覆盖类型图四、代码解释五、其他可视化形式1.

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

使用Spring Cache时设置缓存键的注意事项详解

《使用SpringCache时设置缓存键的注意事项详解》在现代的Web应用中,缓存是提高系统性能和响应速度的重要手段之一,Spring框架提供了强大的缓存支持,通过​​@Cacheable​​、​​... 目录引言1. 缓存键的基本概念2. 默认缓存键生成器3. 自定义缓存键3.1 使用​​@Cacheab