Netty Review - 探索Pipeline的Inbound和Outbound

2023-12-01 03:12

本文主要是介绍Netty Review - 探索Pipeline的Inbound和Outbound,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 概念
  • Server Code
  • Client Code
  • InboundHandler和OutboundHandler的执行顺序
    • 在InboundHandler中不触发fire方法
    • InboundHandler和OutboundHandler的执行顺序
    • 如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述


概念

在这里插入图片描述

我们知道boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandlerOutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有readwrite事件,然后通过线程池去执行Pipeline的业务流

selector如何查询哪些socket连接有readwrite事件,主要取决于调用操作系统的哪种IO多路复用内核

  • 如果是select注意,此处的select是指操作系统内核的select IO多路复用,不是nettyseletor对象),那么将会遍历所有socket连接,依次询问是否有readwrite事件,最终操作系统内核将所有IO事件的socket连接返回给netty进程,当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝
  • 如果是epoll,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗

Netty中,InboundOutbound是两个重要的概念,用于描述数据在ChannelPipeline中的流动方向。

Inbound(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline中,Inbound数据会依次经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的末尾。

Outbound(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline中,Outbound数据会从Pipeline的末尾开始,逆序经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的起始位置。

InboundOutbound的区别在于数据的流动方向。Inbound数据是从网络进入应用程序,而Outbound数据是从应用程序发送到网络。这意味着Inbound数据是应用程序接收和处理外部数据的入口,而Outbound数据是应用程序发送数据到外部的出口。

虽然InboundOutbound描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline中,InboundOutbound数据可以相互影响和交互。例如,一个ChannelHandler可以在处理Inbound数据时生成Outbound数据作为响应,或者在处理Outbound数据时修改Inbound数据的内容。

总结起来,InboundOutbound是描述数据在ChannelPipeline中流动方向的概念。Inbound数据是从网络进入应用程序,Outbound数据是从应用程序发送到网络。它们在ChannelPipeline中相互影响和交互,共同实现网络数据的处理和传输。


Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是headtail,当selector轮询到socketread事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandlerChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler .

在这里插入图片描述

ChannelHandler分为InbounHandlerOutboundHandler

  • InboundHandler用来处理接收消息
  • OutboundHandler用来处理发送消息。

headChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socketreadwritetailChannelHandler只是InboundHandlerreadPipleline处理将会最终到达tail


演示之前,我们先附一下代码

Server Code

package com.artisan.pipeline.inout;import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class ArtisanEchoServer {private int port;public ArtisanEchoServer(int port) {this.port = port;}private void run() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoOutboundHandler3());ch.pipeline().addLast(new EchoOutboundHandler2());ch.pipeline().addLast(new EchoOutboundHandler1());ch.pipeline().addLast(new EchoInboundHandler1());ch.pipeline().addLast(new EchoInboundHandler2());ch.pipeline().addLast(new EchoInboundHandler3());}}).option(ChannelOption.SO_BACKLOG, 10000).childOption(ChannelOption.SO_KEEPALIVE, true);System.out.println("EchoServer正在启动...");ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("EchoServer绑定端口:" + port);channelFuture.channel().closeFuture().sync();System.out.println("EchoServer已关闭.");} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {int port = 1234;if (args != null && args.length > 0) {try {port = Integer.parseInt(args[0]);} catch (Exception e) {e.printStackTrace();}}ArtisanEchoServer server = new ArtisanEchoServer(port);server.run();}
}

6个handler演示如下

package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler1.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler1.channelRead 收到数据:" + data);ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler1 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler1.channelReadComplete]");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler2.exceptionCaught]");}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler3.channelRead");String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler3.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));ctx.fireChannelRead(msg);System.out.println("退出 EchoInboundHandler3 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler3.exceptionCaught]");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler1.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler1.write");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler2.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler2.write");}
}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler3.write");ctx.write(msg);System.out.println("退出 EchoOutboundHandler3.write");}}

Client Code

package com.artisan.netty4.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @author 小工匠* @version 1.0* @description: 客户端启动程序* @mark: show me the code , change the world*/
public class ArtisanClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//创建bootstrap对象,配置参数Bootstrap bootstrap = new Bootstrap();//设置线程组bootstrap.group(eventExecutors)//设置客户端的通道实现类型.channel(NioSocketChannel.class)//使用匿名内部类初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客户端通道的处理器ch.pipeline().addLast(new ArtisanClientHandler());}});System.out.println("客户端准备就绪");//连接服务端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();//对通道关闭进行监听channelFuture.channel().closeFuture().sync();} finally {//关闭线程组eventExecutors.shutdownGracefully();}}
}
package com.artisan.netty4.client;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @description: 通用handler,处理I/O事件* @mark: show me the code , change the world*/
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发送消息到服务端ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));System.out.println("客户端发消息给服务端结束");System.out.println();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收服务端发送过来的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}}

InboundHandler和OutboundHandler的执行顺序

在InboundHandler中不触发fire方法

ArtisanEchoServer#run 中我们先进存在InboundHandler

在这里插入图片描述

在这里插入图片描述

先启动server, 在启动Client,我们测试一下

在这里插入图片描述
在这里插入图片描述

我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行

InboundHandler是通过fire事件决定是否要执行下一个InboundHandler,如果InboundHandler没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。

我们来看下源码

在这里插入图片描述


InboundHandler和OutboundHandler的执行顺序

在这里插入图片描述
加入Pipeline的ChannelHandler的顺序如上。

别忘了放开EchoInboundHandler2

 ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));

我们来验证下

在这里插入图片描述

执行顺序如上。

InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

1、InboundHandler是按照Pipleline的加载顺序,顺序执行。

2、OutboundHandler是按照Pipeline的加载顺序,逆序执行。


如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述

其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。

public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);
//        ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");}
.......
.......
.......

在这里插入图片描述

这篇关于Netty Review - 探索Pipeline的Inbound和Outbound的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/439575

相关文章

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

pip install jupyterlab失败的原因问题及探索

《pipinstalljupyterlab失败的原因问题及探索》在学习Yolo模型时,尝试安装JupyterLab但遇到错误,错误提示缺少Rust和Cargo编译环境,因为pywinpty包需要它... 目录背景问题解决方案总结背景最近在学习Yolo模型,然后其中要下载jupyter(有点LSVmu像一个

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

轻松录制每一刻:探索2024年免费高清录屏应用

你不会还在用一些社交工具来录屏吧?现在的市面上有不少免费录屏的软件了。别看如软件是免费的,它的功能比起社交工具的录屏功能来说全面的多。这次我就分享几款我用过的录屏工具。 1.福晰录屏大师 链接直达:https://www.foxitsoftware.cn/REC/  这个软件的操作方式非常简单,打开软件之后从界面设计就能看出来这个软件操作的便捷性。界面的设计简单明了基本一打眼你就会轻松驾驭啦

深入探索嵌入式 Linux

摘要:本文深入探究嵌入式 Linux。首先回顾其发展历程,从早期尝试到克服诸多困难逐渐成熟。接着阐述其体系结构,涵盖硬件、内核、文件系统和应用层。开发环境方面包括交叉编译工具链、调试工具和集成开发环境。在应用领域,广泛应用于消费电子、工业控制、汽车电子和智能家居等领域。关键技术有内核裁剪与优化、设备驱动程序开发、实时性增强和电源管理等。最后展望其未来发展趋势,如与物联网融合、人工智能应用、安全性与

【vue3|第28期】 Vue3 + Vue Router:探索路由重定向的使用与作用

日期:2024年9月8日 作者:Commas 签名:(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释:如果您觉在这里插入代码片得有所帮助,帮忙点个赞,也可以关注我,我们一起成长;如果有不对的地方,还望各位大佬不吝赐教,谢谢^ - ^ 1.01365 = 37.7834;0.99365 = 0.0255 1.02365 = 1377.4083;0.98365 = 0.0006 说

多云架构下大模型训练的存储稳定性探索

一、多云架构与大模型训练的融合 (一)多云架构的优势与挑战 多云架构为大模型训练带来了诸多优势。首先,资源灵活性显著提高,不同的云平台可以提供不同类型的计算资源和存储服务,满足大模型训练在不同阶段的需求。例如,某些云平台可能在 GPU 计算资源上具有优势,而另一些则在存储成本或性能上表现出色,企业可以根据实际情况进行选择和组合。其次,扩展性得以增强,当大模型的规模不断扩大时,单一云平

Jenkins--pipeline版本管理

为了提高脚本可维护性,更好的管理pipeline脚本,我们可以在项目配置中修改流水线定义,使用版本管理脚本,选择pipeline script from SCM: 我们看到现在SCM是无,因为还没有安装版本管理工具,先需要到插件管理中安装git。 安装后,在流水线设置的SCM中就能查看到Git: 在Repository URL中添加版本管理工具github或码云的仓库地址: 在Cred