Linux下使用ZMQ实践之与libevent结合

2024-02-02 21:38

本文主要是介绍Linux下使用ZMQ实践之与libevent结合,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 前言

        最近考虑到一个问题,项目中有同时处理socket、zeromq的逻辑需求,想通过libevent(I/O服用)一块将zmq-socket的事件也放一个线程中处理。
        网上了解了一些实现,大部分都是通过将zmq的sockfd拿到,加入libevent_dispatch中一并处理,但是存在问题是实时性不够,写法不对导致丢事件的情况。
       上述的根本原因是:zeromq是通过底层独立线程完成socket层面的收发,拷贝到内存队列供给上层业务使用。强行监听socket事件,但实际数据获取还是从内存队列拿取,肯定是存在时效性的问题。
       所以,本文尝试一种方法,服务端这块直接剖析zeromq协议zmtp,不要zmq子线程、内存队列,直接从socket层面收取数据;客户端仍然兼容libzmq常规的写法。

2. 相关知识

根据 zmtp官方说明 的解读:
协议流程主要为:打招呼greeting、握手handshake、通信traffic
通信内容主要分为命令command、消息message

2.1 Greeting格式

Greeting 又可以细分为:签名signature、版本号version、机器码mechanism、类型as-server、扩展filter;主要是zmq考虑到了多种协议版本、多种协议类型,考虑的字段信息就多了。

;   The greeting announces the protocol detailsgreeting = signature version mechanism as-server fillersignature = %xFF padding %x7F
padding = 8OCTET        ; Not significantversion = version-major version-minor
version-major = %x03
version-minor = %x00;   The mechanism is a null padded string
mechanism = 20mechanism-char
mechanism-char = "A"-"Z" | DIGIT| "-" | "_" | "." | "+" | %x0;   Is the peer acting as server?
as-server = %x00 | %x01;   The filler extends the greeting to 64 octets
filler = 31%x00             ; 31 zero octets

2.1 Handshake格式

Handshake相比Greeting简单多了,一个command搞定

;   The handshake consists of at least one command
;   The actual grammar depends on the security mechanism
handshake = 1*command

2.2 Command格式

Command提供了长命令、短命令进行选择;与Message不同的是,Command数据域body还拆分出命令名称Command-name和命令数据Command-data,这块主要用到的地方是包含在数据帧中,配合Message进行信息控制交互;

;   A command is a single long or short frame
command = command-size command-body
command-size = %x04 short-size | %x06 long-size
short-size = OCTET          ; Body is 0 to 255 octets
long-size = 8OCTET          ; Body is 0 to 2^63-1 octets
command-body = command-name command-data
command-name = OCTET 1*255command-name-char
command-name-char = ALPHA
command-data = *OCTET

2.3 Message格式

Message内部其实提供了两种类型(长消息、短消息)、两种标记(独立报文、组合报文)进行灵活选择

;   A message is one or more frames
message = *message-more message-last
message-more = ( %x01 short-size | %x03 long-size ) message-body
message-last = ( %x00 short-size | %x02 long-size ) message-body
message-body = *OCTET

3. 实践

通过zmtp协议的指导,我们选取一个方向来实践一下:push-pull 协议、libevent::buffer_event

3.1 连接握手

这次先看一下主函数,有个大概的印象:socket连接__do_connect,zmq握手__do_handshake,回调函数拉数据on_recv

int main(int argc, char *argv[])
{int res = -1;struct bufferevent *bev = NULL;struct event_base *base = NULL;struct msg_ctx mctx = {0};mctx.evbuf = evbuffer_new();__do_reset(&mctx);if (argc < 2) {printf("%s <address>\n", argv[0], argv[1]);exit(EXIT_FAILURE);}printf("Program start...\n");base = event_base_new();assert(base);bev = __do_connect(base, argv[1]);res = __do_handshark(bev);assert(res == 0);bufferevent_setwatermark(bev, EV_READ, 0, 0);bufferevent_setcb(bev, on_recv, NULL, NULL, &mctx);bufferevent_enable(bev, EV_READ);event_base_dispatch(base);printf("Program quit...\n");sleep(5);return EXIT_SUCCESS;
}

本端实现的是pull的功能,需要连接到push,然后进行拉取数据;
socket建立连接使用的常规的bufferevent_socket_connect的方法

static struct bufferevent *__do_connect(struct event_base *base, const char *paddr)
{int res = -1;int dstlen = sizeof(struct sockaddr_storage);struct bufferevent *bev = NULL;struct sockaddr_storage dst;res = evutil_parse_sockaddr_port(paddr, (struct sockaddr *)&dst, &dstlen);assert(0 == res);bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);assert(bev);res = bufferevent_socket_connect(bev, (struct sockaddr *)&dst, dstlen);assert(0 == res);return bev;
}

下来准备看握手过程的实现,先看一下结构体的定义是怎么跟zmtp结合的;这里有个地方注意一下,zmtp3.0与2.0协议格式还不一致,所以先制定约束:使用v3协议,不做v2的兼容。

/** ZMTP 3.0* connection = greeting handshake traffic** This is the 3.0 greeting (64 bytes)* greeting = signature version mechanism as-server filler*/
struct zmtp_greeting {char signature [10];    // %xFF padding %x7Fchar version [2];        // version-major %x03, version-minor %x00char mechanism [20];    // The mechanism is a null padded stringchar as_server [1];        // Is the peer acting as serverchar filler [31];        // The filler extends the greeting to 64 octets
};

从第2小节可以看出,Message和Command是类似的,这里我们全抽象为:flags、size、data(柔性数组)

/** A message is one or more frames** message = *message-more message-last* message-more = ( %x01 short-size | %x03 long-size ) message-body* message-last = ( %x00 short-size | %x02 long-size ) message-body* message-body = *OCTET** short-size = OCTET          ; Body is 0 to 255 octets* long-size = 8OCTET          ; Body is 0 to 2^63-1 octets**/
struct zmtp_msg_shdr {u8 flags;             //  Must be zerou8 size;              //  Size, 0 to 255 bytesu8 data [0];          //  Message data
};struct zmtp_msg_lhdr {u8 flags;             //  Must be zerou64 size;             //  Size, 0 to 255 bytesu8 data [0];          //  Message data
};

结构体准备好了,我们看一个握手的快速实现,这里说的快速是指使用阻塞同步的形式完成;另外一种常规的做法是使用状态机+事件回调的方式完成。

static int __do_handshark(struct bufferevent *bev)
{int res = -1;int fd = bufferevent_getfd(bev);struct zmtp_greeting gt = ZMTP_GREETING_INIT;char buffer[SIZE_LINE_NORMAL];struct zmtp_msg_shdr *phead = (struct zmtp_msg_shdr *)buffer;char pull_data[] = ZMTP_HANDSHAKE_PULL;struct zmtp_msg_shdr pull_head = {.flags = ZMTP_FLAGS_SCMD,.size = sizeof(pull_data) - 1,};/* Greeting */res = sdk_tcp_send_nbytes(fd, &gt, sizeof(struct zmtp_greeting), TIMEO);assert(0 == res);res = sdk_tcp_recv_nbytes(fd, &gt, sizeof(struct zmtp_greeting), TIMEO);assert(0 == res);printf("Greeting done...\n");/* Handshark */res = sdk_tcp_recv_nbytes(fd, phead, sizeof(struct zmtp_msg_shdr), TIMEO);assert(0 == res);assert(0x04 == phead->flags);res = sdk_tcp_recv_nbytes(fd, phead->data, phead->size, TIMEO);assert(0 == res);printf("Handsharke size: %d\n\n", phead->size);DSP_TOTAL(phead->data, phead->size);res = sdk_tcp_send_nbytes(fd, &pull_head, sizeof(struct zmtp_msg_shdr), TIMEO);assert(0 == res);res = sdk_tcp_send_nbytes(fd, &pull_data, pull_head.size, TIMEO);assert(0 == res);printf("Handsharke done...\n");return 0;
}

上述其实可以看到,报文填充非常暴力,通过宏直接将握手信息全部填充进去了。

#define ZMTP_GREETING_INIT { \{ 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, \{ 3, 0 }, \{ 'N', 'U', 'L', 'L', 0 }, \{ 0 }, \{ 0 } \
}#define ZMTP_HANDSHAKE_PULL { \0x05, \'R', 'E', 'A', 'D', 'Y', \0x0b, \'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e', \0x00, 0x00, 0x00, 0x04, \'P', 'U', 'L', 'L', '0'\
}

3.2 数据拉取

数据拉取过程,由于Message数据域是不定长的,所以我们通过状态机的形式剖报文

void on_recv(struct bufferevent *bev, void *args)
{int res = 0;struct msg_ctx *mctx = (struct msg_ctx *)args;/* Et mode */while (1) {size_t length = evbuffer_get_length(bufferevent_get_input(bev));if (length == 0) {break;}printf("Evbuffer length: %u\n", length);switch (mctx->status) {case STATUS_MSG_HEAD:if (0 == __do_head_parser(bev, mctx)) {/* state transition */mctx->status = STATUS_MSG_BODY;printf("Head done, data: %u\n", mctx->size);}break;case STATUS_MSG_BODY:if (0 == __do_body_parser(bev, mctx)) {/* state transition */mctx->status = STATUS_MSG_HEAD;printf("Body done, data: %u\n", mctx->size);}break;default:assert(0);break;}}return;
}

Message头部信息,我们主要需要区分是长消息、还是短消息;是独立消息、还是组合消息。

enum {ZMTP_FLAGS_SCMD = 0x04,ZMTP_FLAGS_LCMD = 0x06,ZMTP_FLAGS_SMSG_MORE = 0x01,ZMTP_FLAGS_LMSG_MORE = 0x03,ZMTP_FLAGS_SMSG_LAST = 0x00,ZMTP_FLAGS_LMSG_LAST = 0x02,
};static int __do_head_parser(struct bufferevent *bev, struct msg_ctx *mctx)
{if (mctx->flags == 0xFF) {bufferevent_read(bev, &mctx->flags, sizeof(u8));printf("Head flag: %x\n", mctx->flags);}if (mctx->size == 0x00) {size_t expect = 0;size_t length = evbuffer_get_length(bufferevent_get_input(bev));switch (mctx->flags) {case ZMTP_FLAGS_SMSG_MORE:case ZMTP_FLAGS_SMSG_LAST:expect = sizeof(u8);printf("Body short\n");break;case ZMTP_FLAGS_LMSG_MORE:case ZMTP_FLAGS_LMSG_LAST:expect = sizeof(u64);printf("Body long\n");break;default:assert(0);break;}if (length < expect) {printf("Retry\n");return RETRY;}size_t rlen = bufferevent_read(bev, &mctx->size, expect);assert(rlen == expect);if (expect == sizeof(u64)) {mctx->size = ntohll(mctx->size);}assert(mctx->size > 0);}return 0;
}

然后是数据域状态下的数据读取了,这个主要考虑缓冲消息到evbuffer中,消息收全了最后才调用__do_something__do_reset函数

static int __do_body_parser(struct bufferevent *bev, struct msg_ctx *mctx)
{char buffer[SIZE_LINE_LONG];size_t rmax = _MIN(sizeof(buffer), mctx->size - evbuffer_get_length(mctx->evbuf));size_t rlen = bufferevent_read(bev, buffer, rmax);int res = evbuffer_add(mctx->evbuf, buffer, rlen);assert(res == 0);if (evbuffer_get_length(mctx->evbuf) >= mctx->size) {printf("Body done, length: %u/%u\n",evbuffer_get_length(mctx->evbuf), mctx->size);__do_something(mctx);__do_reset(mctx);return 0;}printf("Retry\n");return RETRY;
}static void __do_reset(struct msg_ctx *mctx)
{mctx->status = STATUS_MSG_HEAD;mctx->flags = 0xFF;mctx->size  = 0x00;evbuffer_drain(mctx->evbuf, evbuffer_get_length(mctx->evbuf));
}static void __do_something(struct msg_ctx *mctx)
{printf("-- 0x%x\n", mctx->flags);printf("-- %d\n", mctx->size);while (evbuffer_get_length(mctx->evbuf) > 0) {char buffer[SIZE_LINE_LONG];ssize_t rlen = evbuffer_remove(mctx->evbuf, buffer, sizeof(buffer));printf("-- %s\n", buffer);}
}

3.3 运行结果

短消息的拉取:

./pullx 127.0.0.1:5555Program start...
Greeting done...
Handsharke size: 260000  05 52 45 41 44 59 0b 53 - 6f 63 6b 65 74 2d 54 79         .READY.Socket-Ty
0010  70 65 00 00 00 04 50 55 - 53 48 ** ** ** ** ** **         pe....PUSH
Handsharke done...
Evbuffer length: 14
Head flag: 0
Body short
Head done, data: 12
Evbuffer length: 12
Body done, length: 12/12
-- 0x0
-- 12
-- Data-   -000
Body done, data: 0
Evbuffer length: 14
Head flag: 0
Body short
Head done, data: 12
Evbuffer length: 12
Body done, length: 12/12
-- 0x0
-- 12
-- Data-   -001
Body done, data: 0

长消息的拉取:

./pullx 127.0.0.1:5555
Program start...
Greeting done...
Handsharke size: 260000  05 52 45 41 44 59 0b 53 - 6f 63 6b 65 74 2d 54 79         .READY.Socket-Ty
0010  70 65 00 00 00 04 50 55 - 53 48 ** ** ** ** ** **         pe....PUSH
Handsharke done...
Evbuffer length: 1033
Head flag: 2
Body long
Head done, data: 1024
Evbuffer length: 1024
Body done, length: 1024/1024
-- 0x2
-- 1024
-- Data-   -000
Body done, data: 0
Evbuffer length: 1033
Head flag: 2
Body long
Head done, data: 1024
Evbuffer length: 1024
Body done, length: 1024/1024
-- 0x2
-- 1024
-- Data-   -001
Body done, data: 0
Evbuffer length: 1033
Head flag: 2
Body long
Head done, data: 1024
Evbuffer length: 1024
Body done, length: 1024/1024

4. 结论

    通过本次试验,验证了直接剖取zmtp方法的可行性;但是要到工程中实践,还得思考几个问题:

  1. push-pull协议简单,但router-dealer、pub-sub模式、加密、认证协议的剖析会更加复杂;
  2. socket维护的问题,需要考虑断线重连、重新握手的问题;
  3. 协议版本的兼容性;

这篇关于Linux下使用ZMQ实践之与libevent结合的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/672047

相关文章

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

使用Python绘制可爱的招财猫

《使用Python绘制可爱的招财猫》招财猫,也被称为“幸运猫”,是一种象征财富和好运的吉祥物,经常出现在亚洲文化的商店、餐厅和家庭中,今天,我将带你用Python和matplotlib库从零开始绘制一... 目录1. 为什么选择用 python 绘制?2. 绘图的基本概念3. 实现代码解析3.1 设置绘图画

使用Python实现大文件切片上传及断点续传的方法

《使用Python实现大文件切片上传及断点续传的方法》本文介绍了使用Python实现大文件切片上传及断点续传的方法,包括功能模块划分(获取上传文件接口状态、临时文件夹状态信息、切片上传、切片合并)、整... 目录概要整体架构流程技术细节获取上传文件状态接口获取临时文件夹状态信息接口切片上传功能文件合并功能小

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Linux使用nohup命令在后台运行脚本

《Linux使用nohup命令在后台运行脚本》在Linux或类Unix系统中,后台运行脚本是一项非常实用的技能,尤其适用于需要长时间运行的任务或服务,本文我们来看看如何使用nohup命令在后台... 目录nohup 命令简介基本用法输出重定向& 符号的作用后台进程的特点注意事项实际应用场景长时间运行的任务服

Python3中Sanic中间件的使用

《Python3中Sanic中间件的使用》Sanic框架中的中间件是一种强大的工具,本文就来介绍Python3中Sanic中间件的使用,具有一定的参考价值,感兴趣的可以了解一下... 目录Sanic 中间件的工作流程中间件的使用1. 全局中间件2. 路由中间件3. 异常处理中间件4. 异步中间件5. 优先级

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

Redis分布式锁使用及说明

《Redis分布式锁使用及说明》本文总结了Redis和Zookeeper在高可用性和高一致性场景下的应用,并详细介绍了Redis的分布式锁实现方式,包括使用Lua脚本和续期机制,最后,提到了RedLo... 目录Redis分布式锁加锁方式怎么会解错锁?举个小案例吧解锁方式续期总结Redis分布式锁如果追求