7.4 服务远程暴露 - 创建Exporter与启动netty服务端

2024-05-07 09:18

本文主要是介绍7.4 服务远程暴露 - 创建Exporter与启动netty服务端,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

7.4 服务远程暴露 - 创建Exporter与启动netty服务端

为了安全:服务启动的ip全部使用10.10.10.10

远程服务的暴露总体步骤:

  • 将ref封装为invoker
  • 将invoker转换为exporter
  • 启动netty
  • 注册服务到zookeeper
  • 订阅
  • 返回新的exporter实例

服务远程暴露的代码:

复制代码
 1             //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
 2             if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
 3                 if (logger.isInfoEnabled()) {
 4                     logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
 5                 }
 6                 if (registryURLs != null && registryURLs.size() > 0
 7                         && url.getParameter("register", true)) {
 8                     for (URL registryURL : registryURLs) {
 9                         url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
10                         URL monitorUrl = loadMonitor(registryURL);
11                         if (monitorUrl != null) {
12                             url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
13                         }
14                         if (logger.isInfoEnabled()) {
15                             logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
16                         }
17                         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
18                         Exporter<?> exporter = protocol.export(invoker);
19                         exporters.add(exporter);
20                     }
21                 } else {
22                     Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
23                     Exporter<?> exporter = protocol.export(invoker);
24                     exporters.add(exporter);
25                 }
26             }
复制代码

首先将实现类ref封装为Invoker,之后将invoker转换为exporter,最后将exporter放入缓存List<Exporter> exporters中。

 

一 将实现类ref封装为Invoker

1 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

1  为registryURL拼接export=providerUrl参数

一开始的registryURL:

registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&pid=887&registry=zookeeper&timestamp=1507096022072

registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())这句代码为registryURL添加了参数并编码:(这里给出没有编码的样子)

1 export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=887&side=provider&timestamp=1507096024334

2  ProxyFactory$Adaptive.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

复制代码
 1     public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
 2         if (arg2 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg2;
 5         String extName = url.getParameter("proxy", "javassist");//结果是javassist
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
 8         com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
 9         return extension.getInvoker(arg0, arg1, arg2);
10     }
复制代码

这里,本来是调用JavassistProxyFactory的getInvoker方法,但是JavassistProxyFactory被StubProxyFactoryWrapper给aop了。

3  StubProxyFactoryWrapper.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

1     public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
2         return proxyFactory.getInvoker(proxy, type, url);
3     }

4  JavassistProxyFactory.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

复制代码
 1     public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
 2         // TODO Wrapper类不能正确处理带$的类名
 3         final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
 4         return new AbstractProxyInvoker<T>(proxy, type, url) {
 5             @Override
 6             protected Object doInvoke(T proxy, String methodName,
 7                                       Class<?>[] parameterTypes,
 8                                       Object[] arguments) throws Throwable {
 9                 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
10             }
11         };
12     }
复制代码

首先是创建Wrapper类:Wrapper.getWrapper(Class<DemoServiceImpl>)。该类记录了DemoServiceImpl的属性名称,方法名称等信息。关键代码如下:(完整代码见:7.2 服务本地暴露)

复制代码
 1 import com.alibaba.dubbo.common.bytecode.Wrapper;
 2 import java.util.HashMap;
 3 
 4 public class Wrapper1 extends Wrapper {
 5 
 6     public static String[] pns;//property name array
 7     public static java.util.Map pts = new HashMap();//<property key, property value>
 8     public static String[] mns;//method names
 9     public static String[] dmns;//
10     public static Class[] mts0;
55     /**
56      * @param o  实现类
57      * @param n  方法名称
58      * @param p  参数类型
59      * @param v  参数名称
60      * @return
61      * @throws java.lang.reflect.InvocationTargetException
62      */
63     public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
64         com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
65         try {
66             w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
67         } catch (Throwable e) {
68             throw new IllegalArgumentException(e);
69         }
70         try {
71             if ("sayHello".equals(n) && p.length == 1) {
72                 return ($w) w.sayHello((java.lang.String) v[0]);
73             }
74         } catch (Throwable e) {
75             throw new java.lang.reflect.InvocationTargetException(e);
76         }
77         throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
78     }
79 }
复制代码

创建完DemoServiceImpl的Wrapper类之后(实际上该实例在本地暴露的时候已经存入缓存了,这里只是从缓存中拿出来而已),创建一个AbstractProxyInvoker实例。

复制代码
 1     private final T proxy;
 2     private final Class<T> type;
 3     private final URL url;
 4 
 5     public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
 6         if (proxy == null) {
 7             throw new IllegalArgumentException("proxy == null");
 8         }
 9         if (type == null) {
10             throw new IllegalArgumentException("interface == null");
11         }
12         if (!type.isInstance(proxy)) {
13             throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
14         }
15         this.proxy = proxy;
16         this.type = type;
17         this.url = url;
18     }
复制代码

最后创建完成的AbstractProxyInvoker实例属性如下:

  • proxy:DemoServiceImpl实例
  • type:Class<com.alibaba.dubbo.demo.DemoService>
  • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830

这样我们就将ref实现类转换成了Invoker,之后在调用该invoker.invoke(Invocation invocation)的时候,会调用invoker.doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments)的时候,就会调用相应的实现类proxy的wrapper类的invokeMethod(proxy, methodName, parameterTypes, arguments),该方法又会调用真实的实现类methodName方法。这里可以先给出AbstractProxyInvoker.invoke(Invocation invocation)源码:

复制代码
1     public Result invoke(Invocation invocation) throws RpcException {
2         try {
3             return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
4         } catch (InvocationTargetException e) {
5             return new RpcResult(e.getTargetException());
6         } catch (Throwable e) {
7             throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
8         }
9     }
复制代码

这里的proxy就是上边赋好值的proxy:DemoServiceImpl实例。而方法信息会封装在Invocation对象中,该对象在服务引用时介绍。

 

二  将Invoker转换为Exporter

1 Exporter<?> exporter = protocol.export(invoker)

1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker AbstractProxyInvoker实例)

复制代码
 1     public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
 4         if (arg0.getUrl() == null)
 5             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
 6         com.alibaba.dubbo.common.URL url = arg0.getUrl();
 7         String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry
 8         if(extName == null)
 9             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
10         com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
11         return extension.export(arg0);
12     }
复制代码

这里,由于aop的原因,首先调用了ProtocolListenerWrapper的export(Invoker<T> invoker),如下:

复制代码
1     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2         if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3             return protocol.export(invoker);
4         }
5         return new ListenerExporterWrapper<T>(protocol.export(invoker),
6                                               Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
7     }
复制代码

由于协议是“registry”,所以不做任何处理,继续调用ProtocolFilterWrapper的export(Invoker<T> invoker),如下:

1     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2         if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3             return protocol.export(invoker);
4         }
5         return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
6     }

同理,由于协议是“registry”,所以不做任何处理,继续调用RegistryProtocol.export(final Invoker<T> originInvoker),如下:

复制代码
 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4         //registry provider
 5         final Registry registry = getRegistry(originInvoker);
 6         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 7         registry.register(registedProviderUrl);
 8         // 订阅override数据
 9         // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
10         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
11         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
12         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
13         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
14         //保证每次export都返回一个新的exporter实例
15         return new Exporter<T>() {
16             public Invoker<T> getInvoker() {
17                 return exporter.getInvoker();
18             }
19 
20             public void unexport() {
21                 try {
22                     exporter.unexport();
23                 } catch (Throwable t) {
24                     logger.warn(t.getMessage(), t);
25                 }
26                 try {
27                     registry.unregister(registedProviderUrl);
28                 } catch (Throwable t) {
29                     logger.warn(t.getMessage(), t);
30                 }
31                 try {
32                     overrideListeners.remove(overrideSubscribeUrl);
33                     registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
34                 } catch (Throwable t) {
35                     logger.warn(t.getMessage(), t);
36                 }
37             }
38         };
39     }
复制代码

该方法完成了远程暴露的全部流程。

  • 将invoker转换为exporter
  • 启动netty
  • 注册服务到zookeeper
  • 订阅
  • 返回新的exporter实例

2  将invoker转换为exporter并启动netty服务

1 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

doLocalExport(final Invoker<T> originInvoker)

复制代码
 1     /**
 2      * 1 从invoker的URL中的Map<String, String> parameters中获取key为export的地址providerUrl,该地址将是服务注册在zk上的节点
 3      * 2 从 Map<String, ExporterChangeableWrapper<?>> bounds 缓存中获取key为上述providerUrl的exporter,如果有,直接返回,如果没有,创建并返回
 4      * @return
 5      */
 6     @SuppressWarnings("unchecked")
 7     private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
 8         String key = getCacheKey(originInvoker);//根据originInvoker获取providerUrl
 9         ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
10         if (exporter == null) {
11             synchronized (bounds) {
12                 exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
13                 if (exporter == null) {
14                     final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));//存储originInvoker和providerUrl
15                     exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
16                     bounds.put(key, exporter);
17                 }
18             }
19         }
20         return exporter;
21     }
复制代码

2.1 从originInvoker中获取providerUrl

该方法直接首先调用getCacheKey(final Invoker<?> originInvoker)中获取providerUrl,这里的originInvoker就是上述创建出来的AbstractProxyInvoker实例,注意他的url是registry协议的,该url的export参数的value就是我们要获取的providerUrl。获取providerUrl的源码如下:

复制代码
 1     private String getCacheKey(final Invoker<?> originInvoker) {
 2         URL providerUrl = getProviderUrl(originInvoker);
 3         String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
 4         return key;
 5     }
 6 
 7     private URL getProviderUrl(final Invoker<?> origininvoker) {
 8         String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
 9         if (export == null || export.length() == 0) {
10             throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
11         }
12 
13         URL providerUrl = URL.valueOf(export);
14         return providerUrl;
15     }
复制代码

之后一系列的操作,就是获取该providerUrl对应的exporter,之后放入缓存Map<String, ExporterChangeableWrapper<?>> bounds中,所以一个providerUrl只会对应一个exporter。

2.2  创建InvokerDelegete

1 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

InvokerDelegete是RegistryProtocol的一个静态内部类,该类是一个originInvoker的委托类,该类存储了originInvoker,其父类InvokerWrapper还会存储providerUrl,InvokerWrapper会调用originInvoker的invoke方法,也会销毁invoker。可以管理invoker的生命周期。

复制代码
 1     public static class InvokerDelegete<T> extends InvokerWrapper<T> {
 2         private final Invoker<T> invoker;
 3 
 4         /**
 5          * @param invoker
 6          * @param url     invoker.getUrl返回此值
 7          */
 8         public InvokerDelegete(Invoker<T> invoker, URL url) {
 9             super(invoker, url);
10             this.invoker = invoker;
11         }
12 
13         public Invoker<T> getInvoker() {
14             if (invoker instanceof InvokerDelegete) {
15                 return ((InvokerDelegete<T>) invoker).getInvoker();
16             } else {
17                 return invoker;
18             }
19         }
20     }
复制代码

InvokerWrapper的核心代码:

复制代码
 1 public class InvokerWrapper<T> implements Invoker<T> {
 2     private final Invoker<T> invoker;//originInvoker
 3     private final URL url;//providerUrl
 4 
 5     public InvokerWrapper(Invoker<T> invoker, URL url) {
 6         this.invoker = invoker;
 7         this.url = url;
 8     }
 9 
10     public boolean isAvailable() {
11         return invoker.isAvailable();
12     }
13 
14     public Result invoke(Invocation invocation) throws RpcException {
15         return invoker.invoke(invocation);
16     }
17 
18     public void destroy() {
19         invoker.destroy();
20     }
21 }
复制代码

这样一个InvokerDelegete对象就创建好了,属性如下:

  • invoker:originInvoker(AbstractProxyInvoker对象)
  • InvokerWrapper.invoker:originInvoker(AbstractProxyInvoker对象)
  • url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1035&side=provider&timestamp=1507101286063)

2.3  使用DubboProtocol将InvokerDelegete转换为Exporter

1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)

2.3.1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker InvokerDelegete对象)

复制代码
 1     public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
 4         if (arg0.getUrl() == null)
 5             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
 6         com.alibaba.dubbo.common.URL url = arg0.getUrl();
 7         String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//dubbo
 8         if(extName == null)
 9             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
10         com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
11         return extension.export(arg0);
12     }
复制代码

该代码再贴最后一遍了。之后调用ProtocolListenerWrapper的ProtocolListenerWrapper.export(Invoker<T> InvokerDelegete),之后调用ProtocolFilterWrapper.export(Invoker<T> InvokerDelegete):首先对InvokerDelegete对象进行8个filter的递归包装,之后使用DubboProtocol对包装后的InvokerDelegete对象进行export。

层层包装的源码:

复制代码
 1     /**
 2      * 1 根据key从url中获取相应的filter的values,再根据这个values和group去获取类上带有@Active注解的filter集合
 3      * 2 之后将这些filter对传入的invoker进行递归包装层invoker(就是一个链表)
 4      */
 5     private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
 6         Invoker<T> last = invoker;
 7         List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
 8         if (filters.size() > 0) {
 9             for (int i = filters.size() - 1; i >= 0; i--) {
10                 final Filter filter = filters.get(i);
11                 final Invoker<T> next = last;
12                 last = new Invoker<T>() {
13 
14                     public Class<T> getInterface() {
15                         return invoker.getInterface();
16                     }
17 
18                     public URL getUrl() {
19                         return invoker.getUrl();
20                     }
21 
22                     public boolean isAvailable() {
23                         return invoker.isAvailable();
24                     }
25 
26                     public Result invoke(Invocation invocation) throws RpcException {
27                         return filter.invoke(next, invocation);
28                     }
29 
30                     public void destroy() {
31                         invoker.destroy();
32                     }
33 
34                     @Override
35                     public String toString() {
36                         return invoker.toString();
37                     }
38                 };
39             }
40         }
41         return last;
42     }
复制代码

上述方法中最重要的就是Invoker的Result invoke(Invocation invocation),在该方法中,是使用了filter.invoke(next, invocation),而这里的next又可能是另一个filter。这里我们打开一个filter来看一下源码:

复制代码
1 @Activate(group = Constants.PROVIDER, order = -110000)
2 public class EchoFilter implements Filter {
3     public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
4         if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
5             return new RpcResult(inv.getArguments()[0]);
6         return invoker.invoke(inv);
7     }
8 }
复制代码

可以看到,该filter会调用传入的next的invoke方法。

这里给出被递归包装后的对象:(命名为InvokerDelegete的filter对象)

复制代码
1 EchoFilter
2 -->ClassLoaderFilter
3    -->GenericFilter
4       -->ContextFilter
5          -->TraceFilter
6             -->TimeoutFilter
7                -->MonitorFilter
8                   -->ExceptionFilter
9                      -->InvokerDelegete对象
复制代码

2.3.2  DubboProtocol.export(Invoker<T> InvokerDelegete的filter对象)

复制代码
    /*** 1 从invoker的url中获取将要暴露的远程服务的key:com.alibaba.dubbo.demo.DemoService:20880(实际上是:serviceGroup/serviceName:serviceVersion:port)* 注意:本地暴露的key就是:com.alibaba.dubbo.demo.DemoService* 2 打开ExchangeServer*/public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// export service.String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);exporterMap.put(key, exporter);//export an stub service for dispaching eventBoolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}openServer(url);return exporter;}
复制代码

首先从“InvokerDelegete的filter对象”中的url获取key,这段代码很简单,就是获取serviceGroup/serviceName:serviceVersion:port这样形式的一个key,这里最后获取到的是com.alibaba.dubbo.demo.DemoService:20880。

之后创建DubboExporter。

2.3.2.1 DubboExporter<T>(InvokerDelegete的filter对象, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)

复制代码
 1 public class DubboExporter<T> extends AbstractExporter<T> {
 2     //serviceGroup/serviceName:serviceVersion:port, 例如:com.alibaba.dubbo.demo.DemoService:20880
 3     private final String key;//
 4     //{ "com.alibaba.dubbo.demo.DemoService:20880" -> 当前的DubboExporter实例 }
 5     private final Map<String, Exporter<?>> exporterMap;
 6 
 7     public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
 8         super(invoker);
 9         this.key = key;
10         this.exporterMap = exporterMap;
11     }
12 
13     @Override
14     public void unexport() {
15         super.unexport();
16         exporterMap.remove(key);
17     }
18 }
复制代码

注意这里的exporterMap是引用传递。

父类:

复制代码
 1 public abstract class AbstractExporter<T> implements Exporter<T> {
 2     protected final Logger logger = LoggerFactory.getLogger(getClass());
 3     private final Invoker<T> invoker;
 4     private volatile boolean unexported = false;
 5 
 6     public AbstractExporter(Invoker<T> invoker) {
 7         if (invoker == null)
 8             throw new IllegalStateException("service invoker == null");
 9         if (invoker.getInterface() == null)
10             throw new IllegalStateException("service type == null");
11         if (invoker.getUrl() == null)
12             throw new IllegalStateException("service url == null");
13         this.invoker = invoker;
14     }
15 
16     public Invoker<T> getInvoker() {
17         return invoker;
18     }
19 
20     public void unexport() {
21         if (unexported) {
22             return;
23         }
24         unexported = true;
25         getInvoker().destroy();
26     }
27 
28     public String toString() {
29         return getInvoker().toString();
30     }
31 }
复制代码

这里,我们把一个“InvokerDelegete的filter对象”赋给了AbstractExporter的Invoker引用,也就是说从exporter中可以获取到invoker。最后在DubboProtocol.export(Invoker<T> invoker)中执行:exporterMap.put(key, exporter); 这样就将{ "com.alibaba.dubbo.demo.DemoService:20880" -> 当前的DubboExporter实例 }存储起来了。

来看一下现在的DubboExporter实例:

  • key:com.alibaba.dubbo.demo.DemoService:20880
  • invoker:“InvokerDelegete的filter对象”
  • exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 当前的DubboExporter实例 }

2.3.2.2 开启ExchangeServer

复制代码
 1     /**
 2      * 从缓存Map<String, ExchangeServer> serverMap中根据"host:port"获取ExchangeServer,如果没有,创建ExchangeServer,之后放入缓存。
 3      * @param url
 4      */
 5     private void openServer(URL url) {
 6         // find server.
 7         String key = url.getAddress();//10.10.10.10:20880
 8         //client 也可以暴露一个只有server可以调用的服务。
 9         boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
10         if (isServer) {
11             ExchangeServer server = serverMap.get(key);
12             if (server == null) {
13                 serverMap.put(key, createServer(url));
14             } else {
15                 //server支持reset,配合override功能使用
16                 server.reset(url);
17             }
18         }
19     }
复制代码

首先从provderUrl中获取host:port作为key,之后从缓存serverMap中获取ExchangeServer,如果没有,创建ExchangeServer,最后以如下方式放入缓存:

Map<String, ExchangeServer> serverMap:{ "10.10.10.10:20880"<->ExchangeServer实例 }。

创建ExchangeServer:createServer(URL providerUrl)

复制代码
 1     private ExchangeServer createServer(URL url) {
 2         //默认开启server关闭时发送readonly事件
 3         url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
 4         //默认开启heartbeat
 5         url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 6         String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
 7 
 8         if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
 9             throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10 
11         url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
12         ExchangeServer server;
13         try {
14             server = Exchangers.bind(url, requestHandler);
15         } catch (RemotingException e) {
16             throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17         }
18         str = url.getParameter(Constants.CLIENT_KEY);
19         if (str != null && str.length() > 0) {
20             Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21             if (!supportedTypes.contains(str)) {
22                 throw new RpcException("Unsupported client type: " + str);
23             }
24         }
25         return server;
26     }
复制代码

首先是在原本providerUrl上添加参数:channel.readonly.sent=true&heartbeat=60000&codec=dubbo(其中的heartbeat参数会在HeaderExchangeServer启动心跳计时器时使用)

之后使用Exchangers.bind("添加参数后的providerUrl", requestHandler)创建ExchangeServer。首先来看一下DubboProtocol#requestHandler。这个类极其重要,后续经过层层包装后,会成为最终netty的服务端逻辑处理器。

复制代码
 1     private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
 2         public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
 3             if (message instanceof Invocation) {
 4                 Invocation inv = (Invocation) message;
 5                 Invoker<?> invoker = getInvoker(channel, inv);
 6                 //如果是callback 需要处理高版本调用低版本的问题
 7                 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
 8                     String methodsStr = invoker.getUrl().getParameters().get("methods");
 9                     boolean hasMethod = false;
10                     if (methodsStr == null || methodsStr.indexOf(",") == -1) {
11                         hasMethod = inv.getMethodName().equals(methodsStr);
12                     } else {
13                         String[] methods = methodsStr.split(",");
14                         for (String method : methods) {
15                             if (inv.getMethodName().equals(method)) {
16                                 hasMethod = true;
17                                 break;
18                             }
19                         }
20                     }
21                     if (!hasMethod) {
22                         logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
23                         return null;
24                     }
25                 }
26                 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
27                 return invoker.invoke(inv);
28             }
29             throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
30         }
31 
32         @Override
33         public void received(Channel channel, Object message) throws RemotingException {
34             if (message instanceof Invocation) {
35                 reply((ExchangeChannel) channel, message);
36             } else {
37                 super.received(channel, message);
38             }
39         }
40 
41         @Override
42         public void connected(Channel channel) throws RemotingException {
43             invoke(channel, Constants.ON_CONNECT_KEY);
44         }
45 
46         @Override
47         public void disconnected(Channel channel) throws RemotingException {
48             if (logger.isInfoEnabled()) {
49                 logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
50             }
51             invoke(channel, Constants.ON_DISCONNECT_KEY);
52         }
53 
54         private void invoke(Channel channel, String methodKey) {
55             Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
56             if (invocation != null) {
57                 try {
58                     received(channel, invocation);
59                 } catch (Throwable t) {
60                     logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
61                 }
62             }
63         }
64 
65         private Invocation createInvocation(Channel channel, URL url, String methodKey) {
66             String method = url.getParameter(methodKey);
67             if (method == null || method.length() == 0) {
68                 return null;
69             }
70             RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
71             invocation.setAttachment(Constants.PATH_KEY, url.getPath());
72             invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
73             invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
74             invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
75             if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
76                 invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
77             }
78             return invocation;
79         }
80     };
复制代码

从上可以看出在该handler中,定义了与客户端连接成功/断开连接/接受到客户端消息/相应消息,以及创造Invocation的方法。其中的getInvoker(Channel channel, Invocation inv)方法简码如下:

1         String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
2         DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
3         return exporter.getInvoker();

这不就是我们刚刚放置到exporterMap中的DubboExporter,而其中的invoker不就是我们的“filter的invokerdelegete对象”。

 

使用Exchangers.bind(providerUrl, ExchangeHandlerAdapter对象)创建ExchangeServer

复制代码
 1     public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
 2         if (url == null) {
 3             throw new IllegalArgumentException("url == null");
 4         }
 5         if (handler == null) {
 6             throw new IllegalArgumentException("handler == null");
 7         }
 8         url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
 9         return getExchanger(url).bind(url, handler);
10     }
11 
12     public static Exchanger getExchanger(URL url) {
13         String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);//header
14         return getExchanger(type);
15     }
16 
17     public static Exchanger getExchanger(String type) {
18         return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
19     }
复制代码

getExchanger(URL url)返回一个HeaderExchanger实例。所以ExchangeServer的创建交由HeaderExchanger来实现。

 

HeaderExchanger.bind(providerUrl, ExchangeHandlerAdapter对象) 

复制代码
1     /**
2      * 1 对handler进行三次包装:首先将ExchangeHandlerAdapter赋给HeaderExchangeHandler中的ExchangeHandler handler属性;然后将创建出来的HeaderExchangeHandler赋给DecodeHandler的父类AbstractChannelHandlerDelegate的ChannelHandler handler属性
3      */
4     public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
5         return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
6     }
复制代码

说明:

  • 这里首先对传入的ExchangeHandlerAdapter进行了两次包装,最终得到DecodeHandler实例;
  • 之后,使用Transporters.bind(providerUrl, DecodeHandler对象)创建了一个NettyServer;
  • 最后使用HeaderExchangeServer包装了上边的NettyServer,并启动了心跳计数器。
    • HeaderExchangeServer实例也是最终返回的ExchangeServer实例,将最终被存储在Map<String, ExchangeServer> serverMap:{ "10.10.10.10:20880"<->HeaderExchangeServer实例 }

包装ExchangeHandlerAdapter,获取DecodeHandler实例。代码比较简单,不列出来了。

最终获取到的DecodeHandler实例的层级关系:

1 DecodeHandler实例
2 -->HeaderExchangeHandler实例
3    -->ExchangeHandlerAdapter实例

 

使用Transporters.bind(providerUrl, DecodeHandler对象)创建了一个NettyServer

Transporters.bind(providerUrl, DecodeHandler对象)

复制代码
 1     public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
 2         if (url == null) {
 3             throw new IllegalArgumentException("url == null");
 4         }
 5         if (handlers == null || handlers.length == 0) {
 6             throw new IllegalArgumentException("handlers == null");
 7         }
 8         ChannelHandler handler;
 9         if (handlers.length == 1) {
10             handler = handlers[0];
11         } else {
12             handler = new ChannelHandlerDispatcher(handlers);
13         }
14         return getTransporter().bind(url, handler);
15     }
16 
17     public static Transporter getTransporter() {
18         return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19     }
复制代码

 

Transporter$Adaptive.bind(providerUrl, DecodeHandler对象)

复制代码
 1     public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg0;
 5         String extName = url.getParameter("server", url.getParameter("transporter", "netty"));//netty
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
 8         com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
 9         return extension.bind(arg0, arg1);
10     }
复制代码

 

最后NettyServer的创建由NettyTransporter来创建。

NettyTransporter.bind(providerUrl, DecodeHandler对象)

复制代码
 1 public class NettyTransporter implements Transporter {
 2     public static final String NAME = "netty";
 3 
 4     public Server bind(URL url, ChannelHandler listener) throws RemotingException {
 5         return new NettyServer(url, listener);
 6     }
 7 
 8     public Client connect(URL url, ChannelHandler listener) throws RemotingException {
 9         return new NettyClient(url, listener);
10     }
11 }
复制代码

 

new NettyServer(providerUrl, DecodeHandler对象)

1     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
2         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
3     }

这里首先为providerUrl添加参数:threadname=DubboServerHandler-10.10.10.10:20880(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));

之后,使用ChannelHandlers.wrap(DecodeHandler对象, providerUrl)对DecodeHandler对象进行了三层包装,最终得到MultiMessageHandler实例;

最后调用父类的构造器初始化NettyServer的各个属性,最后启动netty。

先看一下

ChannelHandlers.wrap(DecodeHandler对象, providerUrl)

复制代码
 1     /**
 2      * 这里又是层层包裹:
 3      * MultiMessageHandler
 4      * --HeartbeatHandler
 5      *   --AllChannelHandler
 6      *     --DecodeHandler
 7      *       --HeaderExchangeHandler
 8      *         --ExchangeHandlerAdapter
 9      * @param handler
10      * @param url
11      * @return
12      */
13     protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
14         return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
15                 .getAdaptiveExtension().dispatch(handler, url)));
16     }
复制代码

ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()获取到一个Dispatcher$Adaptive适配类。

Dispatcher$Adaptive.dispatch(DecodeHandler对象, providerUrl)

复制代码
 1     public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
 2         if (arg1 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg1;
 5         String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));//all
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
 8         com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
 9         return extension.dispatch(arg0, arg1);
10     }
复制代码

这里获取到AllDispatcher,Dispatcher决定了dubbo的线程模型,指定了哪些做什么,哪些线程做什么。讲到dubbo通信的时候再写。

AllDispatcher.dispatch(DecodeHandler对象, providerUrl)

1     public ChannelHandler dispatch(ChannelHandler handler, URL url) {
2         return new AllChannelHandler(handler, url);
3     }

new AllChannelHandler(DecodeHandler对象, providerUrl)

1     public AllChannelHandler(ChannelHandler handler, URL url) {
2         super(handler, url);
3     }

来看其父类的WrappedChannelHandler的构造器:

WrappedChannelHandler(DecodeHandler对象, providerUrl)

复制代码
 1     protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
 2     protected final ExecutorService executor;
 3     protected final ChannelHandler handler;
 4     protected final URL url;
 5 
 6     public WrappedChannelHandler(ChannelHandler handler, URL url) {
 7         this.handler = handler;
 8         this.url = url;
 9         executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
10 
11         String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
12         if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
13             componentKey = Constants.CONSUMER_SIDE;
14         }
15         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
16         dataStore.put(componentKey, Integer.toString(url.getPort()), executor);//{"java.util.concurrent.ExecutorService":{"20880":executor}}
17     }
复制代码

首先创建了一个共享线程池:SHARED_EXECUTOR;

之后为handler/url/executor赋值,其中executor是一个200个线程数的fixed线程池(队列为0,即同步队列);

复制代码
 1     public Executor getExecutor(URL url) {
 2         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);//默认为dubbo,但是我们这里是DubboServerHandler-10.10.10.10:20880(就是之前设置到url上的threadname)
 3         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);//200
 4         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);//0
 5         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
 6                 queues == 0 ? new SynchronousQueue<Runnable>() :
 7                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
 8                                 : new LinkedBlockingQueue<Runnable>(queues)),
 9                 new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
10     }
复制代码

之后获取了一个数据存储器:SimpleDataStore;

最后将{"java.util.concurrent.ExecutorService":{"20880": executor}}数据存储在SimpleDataStore的ConcurrentMap<String, ConcurrentMap<String, Object>> data数据结构中。也就是说:每一个端口,有一个线程池。

注意:为什么SimpleDataSource可以做缓存来使用?

复制代码
 1     public T getExtension(String name) {
 2         if (name == null || name.length() == 0)
 3             throw new IllegalArgumentException("Extension name == null");
 4         if ("true".equals(name)) {
 5             return getDefaultExtension();
 6         }
 7         Holder<Object> holder = cachedInstances.get(name);
 8         if (holder == null) {
 9             cachedInstances.putIfAbsent(name, new Holder<Object>());
10             holder = cachedInstances.get(name);
11         }
12         Object instance = holder.get();
13         if (instance == null) {
14             synchronized (holder) {
15                 instance = holder.get();
16                 if (instance == null) {
17                     instance = createExtension(name);
18                     holder.set(instance);
19                 }
20             }
21         }
22         return (T) instance;
23     }
复制代码

其实,就是这样SimpleDataStore实例会存储在cachedInstances缓存中,下一次不会再创建,而是直接获取该缓存。

 

这样之后,一个AllChannelHandler实例就完成了,该实例属性如下

  • WrappedChannelHandler.url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&threadname=DubboServerHandler-10.10.10.10:20880&timestamp=1507116859919
  • WrappedChannelHandler.handler:DecodeHandler对象
  • WrappedChannelHandler.executor:FixedThreadPool实例

当然还有一个类变量WrappedChannelHandler.SHARED_EXECUTOR=CachedThreadPool实例。

 

之后AllChannelHandler实例会被HeartbeatHandler进行包裹,之后HeartbeatHandler实例又会被MultiMessageHandler所包裹,最后得到的MultiMessageHandler实例的层级结构如下:

复制代码
1 MultiMessageHandler
2 -->handler: HeartbeatHandler
3    -->handler: AllChannelHandler
4          -->url: providerUrl
5          -->executor: FixedExecutor
6          -->handler: DecodeHandler
7             -->handler: HeaderExchangeHandler
8                -->handler: ExchangeHandlerAdapter
复制代码

 

MultiMessageHandler实例创建出来之后,NettyServer就开始调用其各个父类进行属性的初始化了。首先来看一下NettyServer的父类层级图:

AbstractServer:

复制代码
 1     protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
 2     ExecutorService executor;
 3     private InetSocketAddress localAddress;
 4     private InetSocketAddress bindAddress;
 5     private int accepts;
 6     private int idleTimeout = 600;
 7 
 8     public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
 9         super(url, handler);
10         localAddress = getUrl().toInetSocketAddress();
11         String host = url.getParameter(Constants.ANYHOST_KEY, false)
12                 || NetUtils.isInvalidLocalHost(getUrl().getHost())
13                 ? NetUtils.ANYHOST : getUrl().getHost();
14         bindAddress = new InetSocketAddress(host, getUrl().getPort());
15         this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
16         this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
17         try {
18             doOpen();
19             if (logger.isInfoEnabled()) {
20                 logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
21             }
22         } catch (Throwable t) {
23             throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
24                     + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
25         }
26         //fixme replace this with better method
27         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
28         executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
29     }
复制代码

首先调用父类初始化属性,之后启动服务。

AbstractEndpoint:

复制代码
 1     private Codec2 codec;
 2     private int timeout;
 3     private int connectTimeout;
 4 
 5     public AbstractEndpoint(URL url, ChannelHandler handler) {
 6         super(url, handler);
 7         this.codec = getChannelCodec(url);
 8         this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//1000
 9         this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);//3000
10     }
复制代码

AbstractPeer:

复制代码
 1     private final ChannelHandler handler;
 2     private volatile URL url;
 3 
 4     public AbstractPeer(URL url, ChannelHandler handler) {
 5         if (url == null) {
 6             throw new IllegalArgumentException("url == null");
 7         }
 8         if (handler == null) {
 9             throw new IllegalArgumentException("handler == null");
10         }
11         this.url = url;
12         this.handler = handler;
13     }
复制代码

来看一下最后初始化好的NettyServer实例:

  • url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&timestamp=1507116859919)
  • handler:MultiMessageHandler实例
  • codec:DubboCountCodec实例
  • timeout:1000
  • connectTimeout:3000
  • idleTime:600*1000
  • localAddress:10.10.10.10:20880
  • bindAddress:0.0.0.0:20880
  • accepts:0
  • executor:null(此时的executor还没实例话,要等netty服务起来之后才会从缓存中获取之前存储在SimpleDataStore缓存中的那个200个线程数的FixedThreadPool实例)

 

之后,就要启动netty服务了。

复制代码
 1     /**
 2      * 启动netty服务,监听客户端连接
 3      */
 4     @Override
 5     protected void doOpen() throws Throwable {
 6         NettyHelper.setNettyLoggerFactory();
 7         ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 8         ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 9         ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
10         bootstrap = new ServerBootstrap(channelFactory);
11 
12         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
13         channels = nettyHandler.getChannels();
14         // https://issues.jboss.org/browse/NETTY-365
15         // https://issues.jboss.org/browse/NETTY-379
16         // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
17         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
18             public ChannelPipeline getPipeline() {
19                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
20                 ChannelPipeline pipeline = Channels.pipeline();
21                 /*int idleTimeout = getIdleTimeout();
22                 if (idleTimeout > 10000) {
23                     pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
24                 }*/
25                 pipeline.addLast("decoder", adapter.getDecoder());
26                 pipeline.addLast("encoder", adapter.getEncoder());
27                 pipeline.addLast("handler", nettyHandler);
28                 return pipeline;
29             }
30         });
31         // bind
32         channel = bootstrap.bind(getBindAddress());
33     }
复制代码

说明:

  • boss线程数默认只有一个;
  • worker线程数:Runtime.getRuntime().availableProcessors() + 1,为计算机核数+1;
  • 服务端逻辑处理器为NettyHandler:
  • 编码器为:InternalEncoder实例,内部使用NettyServer的DubboCountCodec实例来编码
  • 解码器为:InternalDecoder实例,内部使用NettyServer的DubboCountCodec实例来解码

 NettyHandler:

复制代码
 1 @Sharable
 2 public class NettyHandler extends SimpleChannelHandler {
 3     private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
 4     private final URL url;
 5     private final ChannelHandler handler;
 6 
 7     public NettyHandler(URL url, ChannelHandler handler) {
 8         if (url == null) {
 9             throw new IllegalArgumentException("url == null");
10         }
11         if (handler == null) {
12             throw new IllegalArgumentException("handler == null");
13         }
14         this.url = url;
15         this.handler = handler;
16     }
17 
18     public Map<String, Channel> getChannels() {
19         return channels;
20     }
21 
22     @Override
23     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
24         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
25         try {
26             if (channel != null) {
27                 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
28             }
29             handler.connected(channel);
30         } finally {
31             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
32         }
33     }
34 
35     @Override
36     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
37         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
38         try {
39             channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()));
40             handler.disconnected(channel);
41         } finally {
42             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
43         }
44     }
45 
46     @Override
47     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
48         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
49         try {
50             handler.received(channel, e.getMessage());
51         } finally {
52             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
53         }
54     }
55 
56     @Override
57     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
58         super.writeRequested(ctx, e);
59         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
60         try {
61             handler.sent(channel, e.getMessage());
62         } finally {
63             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
64         }
65     }
66 
67     @Override
68     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
69         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
70         try {
71             handler.caught(channel, e.getCause());
72         } finally {
73             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
74         }
75     }
76 }
复制代码

说明:

属性

  • handler:就是当前的NettyServer实例
  • url:providerUrl
  • channels:存放连接到来的channel

监听连接完成/连接断开/接收到消息/发送完消息/异常捕捉事件,之后使用NettyServer实例进行相应的处理,NettyServer又会调用MultiMessageHandler实例(该handler属性位于NettyServer的父类AbstractPeer中)进行处理。

 

在来看编码器和解码器:

NettyCodecAdapter(DubboCountCodec实例, providerUrl, 当前的NettyServer实例)

复制代码
 1 final class NettyCodecAdapter {
 2     private final ChannelHandler encoder = new InternalEncoder();
 3     private final ChannelHandler decoder = new InternalDecoder();
 4     private final Codec2 codec;
 5     private final URL url;
 6     private final int bufferSize;
 7     private final com.alibaba.dubbo.remoting.ChannelHandler handler;
 8 
 9     public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
10         this.codec = codec;
11         this.url = url;
12         this.handler = handler;
13         int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);//8*1024
14         this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;//8*1024
15     }
16 
17     public ChannelHandler getEncoder() {
18         return encoder;
19     }
20 
21     public ChannelHandler getDecoder() {
22         return decoder;
23     }
24 
25     @Sharable
26     private class InternalEncoder extends OneToOneEncoder {
27         @Override
28         protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
29             ...
30             codec.encode(channel, buffer, msg);
31             ...
32         }
33     }
34 
35     private class InternalDecoder extends SimpleChannelUpstreamHandler {
36         @Override
37         public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
38            ...
39             msg = codec.decode(channel, message);
40            ...
41         }
42         ...
43     }
44 }
复制代码

只列出核心代码:可以看到,InternalEncoder实例和InternalDecoder实例内部还是使用NettyServer的DubboCountCodec实例来编解码的。dubbo的编解码做的非常好,后续会写。

 

到此为止,NettyServer就创建成功了。 之后,终于执行到了:

new HeaderExchangeServer(Server NettyServer)

复制代码
 1     private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat",true));
 2     private final Server server;
 3     // 心跳定时器
 4     private ScheduledFuture<?> heatbeatTimer;
 5     // 心跳超时,毫秒。缺省0,不会执行心跳。
 6     private int heartbeat;
 7     private int heartbeatTimeout;
 8     private AtomicBoolean closed = new AtomicBoolean(false);
 9 
10     public HeaderExchangeServer(Server server) {
11         if (server == null) {
12             throw new IllegalArgumentException("server == null");
13         }
14         this.server = server;
15         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//60000 在createServer(URL providerUrl)中拼接了heartbeat参数
16         this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//3*60000
17         if (heartbeatTimeout < heartbeat * 2) {
18             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
19         }
20         startHeatbeatTimer();
21     }
复制代码

说明:

  • 属性
    • scheduled:是一个有1个名字为dubbo-remoting-server-heartbeat的后台线程的定时线程池;
    • server:之前创建出来的NettyServer实例;
    • heartbeatTimer:心跳计时器
    • heartbeat:心跳时间,该参数会在HeaderExchangeServer的构造器中进行赋值,60000
    • heartbeatTimeout:心跳超时时间,超过该时间,会进行channel重连,180000
  • 启动心跳计时器

startHeatbeatTimer()

复制代码
 1     private void startHeatbeatTimer() {
 2         stopHeartbeatTimer();
 3         if (heartbeat > 0) {
 4             heatbeatTimer = scheduled.scheduleWithFixedDelay(
 5                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
 6                         public Collection<Channel> getChannels() {
 7                             return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
 8                         }
 9                     }, heartbeat, heartbeatTimeout),
10                     heartbeat,
11                     heartbeat,
12                     TimeUnit.MILLISECONDS);
13         }
14     }
15 
16     private void stopHeartbeatTimer() {
17         try {
18             ScheduledFuture<?> timer = heatbeatTimer;
19             if (timer != null && !timer.isCancelled()) {
20                 timer.cancel(true);
21             }
22         } catch (Throwable t) {
23             logger.warn(t.getMessage(), t);
24         } finally {
25             heatbeatTimer = null;
26         }
27     }
复制代码

首先停掉之前的计时器,之后在线程创建开始heartbeat毫秒(60s)后执行第一次HeartBeatTask任务,之后每隔heartbeat毫秒(60s)执行一次HeartBeatTask任务。来看一下HeartBeatTask:

HeartBeatTask

复制代码
 1 final class HeartBeatTask implements Runnable {
 2     private ChannelProvider channelProvider;
 3     private int heartbeat;//60s
 4     private int heartbeatTimeout;//180s
 5 
 6     HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
 7         this.channelProvider = provider;
 8         this.heartbeat = heartbeat;
 9         this.heartbeatTimeout = heartbeatTimeout;
10     }
11 
12     public void run() {
13         try {
14             long now = System.currentTimeMillis();
15             for (Channel channel : channelProvider.getChannels()) {
16                 if (channel.isClosed()) {
17                     continue;
18                 }
19                 try {
20                     Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);//"READ_TIMESTAMP"
21                     Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);//"WRITE_TIMESTAMP"
22                     //如果最后一次读和写在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,需要程序发送心跳
23                     if ((lastRead != null && now - lastRead > heartbeat)
24                             || (lastWrite != null && now - lastWrite > heartbeat)) {
25                         Request req = new Request();
26                         req.setVersion("2.0.0");
27                         req.setTwoWay(true);
28                         req.setEvent(Request.HEARTBEAT_EVENT);
29                         channel.send(req);
30                         if (logger.isDebugEnabled()) {
31                             logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
32                                     + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
33                         }
34                     }
35                     //如果最后一次读的时间距离现在已经超过heartbeatTimeout了,我们认为channel已经断了(因为在这个过程中,发送了三次心跳都没反应),此时channel进行重连
36                     if (lastRead != null && now - lastRead > heartbeatTimeout) {
37                         logger.warn("Close channel " + channel
38                                 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
39                         if (channel instanceof Client) {
40                             try {
41                                 ((Client) channel).reconnect();
42                             } catch (Exception e) {
43                                 //do nothing
44                             }
45                         } else {
46                             channel.close();
47                         }
48                     }
49                 } catch (Throwable t) {
50                     logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
51                 }
52             }
53         } catch (Throwable t) {
54             logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
55         }
56     }
57 
58     interface ChannelProvider {
59         Collection<Channel> getChannels();
60     }
61 }
复制代码

说明:

  • 属性
    • channelProvider在startHeatbeatTimer()中创建,并且获取了当前的HeaderExchangeServer的所有channels
    • heartbeat:60s
    • heartbeatTimeout:180s
  • run()
    • 如果最后一次读和写的时间距离现在在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,发送心跳
    • 如果最后一次读的时间距离现在已经超过heartbeatTimeout了,认为channel已经断了(因为在这个过程中,发送了三次心跳都没反应),此时channel进行重连

到现在一个完整的ExchangeServer就OK了。之后我们将创建出来的ExchangeServer实例存放在DubboProtocol的Map<String, ExchangeServer> serverMap属性中:

{ "10.10.10.10:20880" : ExchangeServer实例 }

最后,DubboProtocol.export(Invoker<T> invoker)将之前创建的DubboExporter实例返回。

 

2.4  创建RegistryProtocol.ExporterChangeableWrapper来封装Exporter和originInvoker

1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
复制代码
 1     private class ExporterChangeableWrapper<T> implements Exporter<T> {
 2         private final Invoker<T> originInvoker;
 3         private Exporter<T> exporter;
 4 
 5         public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
 6             this.exporter = exporter;
 7             this.originInvoker = originInvoker;
 8         }
 9 
10         public Invoker<T> getOriginInvoker() {
11             return originInvoker;
12         }
13 
14         public Invoker<T> getInvoker() {
15             return exporter.getInvoker();
16         }
17 
18         public void setExporter(Exporter<T> exporter) {
19             this.exporter = exporter;
20         }
21 
22         public void unexport() {
23             String key = getCacheKey(this.originInvoker);
24             bounds.remove(key);
25             exporter.unexport();
26         }
27     }
复制代码

ExporterChangeableWrapper类是RegistryProtocol的私有内部类

最后,将<providerUrl, ExporterChangeableWrapper实例>放入RegistryProtocol的属性Map<String, ExporterChangeableWrapper<?>> bounds中。

  • key:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=744&side=provider&timestamp=1507176748026
  • value:RegistryProtocol$ExporterChangeableWrapper实例
    • originInvoker:即AbstractProxyInvoker实例属性如下:
      • proxy:DemoServiceImpl实例
      • type:Class<com.alibaba.dubbo.demo.DemoService>
      • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830
    • DubboExporter实例
      • key:com.alibaba.dubbo.demo.DemoService:20880
      • invoker:"InvokerDelegete的filter对象"
      • exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 当前的DubboExporter实例 }

 

到此为止,RegistryProtocol.export(final Invoker<T> originInvoker)的第一行代码就完成了。

复制代码
 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4         //registry provider
 5         final Registry registry = getRegistry(originInvoker);
 6         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 7         registry.register(registedProviderUrl);
 8         // 订阅override数据
 9         // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
10         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
11         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
12         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
13         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
14         //保证每次export都返回一个新的exporter实例
15         return new Exporter<T>() {
16             public Invoker<T> getInvoker() {
17                 return exporter.getInvoker();
18             }
19 
20             public void unexport() {
21                 try {
22                     exporter.unexport();
23                 } catch (Throwable t) {
24                     logger.warn(t.getMessage(), t);
25                 }
26                 try {
27                     registry.unregister(registedProviderUrl);
28                 } catch (Throwable t) {
29                     logger.warn(t.getMessage(), t);
30                 }
31                 try {
32                     overrideListeners.remove(overrideSubscribeUrl);
33                     registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
34                 } catch (Throwable t) {
35                     logger.warn(t.getMessage(), t);
36                 }
37             }
38         };
39     }
复制代码

这篇关于7.4 服务远程暴露 - 创建Exporter与启动netty服务端的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966942

相关文章

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

用命令行的方式启动.netcore webapi

用命令行的方式启动.netcore web项目 进入指定的项目文件夹,比如我发布后的代码放在下面文件夹中 在此地址栏中输入“cmd”,打开命令提示符,进入到发布代码目录 命令行启动.netcore项目的命令为:  dotnet 项目启动文件.dll --urls="http://*:对外端口" --ip="本机ip" --port=项目内部端口 例: dotnet Imagine.M

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n