Netty-ServerBootstrap-监听

2024-05-27 11:04

本文主要是介绍Netty-ServerBootstrap-监听,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

归档

  • GitHub: Netty-ServerBootstrap-监听

使用示例

  • 参考:简单示例-服务端
  • Java 参考:
    • https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/socket/nio/MyNioServer.java

总结

  • 启动顺序:
    • 先注册事件监听
    • 再绑定地址启动服务
  • Unsafe:
    • 底层逻辑处理
  • 说明:
    • 代码注释中“发送信道激活事件”:实为直接调用钩子函数

原理

启动

  • 使用 bind() 方法开启服务

  • io.netty.bootstrap.ServerBootstrap

/*** 服务端-引导类 */
// 继承 AbstractBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {// bind() 在父类里}
  • io.netty.bootstrap.AbstractBootstrap
/*** 抽象-引导类 */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {/*** 绑定地址(启动) */public ChannelFuture bind() {... // 省略校验return doBind(localAddress);}// 启动private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister(); // sign_m_010 初始化信道并添加到监听器(事件轮循组)final Channel channel = regFuture.channel();... // 省略异常判断处理if (regFuture.isDone()) {... // 初始化没这么快,一般不会进入此,因此省略} else { // 测试时进入此final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() { // 添加监听@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) { // 有异常promise.setFailure(cause); // 设置失败} else {promise.registered(); // 标记下doBind0(regFuture, channel, localAddress, promise); // 绑定处理}}});return promise;}}// 绑定地址private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) { // 此方法在触发 channelRegister() 之前调用channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// sb 设置的是 NioServerSocketChannelchannel.bind(localAddress, promise) // 进入绑定.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
}
  • io.netty.channel.AbstractChannel
    • 参考:基础类介绍-NioServerSocketChannel
/*** 抽象-信道类 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {/*** 绑定地址 */@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise); // pipeline 是 DefaultChannelPipeline 的实例}
}
  • io.netty.channel.DefaultChannelPipeline
/*** 默认流水线 */
public class DefaultChannelPipeline implements ChannelPipeline {/*** 绑定地址 */@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {/*** 从尾节点开始往前传递处理。* 链结构:*   head <-> ServerBootstrapAcceptor <-> tail* 处理顺序:*   tail  -> ServerBootstrapAcceptor  -> head* * ServerBootstrapAcceptor */return tail.bind(localAddress, promise); // }/*** 尾节点 */final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {// bind(...) 方法在父类里实现}
}
  • io.netty.channel.AbstractChannelHandlerContext
/*** 抽象上下文(相当于流水线双向链表的节点) */
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {... // 省略校验处理/*** 查找出站处理器上下文:*     查找出来的是 HeadContext,*     即:next = head*/final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise); // 调用绑定} else {... // 省略异步调用绑定}return promise;}// 调用绑定 (this 变为 HeadContext 实例)private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) { // 测试时,进入此分支try {final ChannelHandler handler = handler(); // 返回 headfinal DefaultChannelPipeline.HeadContext headContext = pipeline.head;if (handler == headContext) { // 进入此分支headContext.bind(this, localAddress, promise); // 调用头节点的绑定方法(sign001)} ... // 省略其他分支处理} ... // 省略 catch 处理} else {bind(localAddress, promise); // 递归调用}}
}
  • io.netty.channel.DefaultChannelPipeline.HeadContext
    /*** 头节点 */final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {/*** 绑定方法(sign001) */@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {/*** unsafe 为 AbstractNioMessageChannel.NioMessageUnsafe 实例*/unsafe.bind(localAddress, promise);}}
  • io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
    // 继承 AbstractNioUnsafeprivate final class NioMessageUnsafe extends AbstractNioUnsafe {// bind(...) 在父类 AbstractUnsafe 里实现}
  • io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe
    // 继承 AbstractUnsafeprotected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {}
  • io.netty.channel.AbstractChannel.AbstractUnsafe
    protected abstract class AbstractUnsafe implements Unsafe {@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {... // 省略校验与日志打印boolean wasActive = isActive(); // 返回 falsetry {/*** doBind(...) 指定的是 NioServerSocketChannel 对象。*     即:回调 NioServerSocketChannel 的绑定方法。*/doBind(localAddress);} ... // 省略 catch 处理if (!wasActive && isActive()) { // 相当于之前没启动,后面启动了invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive(); // sign_m_020 发送信道激活事件}});}/*** 将 promise 设置为成功状态。*     这样示例中 bs.bind().sync(); // sign_demo_001*     才会返回,否则一直阻塞。*/safeSetSuccess(promise);}}
  • io.netty.channel.socket.nio.NioServerSocketChannel
/*** NIO 服务端信道 */
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel 
{// 绑定处理@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {/*** javaChannel() 返回的是 java.nio.channels.ServerSocketChannel 实例。 *     即:底层使用 Java 的信道绑定地址,*          相当于底层开启监听。*/javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
}

监听

  • io.netty.channel.socket.nio.NioServerSocketChannel
/*** 服务端-信道 */
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel 
{public NioServerSocketChannel(ServerSocketChannel channel) { // channel 是 Java NIO 的实例super(null, channel, SelectionKey.OP_ACCEPT); // 设置 OP_ACCEPT 类型事件监听config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
}
  • io.netty.channel.nio.AbstractNioMessageChannel
/*** 抽象 NIO 消息信道 */
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp); // readInterestOp 要监听的事件类型}
}
  • io.netty.channel.nio.AbstractNioChannel
    • 底层操作在此类实现
/*** 抽象 NIO 信道 */
public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch; // Java 底层信道protected final int readInterestOp; // 要监听的事件类型volatile SelectionKey selectionKey; // Java 底层监听的 key 句柄/*** 构造器 */protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false); // 设置为非阻塞} ... // 省略 catch 处理}/*** 获取 Java 底层信道 */protected SelectableChannel javaChannel() {return ch;}/*** 获取 Java 底层监听的 key 句柄 */protected SelectionKey selectionKey() {return selectionKey;}/*** 注册处理 sign_m_001 */@Overrideprotected void doRegister() throws Exception {...for (;;) {try {// 向事件轮循里面的选择器进行初步注册,key 的附件为自己selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} ... // catch}}/*** sign_m_030 开始读处理 */@Overrideprotected void doBeginRead() throws Exception {new RuntimeException("栈跟踪-设置兴趣事件").printStackTrace();... // 省略 selectionKey 校验 final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp); // 正式注册要监听的事件类型}}
}
注册调用点
  • io.netty.bootstrap.AbstractBootstrap
    /*** sign_m_010 初始化信道并添加到监听器(事件轮循组) */final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel); // 初始化:信道流水线添加 ServerBootstrapAcceptor 处理器} ... // catch// config().group() 相当于是设置给引导器的 NioEventLoopGroupChannelFuture regFuture = config().group().register(channel); // 注册处理... // 省略 regFuture 异常处理return regFuture;}
  • io.netty.channel.MultithreadEventLoopGroup
    • 参考:基础类介绍-NioEventLoopGroup
    /*** 注册(相当于将信道绑定到线程,同时又注册到选择器) */@Overridepublic ChannelFuture register(Channel channel) {/*** next(): 通过轮循算法返回下一个事件轮循者(相当于线程)*/return next().register(channel); // 调用事件轮循者进行注册}
  • io.netty.channel.SingleThreadEventLoop
    • DefaultChannelPromise 可参考:
      • 基础类介绍-DefaultChannelPromise
      • 异步工具类-DefaultChannelPromise
    /*** 注册信道 */@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}/*** 注册信道 */@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");/*** promise.channel().unsafe() 返回的是 NioMessageUnsafe 实例*/promise.channel().unsafe().register(this, promise); // 通过 unsafe 注册return promise;}
  • io.netty.channel.AbstractChannel.AbstractUnsafe
    • 参考:基础类介绍-NioMessageUnsafe
    protected abstract class AbstractUnsafe implements Unsafe {/*** 注册处理 */@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {... // 省略校验等AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else { // 不是当前线程,则异步调用try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise); // sign_m_201 注册处理}});} ... // catch}}// sign_m_201 注册处理private void register0(ChannelPromise promise) {try {... // 省略校验等doRegister(); // 注册处理 sign_m_001: 调用 AbstractNioChannel #doRegister()pipeline.invokeHandlerAddedIfNeeded(); // 回调 ChannelHandler #handlerAdded()safeSetSuccess(promise); // 设置 promise 为成功pipeline.fireChannelRegistered(); // 发射已注册事件if (isActive()) { // 调试时:还未激活,不进入此分支if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} ... // catch }}
设置兴趣事件
  • 栈跟踪
java.lang.RuntimeException: 栈跟踪-设置兴趣事件at io.netty.channel.nio.AbstractNioChannel.doBeginRead(AbstractNioChannel.java:413)at io.netty.channel.nio.AbstractNioMessageChannel.doBeginRead(AbstractNioMessageChannel.java:55)at io.netty.channel.AbstractChannel$AbstractUnsafe.beginRead(AbstractChannel.java:834)at io.netty.channel.DefaultChannelPipeline$HeadContext.read(DefaultChannelPipeline.java:1362)at io.netty.channel.AbstractChannelHandlerContext.invokeRead(AbstractChannelHandlerContext.java:835)at io.netty.channel.AbstractChannelHandlerContext.read(AbstractChannelHandlerContext.java:814)at io.netty.channel.DefaultChannelPipeline.read(DefaultChannelPipeline.java:1004)at io.netty.channel.AbstractChannel.read(AbstractChannel.java:290)at io.netty.channel.DefaultChannelPipeline$HeadContext.readIfIsAutoRead(DefaultChannelPipeline.java:1422)at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1400)at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258) // sign_c_001at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) // sign_c_0_1at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) // sign_m_021 发送信道激活事件at io.netty.channel.AbstractChannel$AbstractUnsafe$2.run(AbstractChannel.java:573) // sign_m_020 发送信道激活事件
  • 在绑定后,发送信道激活事件

    • 自动进行读处理
  • io.netty.channel.DefaultChannelPipeline

    /*** sign_m_021 发送信道激活事件 */@Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head); // sign_c_0_1 从头节点开始return this;}// 头节点final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {/*** sign_c_002 信道激活事件处理 */@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}private void readIfIsAutoRead() {/*** io.netty.channel.ChannelConfig #isAutoRead():*   默认只在 io.netty.channel.DefaultChannelConfig 实现,*   其实现相当于直接返回 true,*   所以会进入 read() 方法。*/if (channel.config().isAutoRead()) {channel.read(); // sign_c_003}}// sign_c_007@Overridepublic void read(ChannelHandlerContext ctx) {unsafe.beginRead(); // sign_c_008}}// sign_c_004@Overridepublic final ChannelPipeline read() {tail.read(); // sign_c_005return this;}
  • io.netty.channel.AbstractChannelHandlerContext
    // sign_c_0_1static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {// next => headnext.invokeChannelActive(); // sign_c_001} ... // else }// sign_c_001private void invokeChannelActive() {if (invokeHandler()) {try {final ChannelHandler handler = handler();final DefaultChannelPipeline.HeadContext headContext = pipeline.head;if (handler == headContext) {headContext.channelActive(this); // sign_c_002 从头节点开始} ... // else} ... // catch} ... // else}// sign_c_005@Overridepublic ChannelHandlerContext read() {final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeRead(); // sign_c_006} ... // elsereturn this;}// sign_c_006private void invokeRead() {if (invokeHandler()) {try {final ChannelHandler handler = handler();final DefaultChannelPipeline.HeadContext headContext = pipeline.head;if (handler == headContext) {headContext.read(this); // sign_c_007} ... // else} ... // catch } ... // else}
  • io.netty.channel.AbstractChannel
    // sign_c_003@Overridepublic Channel read() {pipeline.read(); // sign_c_004return this;}protected abstract class AbstractUnsafe implements Unsafe {// sign_c_008@Overridepublic final void beginRead() {...try {doBeginRead(); // sign_m_030 开始读} ... // catch }}
监听连接
  • 参考:选择器-监听-原理

这篇关于Netty-ServerBootstrap-监听的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007290

相关文章

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

vue如何监听对象或者数组某个属性的变化详解

《vue如何监听对象或者数组某个属性的变化详解》这篇文章主要给大家介绍了关于vue如何监听对象或者数组某个属性的变化,在Vue.js中可以通过watch监听属性变化并动态修改其他属性的值,watch通... 目录前言用watch监听深度监听使用计算属性watch和计算属性的区别在vue 3中使用watchE

linux定时监听ssh服务是否启动-------麒麟操作系统永久关闭swap

linux监听ssh服务是否启动 1、监听脚本2、定时任务3、麒麟操作系统,永久关闭swap 1、监听脚本 #在/usr/local/bin目录下新建脚本文件 cd /usr/local/bintouch check_sshd.sh#给可执行权限chmod +x /usr/local/bin/check_sshd.sh 脚本内容如下: #!/bin/bashs

【Netty】netty中都是用了哪些设计模式

对于工程师来说,掌握并理解运用设计模式,是非常重要的,但是除了学习基本的概念之外,需要结合优秀的中间件、框架源码学习其中的优秀软件设计,这样才能以不变应万变。 单例模式 单例模式解决的对象的唯一性,一般来说就是构造方法私有化、然后提供一个静态的方法获取实例。 在netty中,select用于处理CONTINUE、SELECT、BUSY_WAIT 三种策略,通过DefaultSelectStra

Java语言的Netty框架+云快充协议1.5+充电桩系统+新能源汽车充电桩系统源码

介绍 云快充协议+云快充1.5协议+云快充1.6+云快充协议开源代码+云快充底层协议+云快充桩直连+桩直连协议+充电桩协议+云快充源码 软件架构 1、提供云快充底层桩直连协议,版本为云快充1.5,对于没有对接过充电桩系统的开发者尤为合适; 2、包含:启动充电、结束充电、充电中实时数据获取、报文解析、Netty通讯框架、包解析工具、调试器模拟器软件等; 源码合作 提供完整云快充协议源代码

WebAPI(二)、DOM事件监听、事件对象event、事件流、事件委托、页面加载与滚动事件、页面尺寸事件

文章目录 一、 DOM事件1. 事件监听2. 事件类型(1)、鼠标事件(2)、焦点事件(3)、键盘事件(4)、文本事件 3. 事件对象(1)、获取事件对象(2)、事件对象常用属性 4. 环境对象 this5. 回调函数 二、 DOM事件进阶1. 事件流(1)、 捕获阶段(2)、 冒泡阶段(3)、 阻止冒泡(4) 、阻止元素默认行为(5) 、解绑事件 2. 事件委托3. 其他事件(1)、页面加

Netty源码解析9-ChannelHandler实例之MessageToByteEncoder

MessageToByteEncoder框架可见用户使用POJO对象编码为字节数据存储到ByteBuf。用户只需定义自己的编码方法encode()即可。 首先看类签名: public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter 可知该类只处理出站事件,切确的说是write事件

Netty源码解析8-ChannelHandler实例之CodecHandler

编解码处理器作为Netty编程时必备的ChannelHandler,每个应用都必不可少。Netty作为网络应用框架,在网络上的各个应用之间不断进行数据交互。而网络数据交换的基本单位是字节,所以需要将本应用的POJO对象编码为字节数据发送到其他应用,或者将收到的其他应用的字节数据解码为本应用可使用的POJO对象。这一部分,又和JAVA中的序列化和反序列化对应。幸运的是,有很多其他的开源工具(prot

Netty源码解析7-ChannelHandler实例之TimeoutHandler

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData TimeoutHandler 在开发TCP服务时,一个常见的需求便是使用心跳保活客户端。而Netty自带的三个超时处理器IdleStateHandler,ReadTimeoutHandler和WriteTimeoutHandler可完美满足此需求。其中IdleSt