HTTP2:netty http2 StreamChannel多流实现与Http2StreamFrame解码器的源码分析

本文主要是介绍HTTP2:netty http2 StreamChannel多流实现与Http2StreamFrame解码器的源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

netty http2 server侧的核心逻辑个人认为,主要在编解码处理器和Stream Transform Channel这块,分别处理Http2 消息帧的编解码,以及连接的多流处理机制。对应用的处理类分别:

ChannelHandlerDesc
io.netty.handler.codec.http2.Http2FrameCodec负责http2帧和消息的编解码
io.netty.handler.codec.http2.Http2MultiplexHandler负责流的连接通道复用,将Stream转为Http2MultiplexHandlerStreamChannel

对于netty http2 server的搭建代码,可以见前面的文章:HTTP2: netty http2 server demo

Http2FrameCodec 和Http2MultiplexHandler源码分析

Http2FrameCodec会对Header和Data帧进行解码,先看看调用栈:
在这里插入图片描述

获取StreamId

对帧的解码第一个核心逻辑是在io.netty.handler.codec.http2.DefaultHttp2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener),此时ChannelHandlerContext对应的Channel是NioSocketChannel,还没有到StreamChannel。看看该方法的代码逻辑:

@Overridepublic void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)throws Http2Exception {if (readError) {input.skipBytes(input.readableBytes());return;}try {do {if (readingHeaders) {processHeaderState(input);if (readingHeaders) {// Wait until the entire header has arrived.return;}}// The header is complete, fall into the next case to process the payload.// This is to ensure the proper handling of zero-length payloads. In this// case, we don't want to loop around because there may be no more data// available, causing us to exit the loop. Instead, we just want to perform// the first pass at payload processing now.processPayloadState(ctx, input, listener);if (!readingHeaders) {// Wait until the entire payload has arrived.return;}} while (input.isReadable());} catch (Http2Exception e) {readError = !Http2Exception.isStreamError(e);throw e;} catch (RuntimeException e) {readError = true;throw e;} catch (Throwable cause) {readError = true;PlatformDependent.throwException(cause);}}

该方法会对Socket中的字节进行解码,首先是处理帧头的数据,由processHeaderState(ByteBuf)处理,该方法会读取payloadLength、frameType、flags以及streamId信息,并进行正确性校验,如果校验失败,会抛出相应的Http2Exception异常。代码如下:

private void processHeaderState(ByteBuf in) throws Http2Exception {if (in.readableBytes() < FRAME_HEADER_LENGTH) {// Wait until the entire frame header has been read.return;}// Read the header and prepare the unmarshaller to read the frame.payloadLength = in.readUnsignedMedium();if (payloadLength > maxFrameSize) {throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,maxFrameSize);}frameType = in.readByte();flags = new Http2Flags(in.readUnsignedByte());streamId = readUnsignedInt(in);// We have consumed the data, next time we read we will be expecting to read the frame payload.readingHeaders = false;switch (frameType) {case DATA:verifyDataFrame();break;case HEADERS:verifyHeadersFrame();break;case PRIORITY:verifyPriorityFrame();break;case RST_STREAM:verifyRstStreamFrame();break;case SETTINGS:verifySettingsFrame();break;case PUSH_PROMISE:verifyPushPromiseFrame();break;case PING:verifyPingFrame();break;case GO_AWAY:verifyGoAwayFrame();break;case WINDOW_UPDATE:verifyWindowUpdateFrame();break;case CONTINUATION:verifyContinuationFrame();break;default:// Unknown frame type, could be an extension.verifyUnknownFrame();break;}}

当帧头信息处理完后,就处理fram payload了,这个逻辑由processPayloadState(ChannelHandlerContext, ByteBuf, Http2FrameListener)方法完成。它会根据frameType进行执行相应解码方法,当把帧的数据处理好后,再调用Http2FrameListener对应的onxxxx()方法,执行下一步的逻辑。processPayloadState(ChannelHandlerContext, ByteBuf, Http2FrameListener)方法的代码逻辑如下:

private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)throws Http2Exception {if (in.readableBytes() < payloadLength) {// Wait until the entire payload has been read.return;}// Only process up to payloadLength bytes.int payloadEndIndex = in.readerIndex() + payloadLength;// We have consumed the data, next time we read we will be expecting to read a frame header.readingHeaders = true;// Read the payload and fire the frame event to the listener.switch (frameType) {case DATA://处理data帧数据readDataFrame(ctx, in, payloadEndIndex, listener);break;case HEADERS://处理Header首帧数据readHeadersFrame(ctx, in, payloadEndIndex, listener);break;...case CONTINUATION://处理Header或PushPromise的连续帧数据readContinuationFrame(in, payloadEndIndex, listener);break;default:readUnknownFrame(ctx, in, payloadEndIndex, listener);break;}in.readerIndex(payloadEndIndex);}

处理Header帧数据

Header帧体的处理分两种情况,分别是Header首帧和首帧后面跟着的CONTINUATION Header帧, 两种处理方法在processHeaderState()方法中的校验逻辑也不同,如下:
对于Header首帧的校验:

	// Header首帧校验private void verifyHeadersFrame() throws Http2Exception {verifyAssociatedWithAStream();verifyNotProcessingHeaders();verifyPayloadLength(payloadLength);int requiredLength = flags.getPaddingPresenceFieldLength() + flags.getNumPriorityBytes();if (payloadLength < requiredLength) {throw streamError(streamId, FRAME_SIZE_ERROR,"Frame length too small." + payloadLength);}}//Continuation 帧的校验private void verifyContinuationFrame() throws Http2Exception {verifyAssociatedWithAStream();verifyPayloadLength(payloadLength);if (headersContinuation == null) {throw connectionError(PROTOCOL_ERROR, "Received %s frame but not currently processing headers.",frameType);}if (streamId != headersContinuation.getStreamId()) {throw connectionError(PROTOCOL_ERROR, "Continuation stream ID does not match pending headers. "+ "Expected %d, but received %d.", headersContinuation.getStreamId(), streamId);}if (payloadLength < flags.getPaddingPresenceFieldLength()) {throw streamError(streamId, FRAME_SIZE_ERROR,"Frame length %d too small for padding.", payloadLength);}}private void verifyNotProcessingHeaders() throws Http2Exception {if (headersContinuation != null) {throw connectionError(PROTOCOL_ERROR, "Received frame of type %s while processing headers on stream %d.",frameType, headersContinuation.getStreamId());}}private void verifyPayloadLength(int payloadLength) throws Http2Exception {if (payloadLength > maxFrameSize) {throw connectionError(PROTOCOL_ERROR, "Total payload length %d exceeds max frame length.", payloadLength);}}private void verifyAssociatedWithAStream() throws Http2Exception {if (streamId == 0) {throw connectionError(PROTOCOL_ERROR, "Frame of type %s must be associated with a stream.", frameType);}}

Header首帧处理

Header首帧校验项:

ItemMethodException
streamId是否正常(!=0)verifyAssociatedWithAStream异常:streamId==0
headersContinuation 是否为空verifyNotProcessingHeadersheadersContinuation!=null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation!=null时,说明流的帧数据出现错乱
payloadLength<=maxFrameSizeverifyPayloadLength帧的大小超出了上限
payloadLength >= (flags.getPaddingPresenceFieldLength() + flags.getNumPriorityBytes())帧体长度太小

Header帧的处理逻辑在readHeadersFrame(ChannelHandlerContext, ByteBuf, int, Http2FrameListener)方法中, 该方法有个priority information的处理分支,这个暂时不看。代码逻辑如下:

	private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,Http2FrameListener listener) throws Http2Exception {final int headersStreamId = streamId;final Http2Flags headersFlags = flags;final int padding = readPadding(payload);verifyPadding(padding);// The callback that is invoked is different depending on whether priority information// is present in the headers frame.if (flags.priorityPresent()) {...return;}// The priority fields are not present in the frame. Prepare a continuation that invokes// the listener callback without priority information.headersContinuation = new HeadersContinuation() {@Overridepublic int getStreamId() {return headersStreamId;}@Overridepublic void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,Http2FrameListener listener) throws Http2Exception {final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);if (endOfHeaders) {listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,headersFlags.endOfStream());}}};// Process the initial fragment, invoking the listener's callback if end of headers.int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);resetHeadersContinuationIfEnd(flags.endOfHeaders());}

该方法会创建一个HeadersContinuation对象,主要是用于对Continuation Header帧数据的累加。
帧数据的读取的代码逻辑是通过headersContinuation.processFragment()方法去读取,并继续解析向下调用,处理完后关闭并销毁headersContinuation对象。代码逻辑:

	int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);resetHeadersContinuationIfEnd(flags.endOfHeaders());

Header 处理完成后关闭并销毁headersContinuation对象:

	private void resetHeadersContinuationIfEnd(boolean endOfHeaders) {if (endOfHeaders) {closeHeadersContinuation();}}private void closeHeadersContinuation() {if (headersContinuation != null) {headersContinuation.close();headersContinuation = null;}}

HeadersContinuation.processFragment()方法会调用HeadersBlockBuilder.addFragment()方法将Header帧的数据读入另一个ByteBuf对象headerBlock中,在Continuation Header帧数据读取时,也是同样的逻辑会读入到headerBlock中,这样Header完整的字节数据都会在headerBlock中。
代码逻辑:

			public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,Http2FrameListener listener) throws Http2Exception {final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);if (endOfHeaders) {listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,headersFlags.endOfStream());}}

HeadersBlockBuilder的代码逻辑:

        final HeadersBlockBuilder headersBlockBuilder() {return builder;}final void addFragment(ByteBuf fragment, int len, ByteBufAllocator alloc,boolean endOfHeaders) throws Http2Exception {if (headerBlock == null) {if (len > headersDecoder.configuration().maxHeaderListSizeGoAway()) {headerSizeExceeded();}if (endOfHeaders) {// Optimization - don't bother copying, just use the buffer as-is. Need// to retain since we release when the header block is built.headerBlock = fragment.readRetainedSlice(len);} else {headerBlock = alloc.buffer(len).writeBytes(fragment, len);}return;}if (headersDecoder.configuration().maxHeaderListSizeGoAway() - len <headerBlock.readableBytes()) {headerSizeExceeded();}if (headerBlock.isWritable(len)) {// The buffer can hold the requested bytes, just write it directly.headerBlock.writeBytes(fragment, len);} else {// Allocate a new buffer that is big enough to hold the entire header block so far.ByteBuf buf = alloc.buffer(headerBlock.readableBytes() + len);buf.writeBytes(headerBlock).writeBytes(fragment, len);headerBlock.release();headerBlock = buf;}}

HeadersContinuation.processFragment()方法中,在HeadersBlockBuilder.addFragment()方法将Header帧的数据读取并加入到headerBlock中后,会判断Header帧是否结束,如果结束了,则会执行hdrBlockBuilder.headers()将headerBlock中的字节解码在io.netty.handler.codec.http2.Http2Headers对象 ,再调用Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法,继续执行后面的Header解析、创建StreamChannel、处理Header消息等逻辑。

Continuation Header帧处理

Continuation Header帧是Header帧的数据补充,另外Continuation帧还可以对PushPromise帧进行数据补充。Continuation Header帧的处理方式跟Header帧没多大差别。

Continuation Header帧校验项:

ItemMethodException
streamId是否正常(!=0)verifyAssociatedWithAStream异常:streamId==0
payloadLength<=maxFrameSizeverifyPayloadLength帧的大小超出了上限
headersContinuation 是否不为空headersContinuation==null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation=null时,说明流的帧数据出现错乱
streamId != headersContinuation.getStreamId()如果当前帧的streamId与headersContinuation的streamId不同, 流的帧传输或读取出现了乱序
payloadLength >= flags.getPaddingPresenceFieldLength()帧体长度太小

代码逻辑如下:

	private void verifyContinuationFrame() throws Http2Exception {verifyAssociatedWithAStream();verifyPayloadLength(payloadLength);if (headersContinuation == null) {throw connectionError(PROTOCOL_ERROR, "Received %s frame but not currently processing headers.",frameType);}if (streamId != headersContinuation.getStreamId()) {throw connectionError(PROTOCOL_ERROR, "Continuation stream ID does not match pending headers. "+ "Expected %d, but received %d.", headersContinuation.getStreamId(), streamId);}if (payloadLength < flags.getPaddingPresenceFieldLength()) {throw streamError(streamId, FRAME_SIZE_ERROR,"Frame length %d too small for padding.", payloadLength);}}

Continuation Header帧的数据读入更是同Header帧的数据读入没什么区别,都是调用HeadersContinuation.processFragment()方法,通过HeadersBlockBuilder.addFragment()方法将Continuation Header帧的数据读取并加入到headerBlock中,然后判断Header帧是否结束,如果结束了,则会执行hdrBlockBuilder.headers()将headerBlock中的字节解码在io.netty.handler.codec.http2.Http2Headers对象 ,再调用Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法,继续执行后面的Header解析、创建StreamChannel、处理Header消息等逻辑。
代码逻辑如下:

	private void readContinuationFrame(ByteBuf payload, int payloadEndIndex, Http2FrameListener listener)throws Http2Exception {// Process the initial fragment, invoking the listener's callback if end of headers.headersContinuation.processFragment(flags.endOfHeaders(), payload,payloadEndIndex - payload.readerIndex(), listener);resetHeadersContinuationIfEnd(flags.endOfHeaders());}
疑问

这里其实有个疑问,为什么headersContinuation是做为实例对象放在DefaultHttp2FrameReader中,Http2的多流之间的帧数据是可以中间穿插串起来的,如果相连的两个Header帧和Continuation Header帧分别是不同流的,那不是就出问题了么?

后来我明白过来了,虽然多流之间的帧数据是可以中间穿插串起来,但是仍是使用同一个NioChannel,在写数据时保证Header帧和Continuation Header帧是紧紧相连一起写出去的,那么出现问题的可能性就几乎没有了。

Header解码

Header解码的动作入口是在headersContinuation.processFragment() -> hdrBlockBuilder.headers() -> DefaultHttp2HeadersDecoder.decodeHeaders() -> HpackDecoder.decode()。所以最终的解码工作是由io.netty.handler.codec.http2.HpackDecoder.decode()方法完成的。
调用栈如下:
在这里插入图片描述

HpackDecoder维护了一组状态常量,代表的是当前对Header的读取状态,不同的状态做的事情是不一样的,HpackDecoder通过一个While循环来读取Header,因为一个Frame里面包含若干个Header,根据读取到的数据判断,State会不断变化流转。

State说明
READ_HEADER_REPRESENTATION读取header的初试状态
READ_INDEXED_HEADER读取被索引的完整header,即name和value均被索引
READ_INDEXED_HEADER_NAME读取被索引的header name
READ_LITERAL_HEADER_NAME_LENGTH_PREFIXname未被索引,读取name长度前缀,判断是否使用哈夫曼编码
READ_LITERAL_HEADER_NAME_LENGTHname未被索引,读取name长度
READ_LITERAL_HEADER_NAMEname未被索引,读取header name
READ_LITERAL_HEADER_VALUE_LENGTH_PREFIX读取value长度前缀,判断是否使用哈夫曼编码
READ_LITERAL_HEADER_VALUE_LENGTH读取value长度
READ_LITERAL_HEADER_VALUEvalue未被索引,读取value

更多详细的解读见Netty对HPACK头部压缩的支持

与Http2MultiplexHandler 的集成处理

当HttpFrameCodec将帧解码之后,会通过Http2FrameListener进行封装处理,然后再调用netty的相应逻辑,与Http2MultiplexHandler的userEventTriggered()方法和channelRead()打通连起来。
Http2MultiplexHandler的两个方法主要逻辑:

MethodDesc
userEventTriggered()将Http2FrameCodec.DefaultHttp2FrameStream对象转换成Http2MultiplexHandlerStreamChannel对象,将触发Http2MultiplexHandlerStreamChannel对应pipeline的fireChannelRegistered()方法
channelRead()将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行AbstractHttp2StreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换
创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent)

HeadersContinuation.processFragment()方法调用的Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法逻辑中,Http2FrameListener是DefaultHttp2ConnectionDecoder$FrameReadListener对象,在该对象的onHeadersRead()方法中,会在Http2Connection先通过streamId查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活。
调用栈如下:
在这里插入图片描述

DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()方法代码逻辑如下:

public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {Http2Stream stream = connection.stream(streamId);boolean allowHalfClosedRemote = false;if (stream == null && !connection.streamMayHaveExisted(streamId)) {// 创建Http2Streamstream = connection.remote().createStream(streamId, endOfStream);// Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;}if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) {return;}boolean isInformational = !connection.isServer() &&HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {throw streamError(streamId, PROTOCOL_ERROR,"Stream %d received too many headers EOS: %s state: %s",streamId, endOfStream, stream.state());}switch (stream.state()) {case RESERVED_REMOTE:stream.open(endOfStream);break;case OPEN:case HALF_CLOSED_LOCAL:// Allowed to receive headers in these states.break;case HALF_CLOSED_REMOTE:if (!allowHalfClosedRemote) {throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",stream.id(), stream.state());}break;case CLOSED:throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",stream.id(), stream.state());default:// Connection error.throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),stream.state());}stream.headersReceived(isInformational);encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);//触发...listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream);// If the headers completes this stream, close it.if (endOfStream) {lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());}}

Http2Connection.DefaultEndpoint.createStream() 方法代码逻辑如下:

	public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {State state = activeState(streamId, IDLE, isLocal(), halfClosed);checkNewStreamAllowed(streamId, state);// Create and initialize the stream.DefaultStream stream = new DefaultStream(streamId, state);incrementExpectedStreamId(streamId);addStream(stream);stream.activate();return stream;}

创建好Http2Stream后,同时调用Http2Stream.activate()方法进行激活。

关键调用栈

触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent)方法的调用链:

MethodDesc
-DefaultHttp2FrameReader$HeadersContinuation.processFragment()从IO中读取帧数据到添加到DefaultHttp2FrameReader$HeadersBlockBuilder.headerBlock中,当Header数据结束时,调用 Http2FrameListener.onHeadersRead()方法
-DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()根据streamId从Http2Connection查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活 (详情见上面的创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent))。再然后调用Http2EmptyDataFrameListener.onHeadersRead()方法向下执行
1Http2Connection.DefaultEndpoint.createStream()创建DefaultHttp2Connection.DefaultStream(Http2Stream)对象
2DefaultHttp2Connection.DefaultStream.activate()流的激活方法
3DefaultHttp2Connection.ActiveStreams.activate()激活流
4DefaultHttp2Connection.ActiveStreams.addToActiveStreams()添加到集合DefaultHttp2Connection.ActiveStreams.streams中,并调用Http2FrameCodec$ConnectionListener.onStreamActive()方法
5Http2FrameCodec.onStreamActive0()将DefaultHttp2Connection.DefaultStream对象包装成Http2FrameCodec.DefaultHttp2FrameStream对象
6Http2FrameCodec.onHttp2StreamStateChanged()调用ChannelHandlerContext.fireUserEventTriggered()方法,触发Http2FrameStreamEvent
7ChannelHandlerContext.fireUserEventTriggered()
8ChannelHandlerContext.invokeUserEventTriggered()触发下一个AbstractChannelHandlerContext.invokeUserEventTriggered()并使用下一个ChannelHandler绑定的EventExecutor执行。 相当于触发ChannelInboundHandler链的userEventTriggered()
9ChannelInboundHandler.userEventTriggered()
10Http2MultiplexHandler.userEventTriggered()将Http2FrameCodec.DefaultHttp2FrameStream对象转换成Http2MultiplexHandlerStreamChannel对象,并将新建的Http2MultiplexHandlerStreamChannel对象注册到EventLoop中
11SingleThreadEventLoop.register()
12AbstractHttp2StreamChannel.Http2ChannelUnsafe.register()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法,但并没有使用上面传递过来的EventLoop
13DefaultChannelPipeline.fireChannelRegistered()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRegistered()方法
14ChannelInboundHandler.channelRegistered()执行ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
关键代码逻辑

Http2MultiplexHandler.userEventTriggered()方法会将Stream转换为Http2MultiplexHandlerStreamChannel,实现一个连接的多流Channel隔离,每个流可以对数据字节(ByteBuf)单独处理。
代码逻辑如下:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof Http2FrameStreamEvent) {Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();if (event.type() == Http2FrameStreamEvent.Type.State) {switch (stream.state()) {case HALF_CLOSED_LOCAL:if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {// Ignore everything which was not caused by an upgradebreak;}// fall-throughcase HALF_CLOSED_REMOTE:// fall-throughcase OPEN:if (stream.attachment != null) {// ignore if child channel was already created.break;}final AbstractHttp2StreamChannel ch;// We need to handle upgrades special when on the client side.if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {// We must have an upgrade handler or else we can't handle the streamif (upgradeStreamHandler == null) {throw connectionError(INTERNAL_ERROR,"Client is misconfigured for upgrade requests");}// 客户端升级流场景,将Stream转换创建Http2MultiplexHandlerStreamChannelch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);ch.closeOutbound();} else {//将Stream转换创建Http2MultiplexHandlerStreamChannelch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);}// 触发Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRegistered()方法ChannelFuture future = ctx.channel().eventLoop().register(ch);if (future.isDone()) {registerDone(future);} else {future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);}break;case CLOSED:AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;if (channel != null) {channel.streamClosed();}break;default:// ignore for nowbreak;}}return;}ctx.fireUserEventTriggered(evt);}
接收并处理Http2HeadersFrame消息

最终Http2Headers对象会转换为DefaultHttp2HeadersFrame对象,经由Http2MultiplexHandler.channelRead()的处理,调用AbstractHttp2StreamChannel.pipeline的fireChannelRead()方法,DefaultHttp2HeadersFrame对象对象做为参数,执行 AbstractHttp2StreamChannel.pipeline中ChannelInboundHandler的channelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换。
调用栈如下:
在这里插入图片描述

关键调用栈如下
MethodDesc
1DefaultHttp2FrameReader$HeadersContinuation.processFragment()从IO中读取帧数据到添加到DefaultHttp2FrameReader$HeadersBlockBuilder.headerBlock中,当Header数据结束时,调用 Http2FrameListener.onHeadersRead()方法
2DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()根据streamId从Http2Connection查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活 (详情见上面的创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent))。再然后调用Http2EmptyDataFrameListener.onHeadersRead()方法向下执行
3Http2EmptyDataFrameListener.onHeadersRead()调用 Http2FrameCodec$FrameListener.onHeadersRead()方法
4Http2FrameCodec$FrameListener.onHeadersRead()将Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并调用onHttp2Frame()方法继续向下执行
5Http2FrameCodec.onHttp2Frame()将DefaultHttp2HeadersFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法
6AbstractChannelHandlerContext.fireChannelRead()触发DefaultChannelHandlerContext.invokeChannelRead()方法
7Http2MultiplexHandler.channelRead()将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换
8AbstractHttp2StreamChannel.fireChildRead()
9AbstractHttp2StreamChannel.Http2ChannelUnsafe.doRead0()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法
10DefaultChannelPipeline.fireChannelRead()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRead()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRead()方法
AbstractChannelHandlerContext.invokeChannelRead()netty 逻辑
AbstractChannelHandlerContext.fireChannelRead()netty 逻辑
AbstractChannelHandlerContext.invokeChannelRead()netty 逻辑
ChannelInboundHandlerAdapter.channelRead()netty 逻辑
关键代码逻辑

DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()相关代码逻辑:

public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {Http2Stream stream = connection.stream(streamId);boolean allowHalfClosedRemote = false;if (stream == null && !connection.streamMayHaveExisted(streamId)) {//创建Http2Stream,并触发Http2MultiplexHandlerStreamChannel.pipeline的fireUserEventTriggered()和fireChannelRegistered()stream = connection.remote().createStream(streamId, endOfStream);// Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;}...stream.headersReceived(isInformational);encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);//Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并将DefaultHttp2HeadersFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法,然后调用Http2MultiplexHandler.channelRead()方法listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream);// If the headers completes this stream, close it.if (endOfStream) {lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());}}

Http2FrameCodec$FrameListener.onHeadersRead() 方法逻辑:

	public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,int padding, boolean endOfStream) {// 将Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并调用onHttp2Frame()方法继续向下执行onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding).stream(requireStream(streamId)));}

Http2FrameCodec.onHttp2Frame()方法逻辑:

	void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {ctx.fireChannelRead(frame);}

Http2MultiplexHandler.channelRead()方法逻辑:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {parentReadInProgress = true;if (msg instanceof Http2StreamFrame) {if (msg instanceof Http2WindowUpdateFrame) {// We dont want to propagate update frames to the userreturn;}Http2StreamFrame streamFrame = (Http2StreamFrame) msg;DefaultHttp2FrameStream s =(DefaultHttp2FrameStream) streamFrame.stream();AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;if (msg instanceof Http2ResetFrame) {// Reset frames needs to be propagated via user events as these are not flow-controlled and so// must not be controlled by suppressing channel.read() on the child channel.channel.pipeline().fireUserEventTriggered(msg);// RST frames will also trigger closing of the streams which then will call// AbstractHttp2StreamChannel.streamClosed()} else {// 将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。//完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换channel.fireChildRead(streamFrame);}return;}if (msg instanceof Http2GoAwayFrame) {// goaway frames will also trigger closing of the streams which then will call// AbstractHttp2StreamChannel.streamClosed()onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);}// Send everything down the pipelinectx.fireChannelRead(msg);}

处理Data帧

HttpFrameCodec读取Data帧数据跟的逻辑跟读取Header帧数据的逻辑非常相似,不同的有几点:

  • Data帧没有Continuation帧需要处理
  • Data帧不会创建Http2Stream使用,而是使用之前处理Header帧时已经创建好的Http2Stream对象
  • Data帧的数据读取后,不需要解码,直接包装成往下传DefaultHttp2DataFrame对象往下传递

调用栈如下:
在这里插入图片描述

跟读取Header帧和Continuation帧一样,读取入口也是在DefaultHttp2FrameReader.readFrame()方法中。主要代码逻辑如下:

public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)throws Http2Exception {if (readError) {input.skipBytes(input.readableBytes());return;}try {do {if (readingHeaders) {//校验processHeaderState(input);if (readingHeaders) {// Wait until the entire header has arrived.return;}}// the first pass at payload processing now.//读取processPayloadState(ctx, input, listener);if (!readingHeaders) {// Wait until the entire payload has arrived.return;}} while (input.isReadable());} catch (Http2Exception e) {readError = !Http2Exception.isStreamError(e);throw e;} catch (RuntimeException e) {readError = true;throw e;} catch (Throwable cause) {readError = true;PlatformDependent.throwException(cause);}}private void processHeaderState(ByteBuf in) throws Http2Exception {if (in.readableBytes() < FRAME_HEADER_LENGTH) {// Wait until the entire frame header has been read.return;}// Read the header and prepare the unmarshaller to read the frame.payloadLength = in.readUnsignedMedium();if (payloadLength > maxFrameSize) {throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,maxFrameSize);}frameType = in.readByte();flags = new Http2Flags(in.readUnsignedByte());streamId = readUnsignedInt(in);// We have consumed the data, next time we read we will be expecting to read the frame payload.readingHeaders = false;switch (frameType) {case DATA:verifyDataFrame();break;...default:// Unknown frame type, could be an extension.verifyUnknownFrame();break;}}private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)throws Http2Exception {if (in.readableBytes() < payloadLength) {// Wait until the entire payload has been read.return;}// Only process up to payloadLength bytes.int payloadEndIndex = in.readerIndex() + payloadLength;readingHeaders = true;// Read the payload and fire the frame event to the listener.switch (frameType) {case DATA://读取Data帧数据readDataFrame(ctx, in, payloadEndIndex, listener);break;...}in.readerIndex(payloadEndIndex);}//Data帧校验方法private void verifyDataFrame() throws Http2Exception {verifyAssociatedWithAStream();verifyNotProcessingHeaders();verifyPayloadLength(payloadLength);if (payloadLength < flags.getPaddingPresenceFieldLength()) {throw streamError(streamId, FRAME_SIZE_ERROR,"Frame length %d too small.", payloadLength);}}//Data帧数据读取方法private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,Http2FrameListener listener) throws Http2Exception {int padding = readPadding(payload);verifyPadding(padding);// Determine how much data there is to read by removing the trailing// padding.int dataLength = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);//将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递ByteBuf data = payload.readSlice(dataLength);listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());}
Data帧校验项
ItemMethodException
streamId是否正常(!=0)verifyAssociatedWithAStream异常:streamId==0
headersContinuation 是否为空verifyNotProcessingHeadersheadersContinuation!=null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation!=null时,说明流的帧数据出现错乱
payloadLength<=maxFrameSizeverifyPayloadLength帧的大小超出了上限
payloadLength >= flags.getPaddingPresenceFieldLength()帧体长度太小
关键调用栈如下
MethodDesc
1DefaultHttp2FrameReader.readDataFrame()将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递
2DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead()
3Http2FrameCodec$FrameListener.onDataRead()将ByteBuf类型的frame payload数据包装成DefaultHttp2DataFrame对象,向下传递执行
4Http2FrameCodec.onHttp2Frame()将DefaultHttp2DataFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法
5AbstractChannelHandlerContext.fireChannelRead()触发DefaultChannelHandlerContext.invokeChannelRead()方法
6Http2MultiplexHandler.channelRead()将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换
7AbstractHttp2StreamChannel.fireChildRead()
8AbstractHttp2StreamChannel.Http2ChannelUnsafe.doRead0()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法
9DefaultChannelPipeline.fireChannelRead()触发AbstractHttp2StreamChannel.pipeline 的fireChannelRead()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRead()方法
AbstractChannelHandlerContext.invokeChannelRead()netty 逻辑
AbstractChannelHandlerContext.fireChannelRead()netty 逻辑
AbstractChannelHandlerContext.invokeChannelRead()netty 逻辑
ChannelInboundHandlerAdapter.channelRead()netty 逻辑
关键代码逻辑

DefaultHttp2FrameReader.readDataFrame():

	private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,Http2FrameListener listener) throws Http2Exception {int padding = readPadding(payload);verifyPadding(padding);// Determine how much data there is to read by removing the trailing// padding.int dataLength = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);ByteBuf data = payload.readSlice(dataLength);listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());}

将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递。

Http2FrameCodec$FrameListener.onDataRead() 方法的代码逻辑:

public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,boolean endOfStream) {onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding).stream(requireStream(streamId)).retain());// We return the bytes in consumeBytes() once the stream channel consumed the bytes.return 0;}

ByteBuf类型的frame payload数据包装成DefaultHttp2DataFrame对象,调用Http2FrameCodec.onHttp2Frame() 将DefaultHttp2DataFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法,这里的逻辑跟处理Header帧时一样都 是进入netty的逻辑了,会调用 io.netty.channel.ChannelHandlerContext.fireChannelRead(),直到运行到Http2MultiplexHandler.channelRead()方法中,根据Http2Stream拿到对应的Http2MultiplexHandlerStreamChannel对象,将DefaultHttp2DataFrame对象做为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换。

参考

Netty对HPACK头部压缩的支持

这篇关于HTTP2:netty http2 StreamChannel多流实现与Http2StreamFrame解码器的源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/694456

相关文章

C++使用栈实现括号匹配的代码详解

《C++使用栈实现括号匹配的代码详解》在编程中,括号匹配是一个常见问题,尤其是在处理数学表达式、编译器解析等任务时,栈是一种非常适合处理此类问题的数据结构,能够精确地管理括号的匹配问题,本文将通过C+... 目录引言问题描述代码讲解代码解析栈的状态表示测试总结引言在编程中,括号匹配是一个常见问题,尤其是在

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景