Fabric solo源码之单元测试部分(1)

2023-11-05 00:58

本文主要是介绍Fabric solo源码之单元测试部分(1),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Fabric solo源码之单元测试部分(1)

Fabric solo共识很简单,其本身就是为Fabric的开发人员做实验用的。通过这个简单的本地可运行的共识,能够让开发人员在本地开发共识外的代码。也正是这样,我们可以本地运行solo共识,分析Fabric其他部分的代码。
这里先通过solo共识的单元测试及外围的辅助代码,分析Fabric的共识流程和系统逻辑。

fabric solo单元测试过程涉及到的文件:

  1. hyperledger/fabric/orderer/consensus/solo/ 共识部分及其单元测试
  2. hyperledger/fabric/orderer/mocks/common/blockcutter/ 切分块部分
  3. hyperledger/fabric/orderer/mocks/common/multichannel/ 写入账本的部分
基础

在通道关闭后,即close(chan),仍可尝试读取,再关闭后,直接返回空,但关闭后,不可写入。 参考Solo的测试函数TestStart。

package main
import "fmt"
func main() {ch := make(chan struct{})close(ch)defer func() {//为什么这样会异常?ch已经被close.//ch <- struct{}{}//如下,却可以?关闭后,可以尝试读取,在关闭后,直接返回nila := <-chfmt.Println("a", a)}()}
func TestStart(t *testing.T)

fabric/orderer/consensus/solo/consensus_test.go(源码):

//这个函数在测试什么功能?cutNext?跟TestHaltBeforeTimeout差不多呀?
func TestStart(t *testing.T) {batchTimeout, _ := time.ParseDuration("1ms")support := &mockmultichannel.ConsenterSupport{Blocks:          make(chan *cb.Block),BlockCutterVal:  mockblockcutter.NewReceiver(),SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},}//通过close关闭了mock的<-Receiver.Block//但是close不会阻塞吗?close之后还能用吗?close通道后,可尝试读取,关闭后,会返回空close(support.BlockCutterVal.Block)//在此,创建chainbs, _ := New().HandleChain(support, nil)//启动solo,在Start中启动协程bs.Start()defer bs.Halt()//CutNext干啥的还没搞清楚-->用于切块的,通过该标志,可以实现灵活的划分块,//CutNext=true接来下每次同步消息,都要成块//CutNext=false表示接下来的每次同步,都不成块support.BlockCutterVal.CutNext = true//这里,先调用bs.Order,将返回只传入到assert.Nil判断error是否为空//没有Recever.Block<-struct{}{},那mock ordered <-Receriver.Block不会阻塞吗?//因为之前close(chan)所以mock <-Block不会阻塞assert.Nil(t, bs.Order(testMessage, 0))select {case <-support.Blocks://应该读取空块吧?但是用来做什么的,表明块处理完了?收到的是写入的块//即solo中,ch.support.WriteBlock(block, nil)中进行的,实际上,//在fabric/orderer/mocks/common/multichannel/multichannel.go的WriteBlock中log.Println("support blocks")case <-bs.Errored():t.Fatalf("Expected not to exit")}
}
TestHaltBeforeTimeout
  • 分析方法:

采用return+日志的较笨的方式进行分析,使用了两个tips:

  1. go test不会输出标准日志,即fmt.Println不在再test时输出日志;但可以log.Println()
  2. 使用-run参数指定要测试的函数,可以指定分析test函数,有助于分析代码
localhost:solo liu$ go test -v -run TestHaltBeforeTimeout
=== RUN   TestHaltBeforeTimeout
2019/04/17 22:13:22 here
2019/04/17 22:13:22 chan order sendchan
2019/04/17 22:13:22 mock ordered
2019/04/17 22:13:22 sync will blocking
2019-04-17 22:13:22.875 CST [orderer/consensus/solo] main -> DEBU 001 Exiting
2019/04/17 22:13:22 solo finished
--- PASS: TestHaltBeforeTimeout (0.00s)
PASS
ok      github.com/hyperledger/fabric/orderer/consensus/solo    0.044s
localhost:solo liu$ 
  • 源码中可借鉴的部分:
  1. 通过chan struct{}的方式强调同步关系,以后再看到下代码,要立刻明白这是在进行同步操作,接着就要找到同步双方
bc.Block <- struct{}{}
  1. 使用interface管理具体实例,自己这样用的较少,需要学习下这种编程方式,体会其优势所在
//interface:
// Receiver defines a sink for the ordered broadcast messages
type Receiver interface {// Ordered should be invoked sequentially as messages are ordered// Each batch in `messageBatches` will be wrapped into a block.// `pending` indicates if there are still messages pending in the receiver. It// is useful for Kafka orderer to determine the `LastOffsetPersisted` of block.Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool)// Cut returns the current batch and starts a new oneCut() []*cb.Envelope
}
//...
//solo:batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg)func NewReceiver() *Receiver {return &Receiver{IsolatedTx:   false,CutAncestors: false,CutNext:      false,Block:        make(chan struct{}),}
}//...
//mock: 实际操作的对象
// Ordered will add or cut the batch according to the state of Receiver, it blocks reading from Block on return
func (mbc *Receiver) Ordered(env *cb.Envelope) ([][]*cb.Envelope, bool) {log.Println("mock ordered")defer func() {<-mbc.Block}()...
}

  • 源码逻辑

这里分析TestHaltBeforeTimeout(t *testing.T)的数据流,主要包括三个部分:

  1. 启动solo算法,Test调用goWithWait启动solo算法
  2. 调用syncQueueMessage小消息加入到处理队列中,其角色类似客户端
  3. 模拟执行,solo算法中,调用 chain.support.BlockCutter().Ordered(msg.normalMsg)模拟执行,执行完会通过写入Receiver.Block同步chan,告知syncQueueMessage处理完成;注意这里的chain.support.BlockCutter().Ordered(msg.normalMsg)是接口的抽象关系,真正调用的是fabric/orderer/mocks/common/blockcutter/blockcutter.go中的Ordered。

具体分析,见如下代码中的注释

fabric/orderer/consensus/solo/consensus_test.go :


func syncQueueMessage(msg *cb.Envelope, chain *chain, bc *mockblockcutter.Receiver) {chain.Order(msg, 0)bc.Block <- struct{}{}//这是在做什么?同步用吗,不会阻塞吗?//bc.Block单独用来同步的,会用struct{}{}来强调表示,夜即无缓冲chan//但他是跟谁同步呢?//从名字看,是同步消息队列-->通过"断路测试"(我自己起的名字嘿,就是通过return,continue,截断程序,加上日志以分析数据流)//该阻塞用于和模拟执行的同步,在fabric/orderer/mocks/common/blockcutter/blockcutter.go的Ordered函数中,该函数结束后//会向Receiver.Block写入同步信号,告知syncQue...模拟处理处已经完成
}type waitableGo struct {done chan struct{}
}func goWithWait(target func()) *waitableGo {wg := &waitableGo{done: make(chan struct{}),}go func() {target()//该协程会阻塞在这;处理从sync加入的消息close(wg.done)//用来对外通知}()//外边结束,里边还不结束吗?return wg
}
// This test checks that if consenter is halted before a timer fires, nothing is actually written.
func TestHaltBeforeTimeout(t *testing.T) {batchTimeout, _ := time.ParseDuration("1ms")//support的构造还不清楚support := &mockmultichannel.ConsenterSupport{Blocks:          make(chan *cb.Block),//消息发送BlockCutterVal:  mockblockcutter.NewReceiver(),SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},}defer close(support.BlockCutterVal.Block)bs := newChain(support)//bs.main是solo算法的启动函数,是个死循环,处理函数wg := goWithWait(bs.main)//启动solo算法,在goWithWait中启动协程,并通过通道通信defer bs.Halt()//中止log.Println("here")syncQueueMessage(testMessage, bs, support.BlockCutterVal)//将消息送入solo排序,//sync阻塞,根本不会接着执行啊-->sync是和mock处理同步的log.Println("sync will blocking")bs.Halt()//中止solo共识select {case <-support.Blocks: //应该读取空块吧?但是用来做什么的,表明块处理完了?收到的是写入的块//即solo中,ch.support.WriteBlock(block, nil)中进行的,实际上,//在fabric/orderer/mocks/common/multichannel/multichannel.go的WriteBlock中t.Fatalf("Expected no invocations of Append")log.Println("block exit")//不从这退出case <-wg.done://共识算法退出标志log.Println("solo finished")}
}

fabric/orderer/consensus/solo/consensus.go:

// Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {//外部通过调用order加入消息//这里,这样处理的目的是什么?//注意这里是两个case,正常是同步sendchan,但是如果要结束系统,由exitchan告知退出//而不用一直阻塞在这里,等待接收处理消息,让外部服务顺滑退出select {case ch.sendChan <- &message{configSeq: configSeq,normalMsg: env,}:log.Println("chan order sendchan")return nilcase <-ch.exitChan: //退出信号return fmt.Errorf("Exiting")}
}
//solo共识算法的主题部分,这里只关心一条数据链路,其他分支省略
func (ch *chain) main() {var timer <-chan time.Timevar err errorfor {seq := ch.support.Sequence()err = nilselect {case msg := <-ch.sendChan://读取发来的消息log.Println("chan receive msg")//continue ;我的代码review的笨办法,日志+断路//事实证明,这里接收sync发来的同步消息if msg.configMsg == nil {// NormalMsgif msg.configSeq < seq {_, err = ch.support.ProcessNormalMsg(msg.normalMsg)if err != nil {logger.Warningf("Discarding bad normal message: %s", err)continue}}//多个断路返回,确定接收同步信号在此,ordered进行了模拟处理batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg)if len(batches) == 0 && timer == nil {//正常情况下,消息从这结束timer = time.After(ch.support.SharedConfig().BatchTimeout())continue}...} else {...}case <-timer:...case <-ch.exitChan:logger.Debugf("Exiting")return}}
}

fabric/orderer/mocks/common/blockcutter/blockcutter.go:

// Ordered will add or cut the batch according to the state of Receiver, it blocks reading from Block on return
func (mbc *Receiver) Ordered(env *cb.Envelope) ([][]*cb.Envelope, bool) {defer func() { //模拟执行完,会告知syncque...处理完<-mbc.Block}()...
}// Cut terminates the current batch, returning it
func (mbc *Receiver) Cut() []*cb.Envelope {...
}

fabric/orderer/mocks/common/multichannel/multichannel.go

// WriteBlock writes data to the Blocks channel
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {if encodedMetadataValue != nil {block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})}mcs.HeightVal++mcs.Blocks <- block //here
}

这篇关于Fabric solo源码之单元测试部分(1)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/346330

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

poj 2976 分数规划二分贪心(部分对总体的贡献度) poj 3111

poj 2976: 题意: 在n场考试中,每场考试共有b题,答对的题目有a题。 允许去掉k场考试,求能达到的最高正确率是多少。 解析: 假设已知准确率为x,则每场考试对于准确率的贡献值为: a - b * x,将贡献值大的排序排在前面舍弃掉后k个。 然后二分x就行了。 代码: #include <iostream>#include <cstdio>#incl

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

笔记整理—内核!启动!—kernel部分(2)从汇编阶段到start_kernel

kernel起始与ENTRY(stext),和uboot一样,都是从汇编阶段开始的,因为对于kernel而言,还没进行栈的维护,所以无法使用c语言。_HEAD定义了后面代码属于段名为.head .text的段。         内核起始部分代码被解压代码调用,前面关于uboot的文章中有提到过(eg:zImage)。uboot启动是无条件的,只要代码的位置对,上电就工作,kern