Thrfit 服务端请求处理流程

2024-09-05 09:48

本文主要是介绍Thrfit 服务端请求处理流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用同步的非阻塞的服务端的请求处理流程

实现

IDL

  • helloworld.thrift
namespace java io.github.helloworlde.thriftstruct HelloMessage {1: required string message,
}struct HelloResponse {1: required string message,
}service HelloService {HelloResponse sayHello(1: HelloMessage request);
}

服务端实现

使用 TThreadedSelectorServer 作为服务端,支持接收连接,处理 IO 事件,执行请求由不同的线程实现;底层连接使用 ServerSocket

public class NonblockingServer {@SneakyThrowspublic static void main(String[] args) {HelloServiceImpl helloService = new HelloServiceImpl();HelloService.Processor<HelloService.Iface> helloServiceProcessor = new HelloService.Processor<>(helloService);TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);// 配置参数以及处理器TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport).selectorThreads(4).workerThreads(10).acceptQueueSizePerThread(20).processor(helloServiceProcessor);TServer server = new TThreadedSelectorServer(serverArgs);server.serve();}
}

请求处理流程

1. 启动 Server

TServer server = new TThreadedSelectorServer(serverArgs);
server.serve();
  • org.apache.thrift.server.AbstractNonblockingServer#serve

启动 Server,启动用于连接的线程 AcceptThread 和用于处理 IO 事件的多个线程 SelectorThread;然后开始监听 IO 事件,由线程池处理请求

public void serve() {// 启动if (!startThreads()) {return;}// 开始监听if (!startListening()) {return;}// 修改状态setServing(true);// 阻塞直到关闭waitForShutdown();setServing(false);// 停止监听器stopListening();
}
  • org.apache.thrift.server.TThreadedSelectorServer#startThreads

启动用于连接的线程 AcceptThread 和用于处理 IO 事件的多个线程 SelectorThread

protected boolean startThreads() {try {// 创建选择线程,并添加到集合中for (int i = 0; i < args.selectorThreads; ++i) {selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));}// 创建处理连接的负载均衡, 创建处理连接的线程acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads));// 启动选择线程for (SelectorThread thread : selectorThreads) {thread.start();}// 启动连接的线程acceptThread.start();return true;} catch (IOException e) {LOGGER.error("Failed to start threads!", e);return false;}
}
  • org.apache.thrift.transport.TNonblockingServerSocket#listen

开始监听

public void listen() throws TTransportException {// Make sure not to block on acceptif (serverSocket_ != null) {try {serverSocket_.setSoTimeout(0);} catch (SocketException sx) {LOGGER.error("Socket exception while setting socket timeout", sx);}}
}

2. 处理连接事件

  • org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#run

连接事件由 AcceptThread 线程独立处理;会循环监听 Selector事件,当有新的连接事件时,会建立连接

public void run() {try {if (eventHandler_ != null) {// 通知 Server 开始启动eventHandler_.preServe();}while (!stopped_) {// 选择处理连接select();}} finally {acceptSelector.close();TThreadedSelectorServer.this.stop();}
}
  • org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#select

会不断从 Selector获取事件,判断如果是 accept 事件,则处理,并建立连接

private void select() {try {// 等待连接事件acceptSelector.select();// 处理接收到的事件Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();if (!key.isValid()) {continue;}if (key.isAcceptable()) {// 建立连接handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);}
}
  • org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#handleAccept

会通过底层的 ServerSocketChannel 建立连接,然后将这个连接添加到 SelectorThread 的队列中,由 SelectorThread处理 IO 事件

private void handleAccept() {// 建立连接final TNonblockingTransport client = doAccept();if (client != null) {// 将连接传递给选择线程final SelectorThread targetThread = threadChooser.nextThread();// 如果策略是尽快建立连接,则添加到处理的队列中if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {try {// 如果是 FAIR_ACCEPT,则提交异步任务进行添加invoker.submit(new Runnable() {public void run() {doAddAccept(targetThread, client);}});} catch (RejectedExecutionException rx) {LOGGER.warn("ExecutorService rejected accept registration!", rx);// close immediatelyclient.close();}}}
}
  • org.apache.thrift.transport.TNonblockingServerSocket#acceptImpl

建立连接,返回新的 TNonblockingSocket

protected TNonblockingSocket acceptImpl() throws TTransportException {if (serverSocket_ == null) {throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");}try {// 接受连接SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel == null) {return null;}// 使用 Channel 构建 SocketTNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);tsocket.setTimeout(clientTimeout_);return tsocket;} catch (IOException iox) {throw new TTransportException(iox);}
}
  • org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#addAcceptedConnection

将连接添加到 SelectorThread的队列中,由 SelectorThread处理 IO 事件

public boolean addAcceptedConnection(TNonblockingTransport accepted) {try {// 放入队列中acceptedQueue.put(accepted);} catch (InterruptedException e) {LOGGER.warn("Interrupted while adding accepted connection!", e);return false;}selector.wakeup();return true;
}

3. 处理 IO 事件

IO 事件由 SelectorThread 处理

  • org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#run

轮询读取事件,如果是 IO 事件,则分别处理;如果是新的连接,则注册 Selector

public void run() {try {while (!stopped_) {// 选择读取或写入事件select();// 处理新的连接processAcceptedConnections();// 改变需要改变的状态processInterestChanges();}// 如果停止了,则清理选择for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);}} catch (Throwable t) {LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);} finally {// 关闭selector.close();TThreadedSelectorServer.this.stop();}
}
  • org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#select

处理 IO 事件,根据事件类型分别处理读取或者写入

private void select() {try {// 获取事件doSelect();Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 如果无效则跳过if (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// 如果是读取则处理读取事件handleRead(key);} else if (key.isWritable()) {// 如果是写入则处理写入handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);}
}
处理读取事件
  • org.apache.thrift.server.AbstractNonblockingServer.AbstractSelectThread#handleRead

在处理读取事件时,会读取整个帧,当完全读取时,会调用 requestInvoke 方法,通过线程池处理请求

protected void handleRead(SelectionKey key) {// 获取帧FrameBuffer buffer = (FrameBuffer) key.attachment();// 如果没有可读取的,则清理if (!buffer.read()) {cleanupSelectionKey(key);return;}// if the buffer's frame read is complete, invoke the method.// 如果 buffer 完全读取,则执行处理,如果失败则清理if (buffer.isFrameFullyRead()) {if (!requestInvoke(buffer)) {cleanupSelectionKey(key);}}
}
  • org.apache.thrift.server.TThreadedSelectorServer#requestInvoke

处理调用,会将帧封装为 Runnable 任务,提交给线程池执行

protected boolean requestInvoke(FrameBuffer frameBuffer) {// 封装为 RunnableRunnable invocation = getRunnable(frameBuffer);if (invoker != null) {try {// 执行处理invoker.execute(invocation);return true;} catch (RejectedExecutionException rx) {LOGGER.warn("ExecutorService rejected execution!", rx);return false;}} else {// Invoke on the caller's thread// 如果没有线程池,由当前线程直接处理invocation.run();return true;}
}
处理写入事件
  • org.apache.thrift.server.AbstractNonblockingServer.AbstractSelectThread#handleWrite

处理写入事件,调用 FrameBuffer 的写入方法进行处理

protected void handleWrite(SelectionKey key) {FrameBuffer buffer = (FrameBuffer) key.attachment();if (!buffer.write()) {cleanupSelectionKey(key);}
}
  • org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer#write

由 Transport 执行写入,最终由 SocketChannel 执行,将响应内容发送给客户端

public boolean write() {if (state_ == FrameBufferState.WRITING) {// 写入if (trans_.write(buffer_) < 0) {return false;}// 如果没有待写入的,则切换到读取if (buffer_.remaining() == 0) {prepareRead();}return true;}return false;
}

4. 执行请求

  • org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer#invoke

在处理读取事件时,会将 FrameBuffer 包装为 Runnable,提交给线程池执行;最终由 FrameBuffer处理
会获取 Processor,然后调用 process 方法进行处理

public void invoke() {frameTrans_.reset(buffer_.array());response_.reset();try {// 如果有事件处理器,则触发if (eventHandler_ != null) {eventHandler_.processContext(context_, inTrans_, outTrans_);}// 获取处理器,调用处理方法processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);responseReady();return;} catch (TException te) {}// This will only be reached when there is a throwable.state_ = FrameBufferState.AWAITING_CLOSE;requestSelectInterestChange();
}
  • org.apache.thrift.TBaseProcessor#process

在处理时,根据方法名获取具体的处理函数,然后调用响应的处理方法进行处理

public void process(TProtocol in, TProtocol out) throws TException {TMessage msg = in.readMessageBegin();ProcessFunction fn = processMap.get(msg.name);if (fn == null) {TProtocolUtil.skip(in, TType.STRUCT);in.readMessageEnd();TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));x.write(out);out.writeMessageEnd();out.getTransport().flush();} else {fn.process(msg.seqid, in, out, iface);}
}
  • org.apache.thrift.ProcessFunction#process

读取请求信息,反序列化为对象,然后调用 getResult 方法执行实现逻辑,获取响应;如果不是 oneway 的请求,则将相应结果写入流中,发送给客户端

public final void process(int seqid,TProtocol iprot,TProtocol oprot,I iface) throws TException {// 获取空参数实例T args = getEmptyArgsInstance();// 读取args.read(iprot);iprot.readMessageEnd();TSerializable result = null;byte msgType = TMessageType.REPLY;// 获取结果result = getResult(iface, args);// 如果不是 oneway 的,则写入响应结果if (!isOneway()) {oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));result.write(oprot);oprot.writeMessageEnd();oprot.getTransport().flush();}
}
  • io.github.helloworlde.thrift.HelloService.Processor.sayHello#getResult

由生成的代码处理,会先构建一个响应结构体,然后调用相应的方法进行处理,返回结果

public sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {sayHello_result result = new sayHello_result();result.success = iface.sayHello(args.request);return result;
}
  • io.github.helloworlde.thrift.HelloServiceImpl#sayHello

具体的逻辑处理,返回响应

public HelloResponse sayHello(HelloMessage request) throws TException {String message = request.getMessage();HelloResponse response = new HelloResponse();response.setMessage("Hello " + message);return response;
}

4. 写入响应

  • org.apache.thrift.ProcessFunction#process

在处理完请求之后,会判断是否是 oneway 请求,如果不是,则会执行写入响应

if(!isOneway()) {oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));result.write(oprot);oprot.writeMessageEnd();oprot.getTransport().flush();
}
  • org.apache.thrift.protocol.TBinaryProtocol#writeMessageBegin

写入响应时,会先写入响应头;会将版本信息,消息类型,方法的名称和请求 ID 一起写入

public void writeMessageBegin(TMessage message) throws TException {if (strictWrite_) {int version = VERSION_1 | message.type;writeI32(version);writeString(message.name);writeI32(message.seqid);} else {writeString(message.name);writeByte(message.type);writeI32(message.seqid);}
}
  • io.github.helloworlde.thrift.HelloResponse.HelloResponseStandardScheme#write

随后写入响应内容,将对象序列化为字节

public void write(org.apache.thrift.protocol.TProtocol oprot, HelloResponse struct) throws org.apache.thrift.TException {struct.validate();oprot.writeStructBegin(STRUCT_DESC);if (struct.message != null) {oprot.writeFieldBegin(MESSAGE_FIELD_DESC);oprot.writeString(struct.message);oprot.writeFieldEnd();}oprot.writeFieldStop();oprot.writeStructEnd();
}

然后会写入响应结尾符,由 SelectorThread 处理写入事件,最终将请求发送给客户端

参考文档

  • helloworlde/thrift-java-sample

这篇关于Thrfit 服务端请求处理流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1138622

相关文章

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

MySQL的cpu使用率100%的问题排查流程

《MySQL的cpu使用率100%的问题排查流程》线上mysql服务器经常性出现cpu使用率100%的告警,因此本文整理一下排查该问题的常规流程,文中通过代码示例讲解的非常详细,对大家的学习或工作有一... 目录1. 确认CPU占用来源2. 实时分析mysql活动3. 分析慢查询与执行计划4. 检查索引与表

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed