本文主要是介绍Dubbo源码分析----处理请求,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Dubbo默认是使用Netty进行通信的,那么Netty会配置一个Handler,来处理一些事件,所以这个Handler是核心,主要找一下Dubbo初始化Netty的时候设置的Handler
回顾一下网络通信相关,在DubboProtocol中,会调用createServer方法返回一个Server对象,这是一个切入点
private ExchangeServer createServer(URL url) {//....ExchangeServer server;try {server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}//....return server;}
bind方法内部对requestHandler进行了层层的装饰,但是这里也不是我们要找的地方,在NettyServer的doOpen才是我们要找的(具体流程可以看下网络通信的那篇文章)
protected void doOpen() throws Throwable {//....bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();/*int idleTimeout = getIdleTimeout();if (idleTimeout > 10000) {pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));}*/pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// ....}
可以看到,这个NettyHandler是处理请求的入口,看下NettyHandler的结构图,其中最下面的handler是requestHandler
当Netty接收到请求的时候,会调用NettyServer的messageReceived方法,请求依次经过图中红色框框的Handler,不同的Handler有不同的功能,只需要看下几个关键的Handler
AllChannelHandler
这个Handler在讲Dubbo的Dispatcher的ThreadPool的时候讲过,就是将请求交给线程池处理
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}
那么看下ChannelEventRunnable的run方法
public void run() {switch (state) {case CONNECTED:handler.connected(channel);break;case DISCONNECTED:handler.disconnected(channel);break;case SENT:handler.sent(channel,message);break;case RECEIVED:handler.received(channel, message);break;case CAUGHT:handler.caught(channel, exception);break;default:logger.warn("unknown state: " + state + ", message is " + message);}}
在线程中将请求交由下一个handler处理,从上面的图中可以知道AllChannelHandler下两个handler是DecodeHandler和HeaderExchangeHandler,这两个Handler做了一些特殊处理,然后将请求转发给requestHandler
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {public Object reply(ExchangeChannel channel, Object message) throws RemotingException {if (message instanceof Invocation) {Invocation inv = (Invocation) message;Invoker<?> invoker = getInvoker(channel, inv);// 通过参数信息获取本地Invoker//....RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());return invoker.invoke(inv);//调用}throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}//....};
getInvoker方法如下
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{boolean isCallBackServiceInvoke = false;boolean isStubServiceInvoke = false;int port = channel.getLocalAddress().getPort();String path = inv.getAttachments().get(Constants.PATH_KEY);//xxx.xxx.xxx.xxxService//如果是客户端的回调服务.isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));if (isStubServiceInvoke){port = channel.getRemoteAddress().getPort();}//callbackisCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;if(isCallBackServiceInvoke){path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());}String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));// port+serviceName+version+group区分的key// 暴露服务的时候会将Exporter放到map中DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);if (exporter == null)throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);return exporter.getInvoker();}
getInvoker也是比较简单的,这时候得到的Invoker相当于是本地服务的一个代理对象,invoke之后就会调用到真实的方法
这篇关于Dubbo源码分析----处理请求的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!