在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关

2024-04-10 13:08

本文主要是介绍在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

下面是unsafe接口的所有方法

复制代码
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();

SocketAddress localAddress();
SocketAddress remoteAddress();

void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();

ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
复制代码
按功能可以分为分配内存,Socket四元组信息,注册事件循环,绑定网卡端口,Socket的连接和关闭,Socket的读写,看的出来,这些操作都是和jdk底层相关

Unsafe 继承结构

NioUnsafe 在 Unsafe基础上增加了以下几个接口

复制代码
public interface NioUnsafe extends Unsafe {
SelectableChannel ch();
void finishConnect();
void read();
void forceFlush();
}
复制代码
从增加的接口以及类名上来看,NioUnsafe 增加了可以访问底层jdk的SelectableChannel的功能,定义了从SelectableChannel读取数据的read方法

Unsafe的分类
从以上继承结构来看,我们可以总结出两种类型的Unsafe分类,一个是与连接的字节数据读写相关的NioByteUnsafe,一个是与新连接建立操作相关的NioMessageUnsafe

NioByteUnsafe中的读:委托到外部类NioSocketChannel

复制代码
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
复制代码
最后一行已经与jdk底层以及netty中的ByteBuf相关,将jdk的 SelectableChannel的字节数据读取到netty的ByteBuf中

NioMessageUnsafe中的读:委托到外部类NioSocketChannel

复制代码
protected int doReadMessages(List buf) throws Exception {
SocketChannel ch = javaChannel().accept();

if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;
}
return 0;

}
复制代码
NioMessageUnsafe 的读操作很简单,就是调用jdk的accept()方法,新建立一条连接

NioByteUnsafe中的写:委托到外部类NioSocketChannel

复制代码
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
复制代码
最后一行已经与jdk底层以及netty中的ByteBuf相关,将netty的ByteBuf中的字节数据写到jdk的 SelectableChannel中

回到顶部
pipeline中的head
NioEventLoop

复制代码
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//新连接的已准备接入或者已存在的连接有数据可读
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
复制代码
NioByteUnsafe

复制代码
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 创建ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
do {// 分配一个ByteBufbyteBuf = allocHandle.allocate(allocator);// 将数据读取到分配的ByteBuf中去allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;break;}// 触发事件,将会引发pipeline的读事件传播pipeline.fireChannelRead(byteBuf);byteBuf = null;
} while (allocHandle.continueReading());
pipeline.fireChannelReadComplete();

}
复制代码
同样,我抽出了核心代码,细枝末节先剪去,NioByteUnsafe 要做的事情可以简单地分为以下几个步骤

拿到Channel的config之后拿到ByteBuf分配器,用分配器来分配一个ByteBuf,ByteBuf是netty里面的字节数据载体,后面读取的数据都读到这个对象里面
将Channel中的数据读取到ByteBuf
数据读完之后,调用 pipeline.fireChannelRead(byteBuf); 从head节点开始传播至整个pipeline
最后调用fireChannelReadComplete();
这里,我们的重点其实就是 pipeline.fireChannelRead(byteBuf);

DefaultChannelPipeline

复制代码
final AbstractChannelHandlerContext head;
//…
head = new HeadContext(this);

public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
复制代码
结合这幅图

可以看到,数据从head节点开始流入,在进行下一步之前,我们先把head节点的功能过一遍

HeadContext

复制代码
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();
}@Override
public ChannelHandler handler() {return this;
}@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {// NOOP
}@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// NOOP
}@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {unsafe.bind(localAddress, promise);
}@Override
public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {unsafe.connect(remoteAddress, localAddress, promise);
}@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {unsafe.disconnect(promise);
}@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {unsafe.close(promise);
}@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {unsafe.deregister(promise);
}@Override
public void read(ChannelHandlerContext ctx) {unsafe.beginRead();
}@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise);
}@Override
public void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush();
}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);
}@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {invokeHandlerAddedIfNeeded();ctx.fireChannelRegistered();
}@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();// Remove all handlers sequentially if channel is closed and unregistered.if (!channel.isOpen()) {destroy();}
}@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();
}@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();
}@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);
}@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();readIfIsAutoRead();
}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}
}@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);
}@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();
}

}
复制代码
从head节点继承的两个接口看,TA既是一个ChannelHandlerContext,同时又属于inBound和outBound Handler

在传播读写事件的时候,head的功能只是简单地将事件传播下去,如ctx.fireChannelRead(msg);

在真正执行读写操作的时候,例如在调用writeAndFlush()等方法的时候,最终都会委托到unsafe执行,而当一次数据读完,channelReadComplete方法会被调用

回到顶部
pipeline中的inBound事件传播
我们接着上面的 AbstractChannelHandlerContext.invokeChannelRead(head, msg); 这个静态方法看,参数传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。

我们看看这个静态方法内部是怎么样的:

复制代码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, “msg”), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
复制代码
调用这个 Context (也就是 head) 的 invokeChannelRead 方法,并传入数据。我们再看看head中 invokeChannelRead 方法的实现,实际上是在headContext的父类AbstractChannelHandlerContext中:

AbstractChannelHandlerContext

复制代码
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

public ChannelHandler handler() {
return this;
}
复制代码
上面 handler()就是headContext中的handler,也就是headContext自身,也就是调用 head 的 channelRead 方法。那么这个方法是怎么实现的呢?

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
什么都没做,调用 Context 的 fire 系列方法,将请求转发给下一个节点。我们这里是 fireChannelRead 方法,注意,这里方法名字都挺像的。需要细心区分。下面我们看看 Context 的成员方法 fireChannelRead:

AbstractChannelHandlerContext

复制代码
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
复制代码
这个是 head 的抽象父类 AbstractChannelHandlerContext 的实现,该方法再次调用了静态 fire 系列方法,但和上次不同的是,不再放入 head 参数了,而是使用 findContextInbound 方法的返回值。从这个方法的名字可以看出,是找到入站类型的 handler。我们看看方法实现:

复制代码
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
复制代码
该方法很简单,找到当前 Context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。我们来看看 invokeChannelRead(findContextInbound(), msg);

复制代码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, “msg”), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}

}
复制代码
上面我们找到了next节点(inbound类型的),然后直接调用 next.invokeChannelRead(m);如果这个next是我们自定义的handler,此时我们自定义的handler的父类是AbstractChannelHandlerContext,则又回到了AbstractChannelHandlerContext中实现的invokeChannelRead,代码如下:

AbstractChannelHandlerContext

复制代码
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

public ChannelHandler handler() {
return this;
}
复制代码
此时的handler()就是我们自定义的handler了,然后调用我们自定义handler中的 channelRead(this, msg);

请求进来时,pipeline 会从 head 节点开始输送,通过配合 invoker 接口的 fire 系列方法,实现 Context 链在 pipeline 中的完美传递。最终到达我们自定义的 handler。

注意:此时如果我们想继续向后传递该怎么办呢?我们前面说过,可以调用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一样,调用 fire 系列方法,直接向后传递就 ok 了。

如果所有的handler都调用了fire系列方法,则会传递到最后一个inbound类型的handler,也就是——tail节点,那我们就来看看tail节点

回到顶部
pipeline中的tail
复制代码
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, true, false);setAddComplete();
}@Override
public ChannelHandler handler() {return this;
}@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// This may not be a configuration error and so don't log anything.// The event may be superfluous for the current pipeline configuration.ReferenceCountUtil.release(evt);
}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);
}@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);
}@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }

}
复制代码
正如我们前面所提到的,tail节点的大部分作用即终止事件的传播(方法体为空)

channelRead

复制代码
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
“Please check your pipeline configuration.”, msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
复制代码
tail节点在发现字节数据(ByteBuf)或者decoder之后的业务对象在pipeline流转过程中没有被消费,落到tail节点,tail节点就会给你发出一个警告,告诉你,我已经将你未处理的数据给丢掉了

总结一下,tail节点的作用就是结束事件传播,并且对一些重要的事件做一些善意提醒

回到顶部
pipeline中的outBound事件传播
上一节中,我们在阐述tail节点的功能时,忽略了其父类AbstractChannelHandlerContext所具有的功能,这一节中,我们以最常见的writeAndFlush操作来看下pipeline中的outBound事件是如何向外传播的

典型的消息推送系统中,会有类似下面的一段代码

Channel channel = getChannel(userInfo);
channel.writeAndFlush(pushInfo);
这段代码的含义就是根据用户信息拿到对应的Channel,然后给用户推送消息,跟进 channel.writeAndFlush

NioSocketChannel

public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
从pipeline开始往外传播

public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
Channel 中大部分outBound事件都是从tail开始往外传播, writeAndFlush()方法是tail继承而来的方法,我们跟进去

AbstractChannelHandlerContext

复制代码
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);

return promise;

}
复制代码
AbstractChannelHandlerContext

复制代码
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
复制代码
先调用findContextOutbound()方法找到下一个outBound()节点

AbstractChannelHandlerContext

复制代码
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
复制代码
找outBound节点的过程和找inBound节点类似,反方向遍历pipeline中的双向链表,直到第一个outBound节点next,然后调用next.invokeWriteAndFlush(m, promise)

AbstractChannelHandlerContext

复制代码
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
复制代码
调用该节点的ChannelHandler的write方法,flush方法我们暂且忽略,后面会专门讲writeAndFlush的完整流程

AbstractChannelHandlerContext

复制代码
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
复制代码
可以看到,数据开始出站,从后向前开始流动,和入站的方向是反的。那么最后会走到哪里呢,当然是走到 head 节点,因为 head 节点就是 outbound 类型的 handler。

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
调用了 底层的 unsafe 操作数据,这里,加深了我们对head节点的理解,即所有的数据写出都会经过head节点

当执行完这个 write 方法后,方法开始退栈。逐步退到 unsafe 的 read 方法,回到最初开始的地方,然后继续调用 pipeline.fireChannelReadComplete() 方法

回到顶部
总结
总结一下一个请求在 pipeline 中的流转过程:

调用 pipeline 的 fire 系列方法,这些方法是接口 invoker 设计的,pipeline 实现了 invoker 的所有方法,inbound 事件从 head 开始流入,outbound 事件从 tail 开始流出。
pipeline 会将请求交给 Context,然后 Context 通过抽象父类 AbstractChannelHandlerContext 的 invoke 系列方法(静态和非静态的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各个 Context 的数据流转。
当入站过程中,调用 了出站的方法,那么请求就不会向后走了。后面的处理器将不会有任何作用。想继续相会传递就调用 Context 的 fire 系列方法,让 Netty 在内部帮你传递数据到下一个节点。如果你想在整个通道传递,就在 handler 中调用 channel 或者 pipeline 的对应方法,这两个方法会将数据从头到尾或者从尾到头的流转一遍。
深圳网站建设www.sz886.com

这篇关于在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/891152

相关文章

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

STM32内部闪存FLASH(内部ROM)、IAP

1 FLASH简介  1 利用程序存储器的剩余空间来保存掉电不丢失的用户数据 2 通过在程序中编程(IAP)实现程序的自我更新 (OTA) 3在线编程(ICP把整个程序都更新掉) 1 系统的Bootloader写死了,只能用串口下载到指定的位置,启动方式也不方便需要配置BOOT引脚触发启动  4 IAP(自己写的Bootloader,实现程序升级) 1 比如蓝牙转串口,

FreeRTOS内部机制学习03(事件组内部机制)

文章目录 事件组使用的场景事件组的核心以及Set事件API做的事情事件组的特殊之处事件组为什么不关闭中断xEventGroupSetBitsFromISR内部是怎么做的? 事件组使用的场景 学校组织秋游,组长在等待: 张三:我到了 李四:我到了 王五:我到了 组长说:好,大家都到齐了,出发! 秋游回来第二天就要提交一篇心得报告,组长在焦急等待:张三、李四、王五谁先写好就交谁的

java线程深度解析(一)——java new 接口?匿名内部类给你答案

http://blog.csdn.net/daybreak1209/article/details/51305477 一、内部类 1、内部类初识 一般,一个类里主要包含类的方法和属性,但在Java中还提出在类中继续定义类(内部类)的概念。 内部类的定义:类的内部定义类 先来看一个实例 [html]  view plain copy pu

Go Channel的实现

channel作为goroutine间通信和同步的重要途径,是Go runtime层实现CSP并发模型重要的成员。在不理解底层实现时,经常在使用中对channe相关语法的表现感到疑惑,尤其是select case的行为。因此在了解channel的应用前先看一眼channel的实现。 Channel内存布局 channel是go的内置类型,它可以被存储到变量中,可以作为函数的参数或返回值,它在r

浙大数据结构:树的定义与操作

四种遍历 #include<iostream>#include<queue>using namespace std;typedef struct treenode *BinTree;typedef BinTree position;typedef int ElementType;struct treenode{ElementType data;BinTree left;BinTre

类和对象的定义和调用演示(C++)

我习惯把类的定义放在头文件中 Student.h #define _CRT_SECURE_NO_WARNINGS#include <string>using namespace std;class student{public:char m_name[25];int m_age;int m_score;char* get_name(){return m_name;}int set_name

c++ 定义二位数组

在 C++ 中,定义二维数组有几种常见的方式。以下是几个示例: 1. 静态二维数组 定义: int array[3][4]; 这里,array 是一个 3 行 4 列的整数二维数组。 初始化: int array[3][4] = {{1, 2, 3, 4},{5, 6, 7, 8},{9, 10, 11, 12}}; 2. 动态二维数组 使用指针和动态内存分配: 定义:

java类中定义接口的有哪些好处

第一步:首先是是定义一个类,同时里面定义接口 public class Util { public interface Worker { void work(int a); } } 第二步:定义一个类去实现第一步类中定义的接口 public class Demo implements Worker { @Override public void work(int a) { System

vue3 为组件的 emits 标注类型,defineEmits基于类型的定义的简单理解

1)在 <script setup> 中,emit 函数的类型标注也可以通过运行时声明或是类型声明进行。 2)基于类型的: const emit = defineEmits<{ (e: 'change', id: number): void (e: 'update', value: string): void }>() 说明:e: 指定了方法名,id:数字型的参数,这个就是限定了方法名及