grpc-go源码剖析二十四之在同一个进程中如何使用通知的方式消费数据帧?

本文主要是介绍grpc-go源码剖析二十四之在同一个进程中如何使用通知的方式消费数据帧?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

已发表的技术专栏
0  grpc-go、protobuf、multus-cni 技术专栏 总入口

1  grpc-go 源码剖析与实战  文章目录

2  Protobuf介绍与实战 图文专栏  文章目录

3  multus-cni   文章目录(k8s多网络实现方案)

4  grpc、oauth2、openssl、双向认证、单向认证等专栏文章目录)

假设现在有生产数据一方,消费数据一方,存数据一方

场景一:生产数据一方跟消费数据一方在不同一个进程里,可以使用MQ来实现,如下图所示:

生产数据消费数据
  消费者通过监听的方式来获取数据

场景二:生产数据一方跟消费数据一方在同一个进程里,如何实现?

  比方说,消费者可以采用每隔一定时长的方式去查询存数据的内存是否有数据,有的话,就消费。

  在grpc框架中,会有同样的问题;

  当帧发送器从帧缓存器controlBufer里获取数据的时候,如果采用每隔一定时长的方式去查询的话?这种方案怎么样?

  那么问题来了,时长定多少时间合适,不同的物理服务器性能不一样,或者说,数据帧的量大小也不一样,间隔时长就不好界定。

  在grpc框架中,采用通知方式,结合通道技术来实现。

  当消费者发现数据存储器里没有数据了,就阻塞着;当生产者将新的数据存储到数据存储器时,会给消费者的通道发送消息,告诉它有新的数据了,不需要继续阻塞了;消费者就可以继续获取数据帧了。

1、帧存储器executeAndPut是如何存储的?

其实,在前面的文章中,我们已经分析了最核心的功能,就是帧是如何具体的在单链表里存储的;
接下来,我们要考虑的问题是,在将帧存储到单链表前,要不要做一些操作,如校验操作;或者说在什么情况下,不允许往单链表里存储数据呢?

好,接下来,具体看一下,帧存储器executeAndPut是如何解决的:
进入grpc-go/internal/transport/controlbuf.go文件中的executeAndPut方法里:

1func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
2var wakeUp bool
3.	c.mu.Lock()
4if c.err != nil {
5.		c.mu.Unlock()
6return false, c.err
7}8if f != nil {
9if !f(it) { // f wasn't successful
10.			c.mu.Unlock()
11return false, nil
12}
13}14if c.consumerWaiting {
15.		wakeUp = true
16.		c.consumerWaiting = false
17}
18.	c.list.enqueue(it)19if it.isTransportResponseFrame() {
20.		c.transportResponseFrames++
21if c.transportResponseFrames == maxQueuedTransportResponseFrames {
22// We are adding the frame that puts us over the threshold; create
23// a throttling channel.
24.			ch := make(chan struct{})
25.			c.trfChan.Store(&ch)
26}
27}28.	c.mu.Unlock()
29if wakeUp {
30select {
31case c.ch <- struct{}{}:
32default:
33}
34}35return true, nil
36}

主要代码说明:

  • 第2行:声明一个bool类型的变量wakeUp;该变量的目的是,根据wakeUp的值来判断是否将通道ch解除阻塞;若阻塞时不允许从单链表c.list继续读取帧数据;
  • 第4-7行:判断controlBuf是否存在异常,并提供了异常处理逻辑
  • 第8-13行:主要是在将帧存储到单链表c.list前,需要将帧扔进函数f里,做一些处理。
    • 函数f的参考例子,如grpc-go/internal/transport/http2_client.go文件中NewStream方法里,将创建好的头帧存储到controlBuf里场景下时,需要校验streamQuota值以及HeaderListSize大小
  • 第14-17行:若consumerWaiting 为true,就是表示存储帧的单链表c.list为空,而此时,帧加载器get正等待从单链表c.list里获取帧呢;
    • a)第15行:将wakeUp 置为true,表示允许给通道ch发送数据,解除阻塞;但是,具体还没有执行。
    • b)第16行:将c.consumerWaiting 重新置为false,达到的效果是,不必每次往单链表c.list里添加帧数据都给通道ch发送数据,也就是说,只有c.consumerWaiting 为true时,才允许给通道ch发送消息,解除阻塞。
      • c.consumerWaiting 为true时,是说,消费者需要等待,此时单链表c.list里没有数据,为空链表。
  • 第18行:调用单链表c.list的enqueue方法,具体将帧存储到链表c.list里;具体存储原理,前面章节已经介绍过了。
  • 第19-27行:服务器端给客户端发送某些类型的帧,客户端接收到这些帧后,需要给服务器端反馈一个ACK帧,或者RST帧;比方说客户端接收到的Ping帧后,需要给服务器端一个反馈ACK帧;变量transportResponseFrames就是用来统计给服务器端反馈了多少个ACK帧以及RST帧;当变量transportResponseFrames累加到一定的阈值maxQueuedTransportResponseFrames 后,就会创建一个通道ch,这个通道,就是用为阀门使用的;
    • 什么场景下使用?或者说什么地方调用了?
      在grpc-go/internal/transport/http2_client.go文件中reader方法里:“t.controlBuf.throttle()”语句里使用了;达到的效果就是在读取帧前,先判断一下阀门通道ch是阻塞状态,还是非阻塞状态;
      • 如果是非阻塞状态,就可以立马读取帧,
      • 如果是阻塞状态,就暂停读取帧,直到解除阀门通道的阻塞。
  • 第29-34行:当wakeUp为true时,就可以给通道ch发送数据,以解除通道ch的阻塞;从而使得帧加载器可以重新从单链表c.list获取帧。
这个方法的核心目的是:
  • 将各种类型的帧,存储到单链表c.list里;
  • 只不过,在存储前后,做了一定的事情;比方说,存储前的校验工作;对单链表c.list的存储容量判断,要不要暂停接收帧的读取工作;
  • 这其实,实现了一个流控的作用;当读取的帧的速度超过了帧加载器get获取的速度,就暂停读取帧,等待后台处理完成后,再允许重新读取帧。

2、帧加载器get如何从列表里获取帧

接下来,分析一下,帧加载器如何从单链表c.list里获取帧:
进入grpc-go/internal/transport/controlbuf.go文件中的get方法里:

1func (c *controlBuffer) get(block bool) (interface{}, error) {
2for {
3.		c.mu.Lock()
4if c.err != nil {
5.			c.mu.Unlock()
6return nil, c.err
7}8if !c.list.isEmpty() {
9.			h := c.list.dequeue().(cbItem)
10if h.isTransportResponseFrame() {
11if c.transportResponseFrames == maxQueuedTransportResponseFrames {
12// We are removing the frame that put us over the
13// threshold; close and clear the throttling channel.
14.					ch := c.trfChan.Load().(*chan struct{})
15close(*ch)
16.					c.trfChan.Store((*chan struct{})(nil))
17}
18.				c.transportResponseFrames--
19}
20.			c.mu.Unlock()
21return h, nil
22}23if !block {
24.			c.mu.Unlock()
25return nil, nil
26}
27.		c.consumerWaiting = true
28.		c.mu.Unlock()29select {
30case <-c.ch:31case <-c.done:
32.			c.finish()
33return nil, ErrConnClosing
34}
35}
36}

在获取帧的时候,提供了两种模式:

1.阻塞式获取帧:  参数block为true时,即阻塞式获取数据;
  • 假设,存储帧的单链表不为空,那直接从单链表里的头部取出帧数据,返回即可
  • 假设,存储帧的单链表为空,进入阻塞状态,等待阻塞解除后,重新尝试获取帧数据
2.非阻塞式获取帧:  参数block为false时,即非阻塞式获取数据;
  • 假设,存储帧的单链表不为空,那直接从单链表里的头部取出帧数据,返回即可

  • 假设,存储帧的单链表为空,直接返回nil,不会再尝试获取数据的。

可见,不管是阻塞式还是非阻塞式,只要存储帧的单链表有帧数据,直接返回帧数据;只是存储帧的单链表为空时有区别。

主要流程说明:

  • 第4-7行:判断controlBuf是否存在异常,有异常时就直接返回;其实,就是获取帧数据前的校验工作
  • 第8-22行:假设c.list.isEmpty() =false的情况下,即列表里存在帧数据
    • 第9行:从列表里获取帧数据
    • 第10行:判断此帧是否是反馈给服务器端的帧;
    • 第11行:当反馈给服务器端的帧的数量满足阈值时
    • 第14-15行:获取阀门通道,并且关闭阀门通道,相当于给阀门通道发送了一个消息;这样的话,在grpc-go/internal/transport/http2_client.go文件中reader方法里:“t.controlBuf.throttle()” 就解除阻塞了,帧接收器又可以接收帧了。
    • 第18行:将transportResponseFrames递减1;其实,在帧的接收器章节已经分析过了,如果阀门通道阻塞了,肯定transportResponseFrames 是达到了阈值,此现象可能的原因:往帧列表里存储的速度特别快,而获取的速度相对慢,以至于transportResponseFrames 的累加速度远大于递减的速度
    • 第21行:将获取的数据,返还;结束循环
  • 第23-26行:如果block为false时,就退出
  • 第27行:将c.consumerWaiting 置为 true,表示消费者正等待获取数据
  • 第30行:从通道里获取数据,没有的话,就处于阻塞状态;直到有新的数据存储到帧列表里,解除阻塞;重洗获取数据。

3、帧存储器executeAndPut和帧加载器get一起分析

接下来,我们将帧存储器executeAndPut和帧加载器get整合起来分析:

1.场景一:采用非阻塞式获取数据,不管单链表c.list里是否有帧数据
  这种场景,帧存储器和帧加载器是没有交互的,各自独立,互不影响
2.场景二:采用阻塞式获取数据,并且单链表c.list为空
  这种场景,帧存储器和帧加载器是有交互的;通过controlBuf里的consumerWaiting,ch来交互的;

grpc 帧存储器和帧加载器

  • 帧存储器将帧存储到单链表的尾部,帧加载器从单链表里获取帧;
  • 若帧加载器get发现单链表里没有帧数据了,此时为空的单链表时,就会将consumerWaiting置为true,以表明,帧加载器处于等待数据状态;于此同时,帧加载器会创建一个通过ch
  • 帧存储器有数据时,发现consumerWaiting为true时,说明,有消费者(帧加载器)处于阻塞等待数据状态,就给通过ch发送信号,告诉消费者,已经有数据了。消费者不用等待了,可以继续消费数据了。

4、总结

  本篇文章,我们主要分析了在将帧数据存储到单链表时前后都做了哪些事情;以及如何从单链表里获取帧数据的方式;可以阻塞式获取,也支持非阻塞式获取;

  到目前为止,我们已经知道了:

  • 如何将帧数据存储起来,
  • 如何获取帧数据,
  • 以及阻塞式获取帧数据的场景下,帧存储器和帧加载器如何交互的;

  接下来会介绍帧发送器的核心原理,而在帧发送器的核心原理中,同时使用了阻塞式获取帧数据和非阻塞式获取帧数据,有了刚才的铺垫,对理解帧发送器的原理应该会有帮助的。

下一篇文章
  帧发送器执行逻辑介绍

点击下面的图片,返回到专栏大纲

gRPC-go源码剖析与实战之专栏大纲

您的每一次点赞,每一次关注,每一次收藏都是对我工作的最大支持,让我们开始 吧!
gRPC-go源码剖析与实战之点赞之交

gRPC-go源码剖析与实战感谢

这篇关于grpc-go源码剖析二十四之在同一个进程中如何使用通知的方式消费数据帧?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1054354

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传