本文主要是介绍Netty初识Hello World 事件循环对象(EventLoop) 事件循环组 (EventLoopGroup),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
初始Netty-HelloWorld
Netty在网络通信中的地位就如同Spring框架在JavaEE开发中的地位。
基于Netty网络通信开发简易的服务端、客户端,以实现客户端向服务端发送hello world,服务端仅接收不返回数据。
服务端代码:
@Slf4j
public class HelloServer {public static void main(String[] args) {new ServerBootstrap() // 服务器端的启动器,负责组装netty组件,启动服务器// Group组,BossEventLoop,WorkerEventLoop(selector, thread).group(new NioEventLoopGroup()) // 1.channel(NioServerSocketChannel.class) // 2// boss 负责处理连接,worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler).childHandler(// channel代表和客户端进行数据读写的通道 Initializer初始化,负责添加别的handlernew ChannelInitializer<NioSocketChannel>() { // 3protected void initChannel(NioSocketChannel ch) {// 添加具体的handler, StringDecoder字符串解码,将传输的ByteBuf转换为字符串ch.pipeline().addLast(new StringDecoder()); // 5ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6,自定义handler@Override // 读事件protected void channelRead0(ChannelHandlerContext ctx, String msg) {// 打印上一步转换的字符串System.out.println(msg);}});}}).bind(8080); // 4,绑定监听端口}
}
客户端代码:
public class HelloClient {public static void main(String[] args) throws InterruptedException {new Bootstrap() // 创建启动器// 添加EventLoop.group(new NioEventLoopGroup()) // 1// 选择客户端的channel.channel(NioSocketChannel.class) // 2// 添加处理器.handler(new ChannelInitializer<Channel>() { // 3@Override // 在连接建立后被调用protected void initChannel(Channel ch) {// 对发送的数据进行编码ch.pipeline().addLast(new StringEncoder()); // 8}})// 连接到服务器.connect("127.0.0.1", 8080) // 4//.sync() // 5.channel() // 6.writeAndFlush(new Date() + ": hello world!"); // 7 向服务器发送数据}
}
主要执行流程:
可以理解如下:
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline (流水线)的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
EventLoop详解
EventLoop基础知识
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
EventLoop使用示例
// 创建事件循环组
// NioEventLoopGroup可以处理io事件、普通任务、定时任务
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
// DefaultEventLoopGroup可以处理普通任务、定时任务
EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
创建事件循环组时,若不指定线程个数,则创建个数为机器可用CPU * 2,若指定个数,则按照指定个数进行创建。
// 处理普通任务eventLoopGroup.next().submit(()->{ // 或者使用.execute()try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.info("ok");});// 执行定时任务eventLoopGroup.next().scheduleAtFixedRate(()->{log.info("test schedule task ...");}, 0, 1, TimeUnit.SECONDS);
使用NioEventLoopGroup处理IO事件
采用多个NioEventLoopGroup来分别处理不同的事件,如boss专门处理accept事件,worker专门处理read事件。并且对于耗时较长的handler交给专门的EventLoopGroup来处理,从而不阻塞原有的NioEventLoopGroup监听事件的正常运行。
优化一:
上述图片中第23行为初始代码,一个NioEventLoopGroup处理所有的事件,包括accept、read
等,显然不符合各司其职的功能,将其优化为第24行所示,明确NioEventLoopGroup负责处理的
事件类别。
1.boss只负责NioServerSocketChannel上的accept事件。
2.worker负责NioSocketChannel上的read事件。
优化二:
对于比较耗时的handler,可以将其将给其他EventLoopGroup创建handler来执行。
服务端代码:
@Slf4j
public class EventLoopServer {public static void main(String[] args) {// 2.细分:如果某个handler执行事件比较长,可以独立出一个eventloopgroup进行处理EventLoopGroup group = new DefaultEventLoopGroup();new ServerBootstrap()// 1.进行优化,明确eventloop负责处理的事件类别,boss和worker// boss只负责NioServerSocketChannel上的accept事件,worker负责NioSocketChannel上的read事件// 是否需要指定第一个负责accept的NioEventLoopGroup的适量,不需要,只会有一个NioServerSocketChannel
// .group(new NioEventLoopGroup()).group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// 转为toString的时候需要指定字符集,此处选择使用默认字符集// 若在网络通信中,需要客户端和服务器协商字符集的使用,使用同一个标准进行处理log.info(buf.toString(Charset.defaultCharset()));ctx.fireChannelRead(msg); // 将消息传递给下一个handler}}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;log.info(buf.toString(Charset.defaultCharset()));}});}}).bind(8080);}
}
客户端代码:
@Slf4j
public class EventLoopClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap() // 创建启动器// 添加EventLoop.group(new NioEventLoopGroup()) // 1// 选择客户端的channel.channel(NioSocketChannel.class) // 2// 添加处理器.handler(new ChannelInitializer<Channel>() { // 3@Override // 在连接建立后被调用protected void initChannel(Channel ch) {// 对发送的数据进行编码ch.pipeline().addLast(new StringEncoder()); // 8}})// 连接到服务器.connect("127.0.0.1", 8080) // 4//.sync() // 5.channel();// 6
// for (int i = 0; i < 5; i++) {
// channel.writeAndFlush("Hello eventloop_" + i + " ");
// }log.info("{}",channel);System.out.println("");}
}
EventLoop、Channel和Handler之间的关系
建立连接之后,channel和一个EventLoop进行绑定,并且一个线程可以管理多个Channel。
同一个Channel绑定多个不同的EventLoop(也就不同EventLoopGroup中对应的EventLoop),此时如何进行不同handler之间的切换。

- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程EventExecutor executor = next.executor();// 是,直接调用if (executor.inEventLoop()) {next.invokeChannelRead(m);} // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}}
这篇关于Netty初识Hello World 事件循环对象(EventLoop) 事件循环组 (EventLoopGroup)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!