brpc之InputMessenger

2024-01-14 18:20
文章标签 brpc inputmessenger

本文主要是介绍brpc之InputMessenger,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

InputMessenger类是客户端处理socket中响应的处理类

类结构

InputMessageBase
InputMessenger
-InputMessageHandler* _handlers
+int AddHandler(const InputMessageHandler& handler)
#void OnNewMessages(Socket* m)
-int ProcessNewMessage(Socket* m, ssize_t bytes, bool read_eof,const uint64_t received_us, const uint64_t base_realtime,InputMessageClosure& last_msg)
InputMessageHandler

具体的处理是交给InputMessageHandler来处理,其定义如下

struct InputMessageHandler {typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket, bool read_eof, const void *arg);Parse parse;typedef void (*Process)(InputMessageBase* msg);Process process;typedef bool (*Verify)(const InputMessageBase* msg);Verify verify;// An argument associated with the handler.const void* arg;// Name of this handler, must be string constant.const char* name;
};

parse:用于解析响应
process:解析后作具体的处理
verify:主要用于process前的校验

初始化

InputMessenger的创建是通过get_or_new_client_side_messenger

InputMessenger* g_messenger = NULL;
static pthread_once_t g_messenger_init = PTHREAD_ONCE_INIT;
static void InitClientSideMessenger() {g_messenger = new InputMessenger;
}
InputMessenger* get_or_new_client_side_messenger() {pthread_once(&g_messenger_init, InitClientSideMessenger);return g_messenger;
}

在注册过多所有的Protocol后,对于协议支持处理process_response方法的,会创建对应的InputMessageHandler,将Protocol的parse和process_response添加到InputMessageHandler,最后通过AddHandler添加到InputMessenger的_handlers中

for (size_t i = 0; i < protocols.size(); ++i) {if (protocols[i].process_response) {InputMessageHandler handler;// `process_response' is required at client sidehandler.parse = protocols[i].parse;handler.process = protocols[i].process_response;// No need to verify at client sidehandler.verify = NULL;handler.arg = NULL;handler.name = protocols[i].name;if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {exit(1);}}}

消息处理

brpc使用的边缘触发,触发处理函数是InputMessenger的静态方法OnNewMessages

  • 调用Socket的DoRead从网络上读取数据
  • 调用InputMessenger的ProcessNewMessage来处理消息
    • 调用CutInputMessage选择正确的Prototol来解析接收到的数据
    • 调用QueueMessage让线程来执行处理,如果失败,则本线程来执行处理
static void QueueMessage(InputMessageBase* to_run_msg,int* num_bthread_created,bthread_keytable_pool_t* keytable_pool) {if (!to_run_msg) {return;}// Create bthread for last_msg. The bthread is not scheduled// until bthread_flush() is called (in the worse case).// TODO(gejun): Join threads.bthread_t th;bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?BTHREAD_ATTR_PTHREAD :BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;tmp.keytable_pool = keytable_pool;if (bthread_start_background(&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {++*num_bthread_created;} else {ProcessInputMessage(to_run_msg);}
}

这篇关于brpc之InputMessenger的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/606089

相关文章

brpc profiler

cpu profiler cpu profiler | bRPC MacOS的额外配置 在MacOS下,gperftools中的perl pprof脚本无法将函数地址转变成函数名,解决办法是: 安装standalone pprof,并把下载的pprof二进制文件路径写入环境变量GOOGLE_PPROF_BINARY_PATH中安装llvm-symbolizer(将函数符号转化为函数

brpc:WorkStealingQueue

代码 #ifndef BTHREAD_WORK_STEALING_QUEUE_H#define BTHREAD_WORK_STEALING_QUEUE_H#include "butil/macros.h"#include "butil/atomicops.h"#include "butil/logging.h"namespace bthread {template <typename T>

学习brpc:echo服务

Echo同步客户端 server 端 #include <gflags/gflags.h>#include <json2pb/pb_to_json.h>#include <brpc/server.h>#include "butil/endpoint.h"#include "echo.pb.h"// flags,用于配置serverDEFINE_bool(echo_attachment

brpc之ResourcePool

简介 ResourcePool用于管理资源,负责资源的分配以及回收 结构 BlockGroup:资源池中包含多个BlockGroup,最多65536个 Block:一个BlockGroup中包含多个Block,最多(1<<16)个;1个Block中包含BLOCK_NITEM个类型为T的资源,BLOCK_NITEM由类模板ResourcePoolBlockItemNum中的静态成员value

brpc负载均衡load balance和服务发现name servicing

1.SharedLoadBalancer(load_balancer.h):包含LoadBalancer指针_lb,AddServersInBatch 2.LoadBalancerWithNaming:继承SharedLoadBalancer和NamingServiceWatcher 2.1Init函数:SharedLoadBalancer::Init,new一个load balance对象

brpc: a little source code

之前在https://www.yuque.com/treblez/qksu6c/nqe8ip59cwegl6rk?singleDoc# 《olap/clickhouse-编译器优化与向量化》中我谈过brpc的汇编控制bthread。本文就来看一下brpc作为一个高性能的rpc实现,除了自定义线程栈之外,代码还有什么优秀之处。 因为时间原因,本文不做深入分析,只是解读下几个有意思的模块。 用户态f

【brpc学习实践十三】基于brpc的redis client的实现

brpc支持了redis协议,提供了相关redis访问接口,充分利用了bthread,可以坐到比hiredis更高效。 brpc redis与hiredis的对比 相比使用hiredis(官方client)的优势有: 线程安全。用户不需要为每个线程建立独立的client。支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式。支持多种连接方式。支持超

【brpc学习实践十三】基于brpc的redis client的实现

brpc支持了redis协议,提供了相关redis访问接口,充分利用了bthread,可以坐到比hiredis更高效。 brpc redis与hiredis的对比 相比使用hiredis(官方client)的优势有: 线程安全。用户不需要为每个线程建立独立的client。支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式。支持多种连接方式。支持超

【brpc学习实践】ParallelChannel的使用与并行请求

概览 ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel: 支持同步和异步访问。 发起异步操作后可以立刻删除。 可以取消。 支持超时。 任何brpc::ChannelBas

【brpc学习实践七】dummy server、DynamicPartitionChannel

dummy server 如果你的程序只使用了baidu-rpc的client或根本没有使用baidu-rpc,但你也想使用baidu-rpc的内置服务,只要在程序中启动一个空的server就行了,这种server我们称为dummy server。 Dummy server 可以用于原型设计和开发目的,作为简单的 http 服务器使用,多数场景用不上。 brpc怎么开启dummy server