本文主要是介绍brpc之InputMessenger,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
简介
InputMessenger类是客户端处理socket中响应的处理类
类结构
具体的处理是交给InputMessageHandler来处理,其定义如下
struct InputMessageHandler {typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket, bool read_eof, const void *arg);Parse parse;typedef void (*Process)(InputMessageBase* msg);Process process;typedef bool (*Verify)(const InputMessageBase* msg);Verify verify;// An argument associated with the handler.const void* arg;// Name of this handler, must be string constant.const char* name;
};
parse:用于解析响应
process:解析后作具体的处理
verify:主要用于process前的校验
初始化
InputMessenger的创建是通过get_or_new_client_side_messenger
InputMessenger* g_messenger = NULL;
static pthread_once_t g_messenger_init = PTHREAD_ONCE_INIT;
static void InitClientSideMessenger() {g_messenger = new InputMessenger;
}
InputMessenger* get_or_new_client_side_messenger() {pthread_once(&g_messenger_init, InitClientSideMessenger);return g_messenger;
}
在注册过多所有的Protocol后,对于协议支持处理process_response方法的,会创建对应的InputMessageHandler,将Protocol的parse和process_response添加到InputMessageHandler,最后通过AddHandler添加到InputMessenger的_handlers中
for (size_t i = 0; i < protocols.size(); ++i) {if (protocols[i].process_response) {InputMessageHandler handler;// `process_response' is required at client sidehandler.parse = protocols[i].parse;handler.process = protocols[i].process_response;// No need to verify at client sidehandler.verify = NULL;handler.arg = NULL;handler.name = protocols[i].name;if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {exit(1);}}}
消息处理
brpc使用的边缘触发,触发处理函数是InputMessenger的静态方法OnNewMessages
- 调用Socket的DoRead从网络上读取数据
- 调用InputMessenger的ProcessNewMessage来处理消息
- 调用CutInputMessage选择正确的Prototol来解析接收到的数据
- 调用QueueMessage让线程来执行处理,如果失败,则本线程来执行处理
static void QueueMessage(InputMessageBase* to_run_msg,int* num_bthread_created,bthread_keytable_pool_t* keytable_pool) {if (!to_run_msg) {return;}// Create bthread for last_msg. The bthread is not scheduled// until bthread_flush() is called (in the worse case).// TODO(gejun): Join threads.bthread_t th;bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?BTHREAD_ATTR_PTHREAD :BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;tmp.keytable_pool = keytable_pool;if (bthread_start_background(&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {++*num_bthread_created;} else {ProcessInputMessage(to_run_msg);}
}
这篇关于brpc之InputMessenger的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!