ZeroMQ(1)——三个基本模型

2023-12-14 15:08
文章标签 模型 基本 三个 zeromq

本文主要是介绍ZeroMQ(1)——三个基本模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ZeroMQ(1)——三个基本模型

官方文档
我自己使用zeromq,但是其实对zeromq,并不是很了解,对于zeromq,具体解决什么问题也是不太清楚。项目中将zeromq用作一个消息队列。

引用他人的一段总结:

引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”

近几年有关”Message Queue”的项目层出不穷,知名的就有十几种,这主要是因为后摩尔定律时代,分布式处理逐渐成为主流,业界需要一套标准来解决分布式计算环境中节点之间的消息通信。几年的竞争下来,Apache 基金会旗下的符合 AMQP/1.0标准的 RabbitMQ 已经得到了广泛的认可,成为领先的 MQ 项目。

与 RabbitMQ 相比,ZMQ 并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口。

其实zeromq所处理的就是使用网络通信来实现一个消息队列,用于系统,进程,线程之间的通信。其是对于socket的一层封装,类似于ACE。

ZeroMQ的几种基本模型

模型一:请求响应模型(Request-Reply)

请求响应模型是一个最基本的服务器/客户端socket通信模型:

这里写图片描述

服务器端代码:

#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects b"Hello" from client, replies with b"World"
#import time
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REP)    #创建的socket类型需要定义好,zmq.REP,响应型
socket.bind("tcp://*:5555")    #绑定端口,其实也就是bind&listenwhile True:#  Wait for next request from clientmessage = socket.recv()            #阻塞型的print("Received request: %s" % message)#  Do some 'work'time.sleep(1)#  Send reply back to clientsocket.send(b"World")

~要理解的是zmq就是对socket进行了一层封装

客户端代码:

#
#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back
#import zmqcontext = zmq.Context()#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)            #zmq.REQ请求型
socket.connect("tcp://localhost:5555")      #这个就是连接端口#  Do 10 requests, waiting each time for a response
for request in range(10):print("Sending request %s …" % request)socket.send(b"Hello")                   #send#  Get the reply.message = socket.recv()print("Received reply %s [ %s ]" % (request, message)

a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。

b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

c) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。

d) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。

e) 虽然可以使用ZMQ实现HTTP协议,但是,这绝不是他所擅长的。

模型二:订阅者模式(Publish-Subscribe)

这里写图片描述

服务器端:

#
#   Weather update server
#   Binds PUB socket to tcp://*:5556
#   Publishes random weather updates
#import zmq
from random import randrangecontext = zmq.Context()
socket = context.socket(zmq.PUB)  #publisher类型
socket.bind("tcp://*:5556")while True:zipcode = randrange(1, 100000)temperature = randrange(-80, 135)relhumidity = randrange(10, 60)socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

这里可以看出发布者只是绑定了端口,并进行信息发布,其并不care是否有接收者,有哪些接收者。

客户端:

#
#   Weather update client
#   Connects SUB socket to tcp://localhost:5556
#   Collects weather updates and finds avg temp in zipcode
#import sys
import zmq#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)print("Collecting updates from weather server…")
socket.connect("tcp://localhost:5556")# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)# Process 5 updates
total_temp = 0
for update_nbr in range(5):string = socket.recv_string()zipcode, temperature, relhumidity = string.split()total_temp += int(temperature)print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / update_nbr)
)

其中有一句代码是乍看之下不太容易理解的:

socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

官方文档的解释:

Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don’t set any subscription, you won’t get any messages.

也就是说当使用SUB形式来订阅消息的时候,必须设置一个过滤频道,否则什么也接收不到。而此处使用了,发布者的第一个发布字符串来过滤。这个有规定吗,具体的设置原则是什么?具体请参考:zmq_setsockopt()

另外要说明的两点就是:
1. 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。

注意这个slow joiner问题,之后会为了解决这个问题而设计新的模式。

2.但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。

管道模式(Pipeline)

想象一下这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine 比较适合于这种场景。
这里写图片描述

Pipeline的原理就是:有一个Publisher来发布任务,这些任务是可以平行执行的。有一批Worker用于接收任务,Worker处理完任务之后就将结果发送到Sink之中用于归总或进一步处理。

所以要明确的是Pipeline之中并不是服务器,客户端的关系了,而是有三种对象——Ventilator,Worker,Sink

Ventilator代码:

# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>import zmq
import random
import timetry:raw_input
except NameError:# Python 3raw_input = inputcontext = zmq.Context()# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers…")# The first message is "0" and signals start of batch
sink.send(b'0')# Initialize random number generator
random.seed()# Send 100 tasks
total_msec = 0
for task_nbr in range(100):# Random workload from 1 to 100 msecsworkload = random.randint(1, 100)total_msec += workloadsender.send_string(u'%i' % workload)print("Total expected cost: %s msec" % total_msec)# Give 0MQ time to deliver
time.sleep(1)

Worker代码:

# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>import sys
import time
import zmqcontext = zmq.Context()# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")# Process tasks forever
while True:s = receiver.recv()# Simple progress indicator for the viewersys.stdout.write('.')sys.stdout.flush()# Do the worktime.sleep(int(s)*0.001)# Send results to sinksender.send(b'')

Sink代码:

# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>import sys
import time
import zmqcontext = zmq.Context()# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")# Wait for start of batch
s = receiver.recv()# Start our clock now
tstart = time.time()# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):s = receiver.recv()if task_nbr % 10 == 0:sys.stdout.write(':')else:sys.stdout.write('.')sys.stdout.flush()# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))

从程序中,我们可以看到,task ventilator 使用的是 SOCKET_PUSH,将任务分发到 Worker 节点上。而 Worker 节点上,使用 SOCKET_PULL 从上游接受任务,并使用 SOCKET_PUSH 将结果汇集到 Slink。

这篇关于ZeroMQ(1)——三个基本模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493003

相关文章

Golang的CSP模型简介(最新推荐)

《Golang的CSP模型简介(最新推荐)》Golang采用了CSP(CommunicatingSequentialProcesses,通信顺序进程)并发模型,通过goroutine和channe... 目录前言一、介绍1. 什么是 CSP 模型2. Goroutine3. Channel4. Channe

使用Python进行文件读写操作的基本方法

《使用Python进行文件读写操作的基本方法》今天的内容来介绍Python中进行文件读写操作的方法,这在学习Python时是必不可少的技术点,希望可以帮助到正在学习python的小伙伴,以下是Pyth... 目录一、文件读取:二、文件写入:三、文件追加:四、文件读写的二进制模式:五、使用 json 模块读写

无线路由器哪个品牌好用信号强? 口碑最好的三个路由器大比拼

《无线路由器哪个品牌好用信号强?口碑最好的三个路由器大比拼》不同品牌在信号覆盖、稳定性和易用性等方面各有特色,如何在众多选择中找到最适合自己的那款无线路由器呢?今天推荐三款路由器让你的网速起飞... 今天我们来聊聊那些让网速飞起来的路由器。在这个信息爆炸的时代,一个好路由器简直就是家庭网编程络的心脏。无论你

Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)

《Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)》:本文主要介绍Python基于火山引擎豆包大模型搭建QQ机器人详细的相关资料,包括开通模型、配置APIKEY鉴权和SD... 目录豆包大模型概述开通模型付费安装 SDK 环境配置 API KEY 鉴权Ark 模型接口Prompt

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基本知识点

1、c++的输入加上ios::sync_with_stdio(false);  等价于 c的输入,读取速度会加快(但是在字符串的题里面和容易出现问题) 2、lower_bound()和upper_bound() iterator lower_bound( const key_type &key ): 返回一个迭代器,指向键值>= key的第一个元素。 iterator upper_bou

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}