来聊聊ChannelHandler

2023-11-23 16:48
文章标签 聊聊 channelhandler

本文主要是介绍来聊聊ChannelHandler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

上一篇文章来聊聊Netty使用不当导致的并发波动问题我们了解使用Netty时跨线程使用不当导致性能问题,这篇我们就基于几个经典的生产案例了解一下ChannelHandler的基本使用和工作机制。

ChannelHandler的生命周期

ChannelHandler生命周期基本示例

要了解netty中逻辑处理器ChannelHandler的生命周期,我们不妨写一个完整的逻辑处理器,并重写其中每一个生命周期的方法,以笔者为例,封装了一个LifeCyCleHandler 并继承ChannelInboundHandlerAdapter ,并输出每个生命周期的调用。

/*** Channel生命周期调试代码*/
public class LifeCyCleHandler extends ChannelInboundHandlerAdapter {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("调用handlerAdded,逻辑处理器被添加");super.handlerAdded(ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("调用channelRegistered,逻辑处理器被添加到NIO现成上");super.channelRegistered(ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("调用channelActive,连接准备就绪");super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("执行channelRead,此时有数据可读");super.channelRead(ctx, msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("调用channelReadComplete,数据阅读完成");super.channelReadComplete(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel被关闭,调用channelInactive");super.channelInactive(ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("NIO线程解绑该channel,调用channelUnregistered");super.channelUnregistered(ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("执行handlerRemoved,逻辑处理器被移除");super.handlerRemoved(ctx);}
}

完成将其挂到NettyServer 上。

public class NettyServer {public static void main(String[] args) {// 启动一个netty服务端需要指定 线程模型 IO模型 业务处理逻辑// 引导类负责引导服务端启动工作ServerBootstrap serverBootstrap = new ServerBootstrap();// 以下两个对象可以看做是两个线程组// 负责监听端口,接受新的连接NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);// 负责处理每一个连接读写的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);// 配置线程组并指定NIO模型serverBootstrap.group(bossGroup, workerGroup)//设置IO模型,这里为NioServerSocketChannel,建议Linux服务器使用 EpollServerSocketChannel.channel(NioServerSocketChannel.class)// 定义后续每个连接的数据读写,对于业务处理逻辑.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new LifeCyCleHandler());}});bind(serverBootstrap, 9191);}/*** 以端口号递增的形式尝试绑定端口号*/private static void bind(ServerBootstrap serverBootstrap, int port) {// bind 方法是异步的,为其添加监听器serverBootstrap.bind(port);}
}

随后我们启动服务端和客户端,可以看到下面这样的输出结果:

调用handlerAdded,逻辑处理器被添加
调用channelRegistered,逻辑处理器被添加到NIO线程上
调用channelActive,连接准备就绪

客户端和服务端建立连接并发送数据到服务端:

执行channelRead,此时有数据可读
调用channelReadComplete,数据阅读完成

然后我们再将客户端关闭,又会出现下面这段输出结果:

channel被关闭,调用channelInactive
NIO线程解绑该channel,调用channelUnregistered
执行handlerRemoved,逻辑处理器被移除

于是,我们可以得出这样的结论:

  1. 检测到新连接时,当前channel会绑定一个新的业务处理器,绑定完成后回调执行handlerAdded,表示成功将当前业务处理器绑定到线程上。
  2. 所有业务处理器都绑定到逻辑链上,Netty线程池中的一个线程和当前这个channel绑定,执行channelRegistered。
  3. 上述准备工作都完成了,准备激活链接,成功后回调channelActive。
  4. 客户端向服务端发送数据,执行channelRead。
  5. 服务端将这些数据读取完成,执行channelReadComplete。
  6. 假如我们将客户端关闭,即对于TCP层面来说,它已经不是establish状态了,方法回调channelInactive。
  7. NIO线程解绑该channel,调用channelUnregistered,并将线程归还到NioEventLoop。
  8. channelHandler从该channel上移除,执行handlerRemoved。

小结一下,对于ChannelHandler生命周期的执行链,大抵如下图所示:

在这里插入图片描述

ChannelHandler源码解析

要了解整个生命周期的执行步骤,我们不妨在下面这段代码,以及各个生命周期的方法上打一个断点:

 // 指定线程组bootstrap.group(workerGroup)//指定NIO模型.channel(NioSocketChannel.class)// IO处理逻辑.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ClientHandler());}});

在启动服务端,并且客户端与服务端建立连接后,DefaultChannelPipeline回调用callHandlerAdded0,调用handlerAdded,此时就会执行到我们编写的handlerAdded。

 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.handler().handlerAdded(ctx);ctx.setAddComplete();} catch (Throwable t) {//略}}

代码AbstractChannel执行下面这段方法,即表明NioEventLoop轮询查到有新的连接进来于是拿出一个线程处理该任务:

 eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});

不如register0,我们可以看到这样一段代码,它会从逻辑链中找到所有channelRegistered的回调,并执行。

 pipeline.fireChannelRegistered();

随后NioEventLoop线程会检查这个channel是否建立连接以及是否是第一次注册,如果为true则从当前channel的逻辑链中找到业务处理器并调用channelActive。

				//如果连接已经建立,则调用fireChannelActive回调channelActiveif (isActive()) {//是否是第一次注册if (firstRegistration) {pipeline.fireChannelActive();} }

后续的处理大致与上述一致,即NioEventLoop调用 processSelectedKeys();轮询是否有新事件,如果得到事件则判断事件类型,然后调用相应回调方法,这里就不多做赘述了。

ChannelHandler各个生命周期使用技巧

由上述我们可知不同生命周期的使用技巧:

  1. 对于连接的申请和释放,我们建议使用handlerAdded和handlerRemoved。
  2. 对于单机连接数统计或者黑白名单的过滤,我们建议使用channelActive()方法与channelInActive()。
  3. 读取客户端的数据直接调用channelRead。
  4. 读取完成后,若需要发送数据我们建议在channelReadComplete发,当然简单做法可以在channelRead然后调用writeAndFlush直接将数据刷新到底层,这种做法在性能要求不高的情况下可以使用,假如对性能要求较高的情况下,我们建议先使用ctx.channel().write(),然后到channelReadComplete这个周期使用flush进行批量刷新到底层。

实践-使用ChannelHandler的热插拔实现客户端身份校验

简单的登录校验示例

在客户端向服务端发送消息前,我们要求客户端先登录,只有校验成功后才能发送消息,于是我们编写了这样一个工具,当校验用户发送的登录包成功后,我们回通过该工具将连接中设置一个登录成功的标识,后续校验就可以通过判断该channel是否有该标识判断是否登录成功。

public class LoginUtil {//设置为已登录public static void markAsLogin(Channel channel) {channel.attr(Attributes.LOGIN).set(true);}//设置为未登录public static void resetLogin(Channel channel) {channel.attr(Attributes.LOGIN).set(false);}//设置为已登录public static boolean hasLogin(Channel channel) {Boolean res = channel.attr(Attributes.LOGIN).get();return ObjectUtil.isNotEmpty(res) && !res.equals(false);}
}

基于这个工具,我们编辑一个逻辑处理器:

public class AuthHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (LoginUtil.hasLogin(ctx.channel())) {//用户未登录,连接关闭ctx.channel().close();return;}System.out.println("用户已登录,继续后续逻辑链处理");super.channelRead(ctx, msg);}
}

然后将其挂到服务端逻辑链上:

 // 配置线程组并指定NIO模型serverBootstrap.group(bossGroup, workerGroup)//设置IO模型,这里为NioServerSocketChannel,建议Linux服务器使用 EpollServerSocketChannel.channel(NioServerSocketChannel.class)// 定义后续每个连接的数据读写,对于业务处理逻辑.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()//处理登录请求                       .addLast(new LoginRequestHandler())//判断用户是否登录,若登录则往下走.addLast(new AuthHandler()).addLast(new MessageRequestHandler()).addLast(new PacketEncoder());}});

于是我们将服务端和客户端启动后,一旦登录成功,服务端的执行过程如下,可以看到每一次发送消息都需要进行一次校验,假如我们用户登录一次会不断发送消息,那么这种校验过程是非常冗余的,有没有办法做到登录一次,仅校验一次,后续就无需校验了呢?

端口[9000]绑定成功!
用户已登录,继续后续逻辑链处理
服务端收到:123
用户已登录,继续后续逻辑链处理
服务端收到:123
用户已登录,继续后续逻辑链处理
服务端收到:123

热插拔校验示例

于是我们将AuthHandler 代码修改,可以看到当第一次校验成功后,直接当这条逻辑处理器删除。

public class AuthHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (LoginUtil.hasLogin(ctx.channel())) {//用户未登录,连接关闭ctx.channel().close();return;}System.out.println("用户已登录,继续后续逻辑链处理");//如果用户登录成功,则将当前处理器移除,避免后续登录期间的冗余校验ctx.channel().pipeline().remove(this);super.channelRead(ctx, msg);}
}

可以看到客户端登录时进行一次校验,后续的校验就不再进行AuthHandler 的校验了。

端口[9000]绑定成功!
用户已登录,继续后续逻辑链处理
服务端收到:123
服务端收到:123
服务端收到:123

Netty并发安全问题

多连接使用不同的业务处理器

第一个场景我们会有多个客户端发起连接,每个客户端连接都有独立的业务处理,eventLoop收到这些任务之后向服务端发起连接。
紧接着服务端收到这一个个连接,就会将全局共享变量+1。

在这里插入图片描述

先来看看服务端代码,一套标准的模板,监听9999端口,使用ThreadSecurityServerHandler作为业务处理器。

public class ThreadSecurityServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();//业务处理器p.addLast(new ThreadSecurityServerHandler());}});//监听9999端口ChannelFuture f = b.bind(9999).sync();f.channel().closeFuture().sync();f.channel().closeFuture().addListener((future) -> {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();});}

而业务处理器代码也很简单,收到客户端的请求后就把sum+1。

public class ThreadSecurityServerHandler extends ChannelInboundHandlerAdapter {private static int sum1 = 0;//每次收到客户端的请求就+1public void channelRead(ChannelHandlerContext ctx, Object msg) {sum1 = sum1 + 1;System.out.println("Server receive client message :" + sum1);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

完成服务端编码后,我们继续完成客户端的编码,代码如下我们完成模板创建之后,连续进行100次异步连接,这就意味服务端就对sum进行100次自增。

public class NoThreadSecurityClient {public void connect() throws Exception {EventLoopGroup group = new NioEventLoopGroup(8);Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {//ch.pipeline().addLast(sharableClientHandler);ch.pipeline().addLast(new NoThreadSecurityClientHandler());}});//异步进行100次连接工作ChannelFuture f = null;for (int i = 0; i < 100; i++) {f = b.connect("127.0.0.1", 9999).sync();}f.channel().closeFuture().sync();f.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {group.shutdownGracefully();}});}public static void main(String[] args) throws Exception {new NoThreadSecurityClient().connect();}
}

业务处理逻辑如下,即连接建立时随便写一点消息

public class NoThreadSecurityClientHandler extends ChannelInboundHandlerAdapter {static final int MSG_SIZE = 256;/*** 一建立连接就发送256字节的数据* @param ctx*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf firstMessage = Unpooled.buffer(MSG_SIZE);for (int i = 0; i < firstMessage.capacity(); i++) {firstMessage.writeByte((byte) i);}ctx.writeAndFlush(firstMessage);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

完成后我们将项目启动查看服务端输出结果,可以看到输出结果不到100,很明显当前是存在线程安全问题。

在这里插入图片描述

原因也很简单,我们的服务端代码中声明childGroup时用到了下面的定义,这就意味着服务端处理客户端连接是采用多线程的,而多线程操作同一个static变量是存在线程安全问题的。

 EventLoopGroup workerGroup = new NioEventLoopGroup();

所以改造方式有两种,要么将childGroup线程数改为1。

 EventLoopGroup workerGroup = new NioEventLoopGroup(1);

要么将sum改为原子类。

public class ThreadSecurityServerHandler extends ChannelInboundHandlerAdapter {private static AtomicInteger sum = new AtomicInteger(0);//每次收到客户端的请求就+1public void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("Server receive client message :" + sum.addAndGet(1));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

经过上述两种方式改造后线程安全问题都能解决。

多连接使用共性处理器

在看一个例子,多个客户端公用一个业务处理向服务端建立连接,当连接超过1000个的时候就不发消息了。

在这里插入图片描述

了解需求后,我们就可以开始编码了,服务端模板代码和上述一样就不多赘述了,唯一改变的就是业务处理加了个发送消息的逻辑。

public class ThreadSecurityServerHandler extends ChannelInboundHandlerAdapter {private static AtomicInteger sum = new AtomicInteger(0);
//    private static int sum1 = 0;//每次收到客户端的请求就+1public void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("Server receive client message :" + sum.addAndGet(1));ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

这里我们看看就介绍一下客户端代码,可以看到客户端启动类的整体逻辑没变,只不过业务处理器变为SharableClientHandler且对所有连接共享。

public class NoThreadSecurityClient {public void connect() throws Exception {EventLoopGroup group = new NioEventLoopGroup(8);SharableClientHandler sharableClientHandler = new SharableClientHandler();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(sharableClientHandler);}});//异步进行100次连接工作ChannelFuture f = null;for (int i = 0; i < 100; i++) {f = b.connect("127.0.0.1", 9999).sync();}f.channel().closeFuture().sync();f.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {group.shutdownGracefully();}});}public static void main(String[] args) throws Exception {new NoThreadSecurityClient().connect();}
}

然后我们再来看看业务处理器的代码,很简单,加了Sharable注解起到共享作用,每次收到服务端的请求之后自增一下,到10000次后就不发消息了。

@ChannelHandler.Sharable
public class SharableClientHandler extends ChannelInboundHandlerAdapter {int counter1 = 0;
//	AtomicInteger counter = new AtomicInteger(0);static final int MSG_SIZE = 256;@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf firstMessage = Unpooled.buffer(MSG_SIZE);for (int i = 0; i < firstMessage.capacity(); i++) {firstMessage.writeByte((byte) i);}ctx.writeAndFlush(firstMessage);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf req = (ByteBuf) msg;System.out.println("client counter=" + counter1);if (counter1++ <= 10000)ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

最终线程安全问题出现了,多个连接共享一个业务处理器就和上图一样存在安全问题。

在这里插入图片描述

解决方式也很简单,和服务端代码一样,使用原子类即可解决,改造后的代码如下所示,读者可以自行查看输出结果,确实没有重复的数字。

@ChannelHandler.Sharable
public class SharableClientHandler extends ChannelInboundHandlerAdapter {
//    int counter1 = 0;AtomicInteger counter = new AtomicInteger(0);static final int MSG_SIZE = 256;@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf firstMessage = Unpooled.buffer(MSG_SIZE);for (int i = 0; i < firstMessage.capacity(); i++) {firstMessage.writeByte((byte) i);}ctx.writeAndFlush(firstMessage);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf req = (ByteBuf) msg;System.out.println("client counter=" + counter.addAndGet(1));if (counter.get() <= 10000)ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

小结

上述我们介绍了两个Netty关于服务端和客户端的并发安全问题,这里我们给出两个建议:

  1. 非必要不要共享一个业务处理器。
  2. 如果业务处理器非要共享,请对共享变量做好并发控制。

Netty并发失效问题

错误代码示例

了解了ChannelHanlder线程安全问题之后,我们再来聊一聊ChannelHanlder的并发问题,再抛出问题之前我们不妨看一个例子,我们现在有一个客户端,通过channel建立连接之后,会连续发送100个消息,服务端收到后简单处理一下,即释放空间。

在这里插入图片描述

大概了解需求之后,我们不妨看看客户端的代码的启动类,可以看到这个代码就是标准的模板代码,通过异步的方式和服务端的9999端口建立连接。

/*** 单客户端发送多个消息*/
public class ConcurrentPerformanceClient {static final int MSG_SIZE = 256;public void connect() throws Exception {//一个线程EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {//业务处理器,会连发100次消息ch.pipeline().addLast(new ConcurrentPerformanceClientHandler());}});ChannelFuture f = b.connect("127.0.0.1", 9999).sync();f.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {group.shutdownGracefully();}});}public static void main(String[] args) throws Exception {new ConcurrentPerformanceClient().connect();}
}

再看看业务处理器,每次和服务端建立连接后就会调用channelActive方法,每隔1s发送100个消息。

public class ConcurrentPerformanceClientHandler extends ChannelInboundHandlerAdapter {static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();@Overridepublic void channelActive(ChannelHandlerContext ctx) {//定时任务 每秒发送100个消息scheduledExecutorService.scheduleAtFixedRate(() -> {//发送100次消息for (int i = 0; i < 100; i++) {//组装消息并发送ByteBuf firstMessage = Unpooled.buffer(ConcurrentPerformanceClient.MSG_SIZE);for (int k = 0; k < firstMessage.capacity(); k++) {firstMessage.writeByte((byte) k);}ctx.writeAndFlush(firstMessage);}}, 0, 1000, TimeUnit.MILLISECONDS);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

以上便是客户端的代码,我们再来看看服务端的代码,为了更好的监控单一客户端连接channel的情况我们把主从reactor线程数都设置为1,然后创建一个包含100个线程的DefaultEventExecutorGroup处理业务handler。

public class ConcurrentPerformanceServer {static final EventExecutorGroup executor = new DefaultEventExecutorGroup(100);public static void main(String[] args) throws Exception {//主从reactorEventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(1);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.config().setAllocator(UnpooledByteBufAllocator.DEFAULT);ChannelPipeline p = ch.pipeline();//服务端业务处理器p.addLast(executor, new ConcurrentPerformanceServerHandler());}}).childOption(ChannelOption.SO_RCVBUF, 8 * 1024).childOption(ChannelOption.SO_SNDBUF, 8 * 1024);ChannelFuture f = b.bind(9999).sync();f.channel().closeFuture().addListener((ChannelFutureListener) future -> {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();});}
}

然后再来看看业务处理器,逻辑很简单大概做的就是统计qps,每次建立连接时就输出原子类counter 的值,而该值的变化则是在channelRead时发生自增。

为什么这么做呢?原因很简单,netty每次收到一个消息就会调用channelRead方法,所以我们用该方法作为计数统计qps最合适不过。有了计数之后,自然是需要监控,而netty服务端每次和客户端建立连接时都会调用channelActive方法,所以用该方法输出上一次处理的消息的counter最合适不过。由此我们得出了下面这样一段写法。

public class ConcurrentPerformanceServerHandler extends ChannelInboundHandlerAdapter {static AtomicInteger counter = new AtomicInteger(0);static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();/*** 每次建立了连接就统计qps然后清零* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {scheduledExecutorService.scheduleAtFixedRate(() -> {int qps = counter.getAndSet(0);System.out.println("The server QPS is : " + qps);}, 0, 1000, TimeUnit.MILLISECONDS);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {((ByteBuf) msg).release();//没收到一个消息原子类计数器就+1counter.incrementAndGet();//业务逻辑处理,模拟业务访问DB、缓存等,时延从100-1000毫秒之间不等Random random = new Random();try {TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));} catch (Exception e) {e.printStackTrace();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

问题复现

了解了代码整体流程之后,我们不妨将服务端和客户端都跑起来。从输出结果来看,qps的值基本都在10以内,这是为什么呢?

在这里插入图片描述

我们使用jvisualvm查看了一下服务端的线程数,可以看到处理客户端消息的线程永远只有一个,等于说我们的并发配置是失效的。

在这里插入图片描述

源码分析

为了了解出现并发失效问题的原因,我们不妨通过源码找一下答案。所以我们在挂业务处理的代码处插一个断点。然后启动服务端和客户端,代码就会走到这个断点。

在这里插入图片描述

我们首先会步入last方法,它会做以下几件事:

  1. 遍历处理器,调用addLast添加处理器。
  2. 完成后返回当前pieple。

因为我们想看看线程池处理发生了什么,我们就必须看看线程池在此期间做了什么事,我们步入addLast方法看看。

 @Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers == null) {throw new NullPointerException("handlers");}for (ChannelHandler h: handlers) {if (h == null) {break;}addLast(executor, null, h);}return this;}

走到addLast,我们看到我们的线程池首先被newContext方法调用了,我们不妨步入看看它做了什么,

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);......return this;}

步入之后我们会看到newContext方法会调用childExecutor会线程池进行一些处理工作,这些操作都和线程池相关,所以我们都需要步入看看。

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}

最终代码走到了这里,结果笔者调试发现在该代码首先会创建一个map,该map专用于绑定通道对应的通道事件,这就使得我们上述的channel都会被我们所创建的线程池DefaultEventExecutorGroup其中的一个执行器绑定,所以无论客户端发送多少个消息,只要是同一个chanel的连接,都会只用一个线程处理。

private EventExecutor childExecutor(EventExecutorGroup group) {....略Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;if (childExecutors == null) {// 创建一个childExecutors 的map,使用4的大小,因为大多数人只使用一个额外的EventExecutor。childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);}//将其中一个子执行器固定一次并记住它,以便使用相同的子执行器,来激发同一通道的事件。EventExecutor childExecutor = childExecutors.get(group);if (childExecutor == null) {//从中取出一个线程,和通道事件绑定childExecutor = group.next();childExecutors.put(group, childExecutor);}return childExecutor;}

返回这个单一执行器之后,DefaultChannelHandlerContext方法就会拿着这个executor封装成一个上下文handler。

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));//我们的执行器被服装成一个上下文的handlerthis.handler = handler;}

最终我们再次回到addLast方法,可以看到完成执行器封装后,它会将这个线程池转为单一线程的去处理用户的发送的消息,由于该业务处理器已经在上文IdentityHashMap<EventExecutorGroup, EventExecutor>(4)这个map中和单一执行器绑定,所以后续当前通道无论发送多少事件都会由这个线程处理。

在这里插入图片描述

解决方案

由此可知在单一连接场景下,服务端永远只有一个执行器负责,对于我们这种连接少,消息多大部分处于业务逻辑的IO消耗中,所以我们需要在通道读取到事件之后,将消息处理的逻辑放到异步线程池中。

所以这里我们需要修改一下业务处理器,修改后的代码如下,可以看到,我们创建了一个线程池executorService ,每当收到消息之后,都会将消息提交到业务线程池中,确保netty的线程可以尽可能多接收单一通道的消息.

public class ConcurrentPerformanceServerHandlerV2 extends ChannelInboundHandlerAdapter {static AtomicInteger counter = new AtomicInteger(0);static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();static final ExecutorService executorService = Executors.newFixedThreadPool(100);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {scheduledExecutorService.scheduleAtFixedRate(() -> {int qps = counter.getAndSet(0);System.out.println("The server v2 QPS is : " + qps);}, 2, 1000, TimeUnit.MILLISECONDS);}public void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("工作线程:" + Thread.currentThread().getName());((ByteBuf) msg).release();//将IO任务提交到业务线程池中处理,确保netty的线程可以尽可能多接收单一通道的消息executorService.execute(() -> {counter.incrementAndGet();//业务逻辑处理,模拟业务访问DB、缓存等,时延从100-1000毫秒之间不等Random random = new Random();try {TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));} catch (Exception e) {e.printStackTrace();}});}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == SslHandshakeCompletionEvent.SUCCESS) {//执行流控逻辑}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

经过修改之后我们查看qps,瞬间提升了不少,系统运行稳定的情况下,qps基本可以达到100.

在这里插入图片描述

多通道型ChannelHanlder并发优化

了解上述单通道多消息类型的使用场景,我们不妨再来看看多通道
,单服务端的场景。如下图所示,该场景单位时间内会有100个并发的连接请求,建立连接后同时向服务端发送1条消息。同样的,我们也希望能qps能够达到100。

在这里插入图片描述

对此,我们不妨来看看客户端启动类的代码,如下所示,可以看到模板代码后,启动类会建立100个连接。

/*** 多并发channel连接服务端*/
public class MulChannelPerformanceClient {static final int MSG_SIZE = 256;public void connect() throws Exception {EventLoopGroup group = new NioEventLoopGroup(8);Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {//模拟100分并发发消息ch.pipeline().addLast(new ConcurrentPerformanceClientHandlerV2());}});ChannelFuture f = null;// 100个异步连接到服务端for (int i = 0; i < 100; i++) {f = b.connect("127.0.0.1", 9999).sync();}f.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {future.channel().close();}});}public static void main(String[] args) throws Exception {new MulChannelPerformanceClient().connect();}
}

而每个客户端建立连接之后,就会创建一个256字节的数据,每个1s发送一次给服务端。

public class ConcurrentPerformanceClientHandlerV2 extends ChannelInboundHandlerAdapter {static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();@Overridepublic void channelActive(ChannelHandlerContext ctx) {scheduledExecutorService.scheduleAtFixedRate(() -> {ByteBuf firstMessage = Unpooled.buffer(ConcurrentPerformanceClient.MSG_SIZE);for (int k = 0; k < firstMessage.capacity(); k++) {firstMessage.writeByte((byte) k);}ctx.writeAndFlush(firstMessage);}, 0, 1000, TimeUnit.MILLISECONDS);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

可以看到这种场景和上一个案例的区别在于,当前场景是连接数多,且每个连接的耗时在100ms-1000ms不等,所以我们服务端的业务处理就需要解决收包的问题了。

这样一来,我们最初的代码就派上用场了,服务端启动类的代码就可以改为下面这种形式了,将workerGroup设置为默认的CPU核心的两倍。

并且创建一个DefaultEventExecutorGroup和业务处理器绑定,这样一来,从Reactor即我们下面声明的workerGroup就有足够的线程处理任务,而当前通道事件就会和DefaultEventExecutorGroup绑定,从而实现确保多并发连接channel的情况下,有足够且合理的线程处理任务。

public class ConcurrentPerformanceServer {static final EventExecutorGroup executor = new DefaultEventExecutorGroup(100);public static void main(String[] args) throws Exception {//主从reactorEventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.config().setAllocator(UnpooledByteBufAllocator.DEFAULT);ChannelPipeline p = ch.pipeline();//服务端业务处理器p.addLast(executor, new ConcurrentPerformanceServerHandler());}}).childOption(ChannelOption.SO_RCVBUF, 8 * 1024).childOption(ChannelOption.SO_SNDBUF, 8 * 1024);ChannelFuture f = b.bind(9999).sync();f.channel().closeFuture().addListener((ChannelFutureListener) future -> {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();});}
}

注意,此时连接是并发的,所以我们统计qps不在使用channelActive方法了,改用静态代码块统计所有并发线程的任务的channelRead事件。

public class ConcurrentPerformanceServerHandler extends ChannelInboundHandlerAdapter {static AtomicInteger counter = new AtomicInteger(0);static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();/*** 客户端多个channel并发连接过来的,所以我们不能用channelActive进行统计*/static {scheduledExecutorService.scheduleAtFixedRate(() -> {int qps = counter.getAndSet(0);System.out.println("The server QPS is : " + qps);}, 0, 1000, TimeUnit.MILLISECONDS);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {((ByteBuf) msg).release();//没收到一个消息原子类计数器就+1counter.incrementAndGet();//业务逻辑处理,模拟业务访问DB、缓存等,时延从100-1000毫秒之间不等Random random = new Random();try {TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));} catch (Exception e) {e.printStackTrace();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

完成后将服务端和客户端代码都启动,可以看到qps基本可以达到预期。

在这里插入图片描述

同样的,使用jvisualvm可以看到workerGroup也是多线程的调度将任务分配到对应的piepeline上交由该piepeline上的handler处理。

在这里插入图片描述

而每个客户端都有独立的piepeline,plepeline中的handler共享一个线程池,这就使得每个客户端read事件就绪之后,就会将业务处理器的任务提交到我们的业务线程池中,如下图

在这里插入图片描述

所以我们的业务线程池中的100个线程池都会利用到了,qps自然就达到我们所预期的100。

在这里插入图片描述

小结

对此我们不妨小结一下,不同场景下ChannelHanlder的用法:

  1. 对于客户端并发连接数不多,但是每个客户端channel业务请求阻塞较长的,我们建议在业务处理时,将耗时的地方提交到业务线程池中。
  2. 对于客户端并发连接数多,但channel阻塞不耗时的场景,我们只需按照机器性能调整好业务处理器对应的DefaultEventExecutorGroup即可。

参考文献

Java性能调优 6步实现项目性能升级

这篇关于来聊聊ChannelHandler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/419336

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

Netty源码解析9-ChannelHandler实例之MessageToByteEncoder

MessageToByteEncoder框架可见用户使用POJO对象编码为字节数据存储到ByteBuf。用户只需定义自己的编码方法encode()即可。 首先看类签名: public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter 可知该类只处理出站事件,切确的说是write事件

Netty源码解析8-ChannelHandler实例之CodecHandler

编解码处理器作为Netty编程时必备的ChannelHandler,每个应用都必不可少。Netty作为网络应用框架,在网络上的各个应用之间不断进行数据交互。而网络数据交换的基本单位是字节,所以需要将本应用的POJO对象编码为字节数据发送到其他应用,或者将收到的其他应用的字节数据解码为本应用可使用的POJO对象。这一部分,又和JAVA中的序列化和反序列化对应。幸运的是,有很多其他的开源工具(prot

Netty源码解析7-ChannelHandler实例之TimeoutHandler

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData TimeoutHandler 在开发TCP服务时,一个常见的需求便是使用心跳保活客户端。而Netty自带的三个超时处理器IdleStateHandler,ReadTimeoutHandler和WriteTimeoutHandler可完美满足此需求。其中IdleSt

Netty源码解析6-ChannelHandler实例之LoggingHandler

LoggingHandler 日志处理器LoggingHandler是使用Netty进行开发时的好帮手,它可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。首先看类签名: @Sharablepublic class LoggingHandler extends ChannelDuplexHandler 注解Sharable说明LoggingHandler没有状态相关变量,

Netty源码解析5-ChannelHandler

ChannelHandler并不处理事件,而由其子类代为处理:ChannelInboundHandler拦截和处理入站事件,ChannelOutboundHandler拦截和处理出站事件。ChannelHandler和ChannelHandlerContext通过组合或继承的方式关联到一起成对使用。事件通过ChannelHandlerContext主动调用如fireXXX()和write(msg)

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服