重头戏!ZeroMQ的独家对模式详解:ZMQ_PAIR

2024-02-16 03:30

本文主要是介绍重头戏!ZeroMQ的独家对模式详解:ZMQ_PAIR,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、ØMQ模式总览

  • ØMQ支持多种模式,具体可以参阅:https://blog.csdn.net/qq_41453285/article/details/106865539
  • 本文介绍ØMQ的独家对模式

二、独家对模式

  • 在前面的文章中我们介绍过如何编写ØMQ多线程程序:https://blog.csdn.net/qq_41453285/article/details/106882216
  • 独家对模式(Exclusive pair)用于将一个对等点精确地连接到另一个对等点。此模式用于跨inproc传输的线程间通信
  • 互斥对模式由http://rfc.zeromq.org/spec:31正式定义
  • 独家对模式支持的套接字类型只有1种:
    • ZMQ_PAIR

三、“PAIR”套接字类型

  • ZMQ_PAIR类型的套接字只能一次连接到单个对等方。对通过ZMQ_PAIR套接字发送的消息不执行消息路由或筛选
  • 当ZMQ_PAIR套接字由于已达到连接对等方的高水位线而进入静音状态时,或者如果没有连接任何对等方,则套接字上的任何zmq_send()操作都应阻塞,直到对等方可用于发送;消息不会被丢弃
  • 适用协议:
    • ZMQ_PAIR套接字设计用于通过nproc传输进行线程间通信,并且不实现自动重新连接等功能
    • 尽管ZMQ_PAIR套接字可用于inproc以外的其他传输协议,但是它们无法自动重新连接,并且当以前存在任何连接(包括关闭状态的连接)时,新的传入连接将被终止,这使得它们在大多数情况下不适合TCP
                                                                                                       ZMQ_PAIR特性摘要 
兼容的对等套接字ZMQ_PAIR
方向双向
发送/接收模式无限制
入网路由策略不适用(N/A)

外发路由策略

不适用(N/A)
静音状态下的操作阻塞

四、PAIR套接字应用场景:协调线程

  • 在编写多线程应用程序时,会遇到如何“协调线程”的问题,例如一个线程状态发生改变时同时另一个线程:
    • 如果使用以往的多线程程序,你可能会使用信号量或互斥等技术
    • 但是在ØMQ中,你可以使用ZMQ_PAIR套接字来进行线程间的通信
  • 下面是一个演示案例,下面创建三个PAIR套接字:
    • PAIR3:主线程中的PAIR套接字,等待PAIR2发来通知消息
    • PAIR2:在主线程中调用pthread_create()创建线程,在线程的回调函数中创建PAIR2套接字,该套接字等待PAIR1发来通知消息
    • PAIR1:PAIR2所在的线程再调用一次pthread_create(),在线程的回调函数中创建PAIR1套接字,PAIR1会向PAIR2发送消息
  • 整体的流程就是:PAIR1发送消息给PAIR2,PAIR2接收到PAIR1的消息之后再发送消息给PAIR3,PAIR3接收到PAIR2的消息之后退出程序

  • 代码如下:
// mtrelay.c
// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/mtrelay.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>
#include <zmq.h>// PAIR2套接字执行函数, 参数为主进程的上下文对象指针
static void *step2(void *arg);// PAIR1套接字执行函数, 参数为主进程的上下文对象指针
static void *step1(void *arg);// 从socket套接字上接收消息
static char *s_recv(void *socket);// 向socket套接字发送消息string
static int s_send(void *socket, char *string);int main()
{int rc;// 1.创建新的上下文对象void *context = zmq_ctx_new();assert(context != NULL);// 2.创建、绑定PAIR3套接字, PAIR2会连接该套接字并向该套接字发送消息void *receiver = zmq_socket(context, ZMQ_PAIR);assert(receiver != NULL);rc = zmq_bind(receiver, "inproc://step3");assert(rc != -1);// 3.创建线程, 线程中创建PAIR2套接字, PAIR2套接字会向PAIR3发送消息pthread_t thread_id;pthread_create(&thread_id, NULL, step2, context);// 4.阻塞等待接收PAIR2发来消息char *string = s_recv(receiver);assert(string != NULL);free(string);// 5.打印消息printf("Test successful!\n");// 6.关闭套接字、销毁上下文zmq_close(receiver);zmq_ctx_destroy(context);return 0;
}static void *step2(void *arg)
{// 参数为上下文对象指针int rc;// 1.创建、绑定PAIR2套接字, PAIR1会连接该套接字并向该套接字发送消息void *receiver = zmq_socket(arg, ZMQ_PAIR);assert(receiver != NULL);rc = zmq_bind(receiver, "inproc://step2");assert(rc != -1);// 2.创建线程, 线程中创建PAIR1套接字, PAIR1套接字会向PAIR2发送消息pthread_t thread_id;pthread_create(&thread_id, NULL, step1, arg);// 3.等待接收PAIR1发来消息char *string = s_recv(receiver);assert(string != NULL);free(string);// 关闭PAIR2套接字zmq_close(receiver);// 接收到PAIR1发来消息之后, 再开始向PAIR3发送通知// 4.重新创建一个PAIR2套接字, 该套接连接PAIR3void *xmitter = zmq_socket(arg, ZMQ_PAIR);assert(xmitter != NULL);rc = zmq_connect(xmitter, "inproc://step3");assert(rc != -1);// 5.向PAIR3发送通知printf("Step 2 ready, signaling step 3\n");rc = s_send(xmitter, "READY");assert(rc != -1);zmq_close(xmitter);return NULL;
}static void *step1(void *arg)
{// 参数为上下文对象指针int rc;// 创建一个PAIR1套接字, 然后连接PAIR2套接字void *xmitter = zmq_socket(arg, ZMQ_PAIR);assert(xmitter != NULL);rc = zmq_connect(xmitter, "inproc://step2");assert(rc != -1);// 5.向PAIR2发送通知printf("Step 1 ready, signaling step 2\n");rc = s_send(xmitter, "READY");assert(rc != -1);zmq_close(xmitter);return NULL;
}static char *s_recv(void *socket)
{int size;zmq_msg_t msg;zmq_msg_init(&msg);size = zmq_msg_recv(&msg, socket, 0);if(size == -1)return NULL;char *string = (char*)malloc(size + 1);if(string == NULL)return NULL;memcpy(string, zmq_msg_data(&msg), size);zmq_msg_close(&msg);string[size] = 0;return string;
}static int s_send(void *socket, char *string)
{int rc;zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));rc = zmq_msg_send(&msg, socket, 0);zmq_msg_close(&msg);return rc;
}
  • 编译运行如下:
gcc -o mtrelay mtrelay.c -lzmq

案例分析

  • 这是使用ØMQ进行多线程编程的一个经典模式:
    • 1.两个线程通过inproc通信,使用的是共享的上下文
    • 2.父线程创建一个套接字,将其绑定到一个inproc端点,然后启动子线程,将上下文传递给它
    • 3.子线程创建第二个套接字,将它连接到该inproc端点,然后发信号告诉父线程,它已准备就绪
  • 使用这种模式的多线程代码是不可扩展到进程的。如果你使用inproc和套接字对,你就正在构建一个紧耦合的应用程序,也就是说,其中你的线程在结构是相互依存的,只有在低延迟真的很重要的时候才这样做。另一种设计模式是一个松耦合的应用程序,其中的线程有自己的上下文并通过ipc或tcp通信。你可以轻松地将松耦合的线程分解为单独的进程

为什么选择的是PAIR?

  • 此处使用的是PAIR套接字,其他套接字组合也能够完成上面相同的工作,但是其他台套接字都有副作用,可能会干扰信令:
    • 你可以让发送者使用PUSH并让接收者使用PULL,但是PUSH会把消息发送给所有存在的接收者,假设你启动了2个接收者,那么就会“丢失”一半的信号。PAIR具有拒绝多个连接,两个连接的组成的对是独占的
    • 你可以让发送者使用DEALER并让接收者使用ROUTER,但是ROUTER将你的信息包装在一个“封包”中,这意味着你的大小为0的信号变成了一个多部分消息。如果你不关心数据并把任何东西都当做一个有效的信号,并且如果你不会不止一次地从套接字上读取,这并不重要。但是,如果你决定要发送实际数据时,你会突然发现ROUTER为你提供了“错误”的消息。DEALER也分发传出消息,这带来与PUSH相同的风险
    • 你可以让发送者使用PUB,而让接收者使用SUB,这将完全按照你发送它们的原样正确地传递你的消息,而且PUB不像PUSH或DEALER那样分发消息。但是,你需要用空订阅配置订阅者,这比较麻烦,更糟的是,PUB-SUB链接的可靠性是与实践相关的,并且如果在PUB套接字发送消息时,SUB套接字正在连接,信息就有可能会丢失
  • 综合以上的原因,使得PAIR成为线程对之间协调的最佳选择

五、节点协调

  • 当你想协调节点时,PAIR套接字就无法正常工作了,这是线程和节点的策略不同的少数地方之一。详情参阅下一篇文章:https://blog.csdn.net/qq_41453285/article/details/106949903。

  • 我是小董,V公众点击"笔记白嫖"解锁更多【ZeroMQ】资料内容。

这篇关于重头戏!ZeroMQ的独家对模式详解:ZMQ_PAIR的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/713363

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

嵌入式Openharmony系统构建与启动详解

大家好,今天主要给大家分享一下,如何构建Openharmony子系统以及系统的启动过程分解。 第一:OpenHarmony系统构建      首先熟悉一下,构建系统是一种自动化处理工具的集合,通过将源代码文件进行一系列处理,最终生成和用户可以使用的目标文件。这里的目标文件包括静态链接库文件、动态链接库文件、可执行文件、脚本文件、配置文件等。      我们在编写hellowor

LabVIEW FIFO详解

在LabVIEW的FPGA开发中,FIFO(先入先出队列)是常用的数据传输机制。通过配置FIFO的属性,工程师可以在FPGA和主机之间,或不同FPGA VIs之间进行高效的数据传输。根据具体需求,FIFO有多种类型与实现方式,包括目标范围内FIFO(Target-Scoped)、DMA FIFO以及点对点流(Peer-to-Peer)。 FIFO类型 **目标范围FIFO(Target-Sc

019、JOptionPane类的常用静态方法详解

目录 JOptionPane类的常用静态方法详解 1. showInputDialog()方法 1.1基本用法 1.2带有默认值的输入框 1.3带有选项的输入对话框 1.4自定义图标的输入对话框 2. showConfirmDialog()方法 2.1基本用法 2.2自定义按钮和图标 2.3带有自定义组件的确认对话框 3. showMessageDialog()方法 3.1

脏页的标记方式详解

脏页的标记方式 一、引言 在数据库系统中,脏页是指那些被修改过但还未写入磁盘的数据页。为了有效地管理这些脏页并确保数据的一致性,数据库需要对脏页进行标记。了解脏页的标记方式对于理解数据库的内部工作机制和优化性能至关重要。 二、脏页产生的过程 当数据库中的数据被修改时,这些修改首先会在内存中的缓冲池(Buffer Pool)中进行。例如,执行一条 UPDATE 语句修改了某一行数据,对应的缓

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法