Netty Review - 探索Pipeline的Inbound和Outbound

2023-12-01 03:12

本文主要是介绍Netty Review - 探索Pipeline的Inbound和Outbound,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 概念
  • Server Code
  • Client Code
  • InboundHandler和OutboundHandler的执行顺序
    • 在InboundHandler中不触发fire方法
    • InboundHandler和OutboundHandler的执行顺序
    • 如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述


概念

在这里插入图片描述

我们知道boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandlerOutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有readwrite事件,然后通过线程池去执行Pipeline的业务流

selector如何查询哪些socket连接有readwrite事件,主要取决于调用操作系统的哪种IO多路复用内核

  • 如果是select注意,此处的select是指操作系统内核的select IO多路复用,不是nettyseletor对象),那么将会遍历所有socket连接,依次询问是否有readwrite事件,最终操作系统内核将所有IO事件的socket连接返回给netty进程,当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝
  • 如果是epoll,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗

Netty中,InboundOutbound是两个重要的概念,用于描述数据在ChannelPipeline中的流动方向。

Inbound(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline中,Inbound数据会依次经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的末尾。

Outbound(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline中,Outbound数据会从Pipeline的末尾开始,逆序经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的起始位置。

InboundOutbound的区别在于数据的流动方向。Inbound数据是从网络进入应用程序,而Outbound数据是从应用程序发送到网络。这意味着Inbound数据是应用程序接收和处理外部数据的入口,而Outbound数据是应用程序发送数据到外部的出口。

虽然InboundOutbound描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline中,InboundOutbound数据可以相互影响和交互。例如,一个ChannelHandler可以在处理Inbound数据时生成Outbound数据作为响应,或者在处理Outbound数据时修改Inbound数据的内容。

总结起来,InboundOutbound是描述数据在ChannelPipeline中流动方向的概念。Inbound数据是从网络进入应用程序,Outbound数据是从应用程序发送到网络。它们在ChannelPipeline中相互影响和交互,共同实现网络数据的处理和传输。


Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是headtail,当selector轮询到socketread事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandlerChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler .

在这里插入图片描述

ChannelHandler分为InbounHandlerOutboundHandler

  • InboundHandler用来处理接收消息
  • OutboundHandler用来处理发送消息。

headChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socketreadwritetailChannelHandler只是InboundHandlerreadPipleline处理将会最终到达tail


演示之前,我们先附一下代码

Server Code

package com.artisan.pipeline.inout;import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class ArtisanEchoServer {private int port;public ArtisanEchoServer(int port) {this.port = port;}private void run() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoOutboundHandler3());ch.pipeline().addLast(new EchoOutboundHandler2());ch.pipeline().addLast(new EchoOutboundHandler1());ch.pipeline().addLast(new EchoInboundHandler1());ch.pipeline().addLast(new EchoInboundHandler2());ch.pipeline().addLast(new EchoInboundHandler3());}}).option(ChannelOption.SO_BACKLOG, 10000).childOption(ChannelOption.SO_KEEPALIVE, true);System.out.println("EchoServer正在启动...");ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("EchoServer绑定端口:" + port);channelFuture.channel().closeFuture().sync();System.out.println("EchoServer已关闭.");} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {int port = 1234;if (args != null && args.length > 0) {try {port = Integer.parseInt(args[0]);} catch (Exception e) {e.printStackTrace();}}ArtisanEchoServer server = new ArtisanEchoServer(port);server.run();}
}

6个handler演示如下

package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler1.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler1.channelRead 收到数据:" + data);ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler1 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler1.channelReadComplete]");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler2.exceptionCaught]");}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler3.channelRead");String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler3.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));ctx.fireChannelRead(msg);System.out.println("退出 EchoInboundHandler3 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler3.exceptionCaught]");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler1.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler1.write");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler2.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler2.write");}
}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler3.write");ctx.write(msg);System.out.println("退出 EchoOutboundHandler3.write");}}

Client Code

package com.artisan.netty4.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @author 小工匠* @version 1.0* @description: 客户端启动程序* @mark: show me the code , change the world*/
public class ArtisanClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//创建bootstrap对象,配置参数Bootstrap bootstrap = new Bootstrap();//设置线程组bootstrap.group(eventExecutors)//设置客户端的通道实现类型.channel(NioSocketChannel.class)//使用匿名内部类初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客户端通道的处理器ch.pipeline().addLast(new ArtisanClientHandler());}});System.out.println("客户端准备就绪");//连接服务端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();//对通道关闭进行监听channelFuture.channel().closeFuture().sync();} finally {//关闭线程组eventExecutors.shutdownGracefully();}}
}
package com.artisan.netty4.client;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @description: 通用handler,处理I/O事件* @mark: show me the code , change the world*/
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发送消息到服务端ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));System.out.println("客户端发消息给服务端结束");System.out.println();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收服务端发送过来的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}}

InboundHandler和OutboundHandler的执行顺序

在InboundHandler中不触发fire方法

ArtisanEchoServer#run 中我们先进存在InboundHandler

在这里插入图片描述

在这里插入图片描述

先启动server, 在启动Client,我们测试一下

在这里插入图片描述
在这里插入图片描述

我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行

InboundHandler是通过fire事件决定是否要执行下一个InboundHandler,如果InboundHandler没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。

我们来看下源码

在这里插入图片描述


InboundHandler和OutboundHandler的执行顺序

在这里插入图片描述
加入Pipeline的ChannelHandler的顺序如上。

别忘了放开EchoInboundHandler2

 ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));

我们来验证下

在这里插入图片描述

执行顺序如上。

InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

1、InboundHandler是按照Pipleline的加载顺序,顺序执行。

2、OutboundHandler是按照Pipeline的加载顺序,逆序执行。


如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述

其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。

public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);
//        ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");}
.......
.......
.......

在这里插入图片描述

这篇关于Netty Review - 探索Pipeline的Inbound和Outbound的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/439575

相关文章

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

轻松录制每一刻:探索2024年免费高清录屏应用

你不会还在用一些社交工具来录屏吧?现在的市面上有不少免费录屏的软件了。别看如软件是免费的,它的功能比起社交工具的录屏功能来说全面的多。这次我就分享几款我用过的录屏工具。 1.福晰录屏大师 链接直达:https://www.foxitsoftware.cn/REC/  这个软件的操作方式非常简单,打开软件之后从界面设计就能看出来这个软件操作的便捷性。界面的设计简单明了基本一打眼你就会轻松驾驭啦

深入探索嵌入式 Linux

摘要:本文深入探究嵌入式 Linux。首先回顾其发展历程,从早期尝试到克服诸多困难逐渐成熟。接着阐述其体系结构,涵盖硬件、内核、文件系统和应用层。开发环境方面包括交叉编译工具链、调试工具和集成开发环境。在应用领域,广泛应用于消费电子、工业控制、汽车电子和智能家居等领域。关键技术有内核裁剪与优化、设备驱动程序开发、实时性增强和电源管理等。最后展望其未来发展趋势,如与物联网融合、人工智能应用、安全性与

【vue3|第28期】 Vue3 + Vue Router:探索路由重定向的使用与作用

日期:2024年9月8日 作者:Commas 签名:(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释:如果您觉在这里插入代码片得有所帮助,帮忙点个赞,也可以关注我,我们一起成长;如果有不对的地方,还望各位大佬不吝赐教,谢谢^ - ^ 1.01365 = 37.7834;0.99365 = 0.0255 1.02365 = 1377.4083;0.98365 = 0.0006 说

多云架构下大模型训练的存储稳定性探索

一、多云架构与大模型训练的融合 (一)多云架构的优势与挑战 多云架构为大模型训练带来了诸多优势。首先,资源灵活性显著提高,不同的云平台可以提供不同类型的计算资源和存储服务,满足大模型训练在不同阶段的需求。例如,某些云平台可能在 GPU 计算资源上具有优势,而另一些则在存储成本或性能上表现出色,企业可以根据实际情况进行选择和组合。其次,扩展性得以增强,当大模型的规模不断扩大时,单一云平

Jenkins--pipeline版本管理

为了提高脚本可维护性,更好的管理pipeline脚本,我们可以在项目配置中修改流水线定义,使用版本管理脚本,选择pipeline script from SCM: 我们看到现在SCM是无,因为还没有安装版本管理工具,先需要到插件管理中安装git。 安装后,在流水线设置的SCM中就能查看到Git: 在Repository URL中添加版本管理工具github或码云的仓库地址: 在Cred

Jenkins--pipeline认识及与RF文件的结合应用

什么是pipeline? Pipeline,就是可运行在Jenkins上的工作流框架,将原本独立运行的单个或多个节点任务连接起来,实现单个任务难以完成的复杂流程编排与可视化。 为什么要使用pipeline? 1.流程可视化显示 2.可自定义流程任务 3.所有步骤代码化实现 如何使用pipeline 首先需要安装pipeline插件: 流水线有声明式和脚本式的流水线语法 流水线结构介绍 Node:

探索Invoke:Python自动化任务的瑞士军刀

文章目录 探索Invoke:Python自动化任务的瑞士军刀背景:为何选择Invoke?`invoke`是什么?如何安装`invoke`?简单的`invoke`库函数使用方法场景应用:`invoke`在实际项目中的使用场景一:自动化测试场景二:代码格式化场景三:部署应用 常见问题与解决方案问题一:命令执行失败问题二:权限不足问题三:并发执行问题 总结 探索Invoke:P

使用Azure Devops Pipeline将Docker应用部署到你的Raspberry Pi上

文章目录 1. 添加树莓派到 Agent Pool1.1 添加pool1.2 添加agent 2. 将树莓派添加到 Deployment Pool2.1 添加pool2.2 添加target 3. 添加编译流水线3.1 添加编译命令3.2 配置触发器 4. 添加发布流水线4.1 添加命令行4.2 配置artifact和触发器 5. 完成 1. 添加树莓派到 Agent Pool