本文主要是介绍Fabric源码分析之四背书(endoser)节点分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、记帐介绍
背书节点只是一个逻辑意义上的节点,并不说专门有一个背书节点的模块。这个在前面的分析中已经提到过。背书和背书策略,在Fabric中是很重要的一个环节,他是整个交易流程中直到确认交易的一个重要环节。背书是对交易的确认(正如银行业的背书),而背书策略是对交易确认的条件。如果不指定背书策略,默认是当前组织的任意成员实现背书即可。
二、启动流程
1、入口
背书节点的启动一如普通的Peer节点启动,但是,这里更关心的是背书服务的启动,毕竟此节点已经成为了一个背书节点。这里先看一下相关启动的代码:
//peer文件夹start.go
func serve(args []string) error{......//将背书服务注册到过滤器中auth := authHandler.ChainFilters(serverEndorser, authFilters...)......
}
这段代码在Peer节点启动时遇到过,但没有深入分析,这里分析一下。在serverConfig中注册了Interceptor,而在interceptor.go中提供了Register这个函数,实现了注册到gRPC服务的流程。ChaincodeSupportServer函数中调用了这个函数,将chaincodeSupport注册到了拦截器中。(server–startChaincodeServer—registerChaincodeSupport),而在前面的
serverConfig.StreamInterceptors = append(serverConfig.StreamInterceptors,grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),throttle.StreamServerInterceptor,
)
已经注册了相关的拦截器。这段代码可以认为是默认创建的拦截器。而背书服务的流程也和这个类似:先是解析core.yaml生成libary.config,然后利用library.InitRegistry(libConf)来生成authFilters,用来过滤合法的消息。最后调用最开始的代码来加入新的过滤器,继续看函数的实现:
func ChainFilters(endorser peer.EndorserServer, filters ...Filter) peer.EndorserServer {if len(filters) == 0 {return endorser}// Each filter forwards to the nextfor i := 0; i < len(filters)-1; i++ {filters[i].Init(filters[i+1])}// Last filter forwards to the endorserfilters[len(filters)-1].Init(endorser)return filters[0]
}
调用的是Filter.go中的代码:
// Init initializes the Filter with the next EndorserServer
func (f *filter) Init(next peer.EndorserServer) {f.next = next
}// ProcessProposal processes a signed proposal
func (f *filter) ProcessProposal(ctx context.Context, signedProp *peer.SignedProposal) (*peer.ProposalResponse, error) {return f.next.ProcessProposal(ctx, signedProp)
}
注意最后一个函数,这才是真刀真枪的干活的入口。它调用的是endorser.go中的ProcessProposal函数,下面会继续分析它。
2、启动相关服务
上面的代码注册好后,启动是在函数sever中的pb.RegisterEndorserServer(peerServer.Server(), auth)这个函数中:
func RegisterEndorserServer(s *grpc.Server, srv EndorserServer) {s.RegisterService(&_Endorser_serviceDesc, srv)
}
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {ht := reflect.TypeOf(sd.HandlerType).Elem()st := reflect.TypeOf(ss)if !st.Implements(ht) {grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)}s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {s.mu.Lock()defer s.mu.Unlock()s.printf("RegisterService(%q)", sd.ServiceName)if s.serve {grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)}if _, ok := s.m[sd.ServiceName]; ok {grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)}srv := &service{server: ss,md: make(map[string]*MethodDesc),sd: make(map[string]*StreamDesc),mdata: sd.Metadata,}for i := range sd.Methods {d := &sd.Methods[i]srv.md[d.MethodName] = d}for i := range sd.Streams {d := &sd.Streams[i]srv.sd[d.StreamName] = d}s.m[sd.ServiceName] = srv
}
注册到gRPC服务后,随着gRPC服务的启动,这个服务也就启动了。
3、背书需要的系统链码
背书是通过系统的链码来实现的,所以初始化也离不开这个,首先:
//server函数
peer.Initialize(func(cid string) {logger.Debugf("Deploying system CC, for channel <%s>", cid)sccp.DeploySysCCs(cid, ccp)sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {return peer.GetLedger(cid).NewQueryExecutor()}))if err != nil {logger.Panicf("Failed subscribing to chaincode lifecycle updates")}cceventmgmt.GetMgr().Register(cid, sub)
}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)
其次仍然在上面提到的startChaincodeServer函数中:
......
ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost)
if err != nil {logger.Panicf("Failed to create chaincode server: %s", err)
}
chaincodeSupport, ccp, sccp := registerChaincodeSupport(ccSrv,ccEndpoint,ca,packageProvider,aclProvider,pr,lifecycleSCC,ops,
)
go ccSrv.Start()
......
这里调用了registerChaincodeSupport这个函数:
func registerChaincodeSupport(grpcServer *comm.GRPCServer,ccEndpoint string,ca tlsgen.CA,packageProvider *persistence.PackageProvider,aclProvider aclmgmt.ACLProvider,pr *platforms.Registry,lifecycleSCC *lifecycle.SCC,ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {//get user modeuserRunsCC := chaincode.IsDevMode()tlsEnabled := viper.GetBool("peer.tls.enabled")authenticator := accesscontrol.NewAuthenticator(ca)//系统链码的容器虚拟机ipRegistry := inproccontroller.NewRegistry()sccp := scc.NewProvider(peer.Default, peer.DefaultSupport, ipRegistry)lsccInst := lscc.New(sccp, aclProvider, pr)//系统链码容器dockerProvider := dockercontroller.NewProvider(viper.GetString("peer.id"),viper.GetString("peer.networkId"),ops.Provider,)dockerVM := dockercontroller.NewDockerVM(dockerProvider.PeerID,dockerProvider.NetworkID,dockerProvider.BuildMetrics,)err := ops.RegisterChecker("docker", dockerVM)if err != nil {logger.Panicf("failed to register docker health check: %s", err)}chaincodeSupport := chaincode.NewChaincodeSupport(chaincode.GlobalConfig(),ccEndpoint,userRunsCC,ca.CertBytes(),authenticator,packageProvider,lsccInst,aclProvider,container.NewVMController(map[string]container.VMProvider{//系统链码控制器dockercontroller.ContainerType: dockerProvider,//用户链码控制器inproccontroller.ContainerType: ipRegistry,},),sccp,pr,peer.DefaultSupport,ops.Provider,)ipRegistry.ChaincodeSupport = chaincodeSupportccp := chaincode.NewProvider(chaincodeSupport)ccSrv := pb.ChaincodeSupportServer(chaincodeSupport)if tlsEnabled {ccSrv = authenticator.Wrap(ccSrv)}csccInst := cscc.New(ccp, sccp, aclProvider)qsccInst := qscc.New(aclProvider)//Now that chaincode is initialized, register all system chaincodes.sccs := scc.CreatePluginSysCCs(sccp)for _, cc := range append([]scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC}, sccs...) {sccp.RegisterSysCC(cc)}pb.RegisterChaincodeSupportServer(grpcServer.Server(), ccSrv)return chaincodeSupport, ccp, sccp
}
// RegisterSysCC registers a system chaincode with the syscc provider.
func (p *Provider) RegisterSysCC(scc SelfDescribingSysCC) {p.SysCCs = append(p.SysCCs, scc)_, err := p.registerSysCC(scc)if err != nil {sysccLogger.Panicf("Could not register system chaincode: %s", err)}
}
func (p *Provider) registerSysCC(syscc SelfDescribingSysCC) (bool, error) {if !syscc.Enabled() || !isWhitelisted(syscc) {sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s,%t) disabled", syscc.Name(), syscc.Path(), syscc.Enabled()))return false, nil}// XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binaryversion := util.GetSysCCVersion()ccid := &ccintf.CCID{Name: syscc.Name(),Version: version,}err := p.Registrar.Register(ccid, syscc.Chaincode())if err != nil {//if the type is registered, the instance may not be... keep goingif _, ok := err.(inproccontroller.SysCCRegisteredErr); !ok {errStr := fmt.Sprintf("could not register (%s,%v): %s", syscc.Path(), syscc, err)sysccLogger.Error(errStr)return false, fmt.Errorf(errStr)}}sysccLogger.Infof("system chaincode %s(%s) registered", syscc.Name(), syscc.Path())return true, err
}
//Register registers system chaincode with given path. The deploy should be called to initialize
func (r *Registry) Register(ccid *ccintf.CCID, cc shim.Chaincode) error {r.mutex.Lock()defer r.mutex.Unlock()name := ccid.GetName()inprocLogger.Debugf("Registering chaincode instance: %s", name)tmp := r.typeRegistry[name]if tmp != nil {return SysCCRegisteredErr(name)}r.typeRegistry[name] = &inprocContainer{chaincode: cc}return nil
}
通过上述的操作,就会把cscc(配置),lscc(生命周期),escc(背书),vscc(验证),qscc(查询)等系统链码注册到服务中。
三、背书过程
来看正主:
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {// start time for computing elapsed time metric for successfully endorsed proposalsstartTime := time.Now()e.Metrics.ProposalsReceived.Add(1)addr := util.ExtractRemoteAddress(ctx)endorserLogger.Debug("Entering: request from", addr)// variables to capture proposal duration metricvar chainID stringvar hdrExt *pb.ChaincodeHeaderExtensionvar success booldefer func() {// capture proposal duration metric. hdrExt == nil indicates early failure// where we don't capture latency metric. But the ProposalValidationFailed// counter metric should shed light on those failures.if hdrExt != nil {meterLabels := []string{"channel", chainID,"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,"success", strconv.FormatBool(success),}e.Metrics.ProposalDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds())}endorserLogger.Debug("Exit: request from", addr)}()// 0 -- check and validate检查并验证消息提案的合法性vr, err := e.preProcess(signedProp)if err != nil {resp := vr.respreturn resp, err}prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid// obtaining once the tx simulator for this proposal. This will be nil// for chainless proposals// Also obtain a history query executor for history queries, since tx simulator does not cover history//获得Tx交易模拟器和历史查询器var txsim ledger.TxSimulatorvar historyQueryExecutor ledger.HistoryQueryExecutorif acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil}// txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit// of valid write-sets to the stateDB), we must release the lock as early as possible.// Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and// rwset is collected before gossip dissemination if required for privateData. For safety, we// add the following defer statement and is useful when an error occur. Note that calling// txsim.Done() more than once does not cause any issue. If the txsim is already// released, the following txsim.Done() simply returns.//模拟器对读写集的控制defer txsim.Done()if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil}}txParams := &ccprovider.TransactionParams{ChannelID: chainID,TxID: txid,SignedProp: signedProp,Proposal: prop,TXSimulator: txsim,HistoryQueryExecutor: historyQueryExecutor,}// this could be a request to a chainless SysCC// TODO: if the proposal has an extension, it will be of type ChaincodeAction;// if it's present it means that no simulation is to be performed because// we're trying to emulate a submitting peer. On the other hand, we need// to validate the supplied action before endorsing it// 1 -- simulate模拟执行并得到相关结果cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)if err != nil {return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil}if res != nil {if res.Status >= shim.ERROR {endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid)var cceventBytes []byteif ccevent != nil {cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent)if err != nil {return nil, errors.Wrap(err, "failed to marshal event bytes")}}pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility)if err != nil {return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil}return pResp, nil}}// 2 -- endorse and get a marshalled ProposalResponse message背书并确认消息var pResp *pb.ProposalResponse// TODO till we implement global ESCC, CSCC for system chaincodes// chainless proposals (such as CSCC) don't have to be endorsed没有链提案不用背书if chainID == "" {pResp = &pb.ProposalResponse{Response: res}} else {// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim//真正干活的地方来了pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)// if error, capture endorsement failure metricmeterLabels := []string{"channel", chainID,"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,}if err != nil {meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(false))e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil}if pResp.Response.Status >= shim.ERRORTHRESHOLD {// the default ESCC treats all status codes about threshold as errors and fails endorsement// useful to track this as a separate metricmeterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(true))e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)endorserLogger.Debugf("[%s][%s] endorseProposal() resulted in chaincode %s error for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, txid)return pResp, nil}}// Set the proposal response payload - it// contains the "return value" from the// chaincode invocation//创建相关的返回值pResp.Response = res// total failed proposals = ProposalsReceived-SuccessfulProposalse.Metrics.SuccessfulProposals.Add(1)success = truereturn pResp, nil
}
这才是正主,真正干活的,看看它如何工作。它会调用:
// preProcess checks the tx proposal headers, uniqueness and ACL
func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {vr := &validateResult{}// at first, we check whether the message is validprop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)if err != nil {e.Metrics.ProposalValidationFailed.Add(1)vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)if err != nil {vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)if err != nil {vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}// block invocations to security-sensitive system chaincodesif e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {endorserLogger.Errorf("Error: an attempt was made by %#v to invoke system chaincode %s", shdr.Creator, hdrExt.ChaincodeId.Name)err = errors.Errorf("chaincode %s cannot be invoked through a proposal", hdrExt.ChaincodeId.Name)vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}chainID := chdr.ChannelIdtxid := chdr.TxIdendorserLogger.Debugf("[%s][%s] processing txid: %s", chainID, shorttxid(txid), txid)if chainID != "" {// labels that provide context for failure metricsmeterLabels := []string{"channel", chainID,"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,}// Here we handle uniqueness check and ACLs for proposals targeting a chain// Notice that ValidateProposalMessage has already verified that TxID is computed properlyif _, err = e.s.GetTransactionByID(chainID, txid); err == nil {// increment failure due to duplicate transactions. Useful for catching replay attacks in// addition to benign retriese.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}// check ACL only for application chaincodes; ACLs// for system chaincodes are checked elsewhereif !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {// check that the proposal complies with the Channel's writersif err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}return vr, err}}} else {// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions// ignore uniqueness checks; also, chainless proposals are not validated using the policies// of the chain since by definition there is no chain; they are validated against the local// MSP of the peer instead by the call to ValidateProposalMessage above}vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txidreturn vr, nil
}// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID))// we do expect the payload to be a ChaincodeInvocationSpec// if we are supporting other payloads in future, this be glaringly point// as something that should changecis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)if err != nil {return nil, nil, nil, nil, err}var cdLedger ccprovider.ChaincodeDefinitionvar version stringif !e.s.IsSysCC(cid.Name) {cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)if err != nil {return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))}version = cdLedger.CCVersion()err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)if err != nil {return nil, nil, nil, nil, err}} else {version = util.GetSysCCVersion()}// ---3. execute the proposal and get simulation resultsvar simResult *ledger.TxSimulationResultsvar pubSimResBytes []bytevar res *pb.Responsevar ccevent *pb.ChaincodeEventres, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)if err != nil {endorserLogger.Errorf("[%s][%s] failed to invoke chaincode %s, error: %+v", txParams.ChannelID, shorttxid(txParams.TxID), cid, err)return nil, nil, nil, nil, err}if txParams.TXSimulator != nil {if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {txParams.TXSimulator.Done()return nil, nil, nil, nil, err}if simResult.PvtSimulationResults != nil {if cid.Name == "lscc" {// TODO: remove once we can store collection configuration outside of LSCCtxParams.TXSimulator.Done()return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")}pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator)// To read collection config need to read collection updates before// releasing the lock, hence txParams.TXSimulator.Done() moved down heretxParams.TXSimulator.Done()if err != nil {return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")}endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID)if err != nil {return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", txParams.ChannelID))}// Add ledger height at which transaction was endorsed,// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.pvtDataWithConfig.EndorsedAt = endorsedAtif err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {return nil, nil, nil, nil, err}}txParams.TXSimulator.Done()if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {return nil, nil, nil, nil, err}}return cdLedger, res, pubSimResBytes, ccevent, nil
}// endorse the proposal by calling the ESCC
func (e *Endorser) endorseProposal(_ context.Context, chainID string, txid string, signedProp *pb.SignedProposal, proposal *pb.Proposal, response *pb.Response, simRes []byte, event *pb.ChaincodeEvent, visibility []byte, ccid *pb.ChaincodeID, txsim ledger.TxSimulator, cd ccprovider.ChaincodeDefinition) (*pb.ProposalResponse, error) {endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", chainID, shorttxid(txid), ccid)defer endorserLogger.Debugf("[%s][%s] Exit", chainID, shorttxid(txid))isSysCC := cd == nil// 1) extract the name of the escc that is requested to endorse this chaincodevar escc string// ie, "lscc" or system chaincodesif isSysCC {escc = "escc"} else {escc = cd.Endorsement()}endorserLogger.Debugf("[%s][%s] escc for chaincode %s is %s", chainID, shorttxid(txid), ccid, escc)// marshalling event bytesvar err errorvar eventBytes []byteif event != nil {eventBytes, err = putils.GetBytesChaincodeEvent(event)if err != nil {return nil, errors.Wrap(err, "failed to marshal event bytes")}}// set version of executing chaincodeif isSysCC {// if we want to allow mixed fabric levels we should// set syscc version to ""ccid.Version = util.GetSysCCVersion()} else {ccid.Version = cd.CCVersion()}ctx := Context{PluginName: escc,Channel: chainID,SignedProposal: signedProp,ChaincodeID: ccid,Event: eventBytes,SimRes: simRes,Response: response,Visibility: visibility,Proposal: proposal,TxID: txid,}return e.s.EndorseWithPlugin(ctx)
}
preProcess这个函数检查签名提案的消息的格式和签名是否正确,包括通道头、签名头等。然后通过交易的ID检查交易的唯一性,权限和策略。然后再调用simulateProposal启动模拟执行,并将结果记录到模拟交易器中。它又会调用 callChaincode启动链码容器并执行链码,而方法则会调用Exccute方法最终执行。再通过调用GetTxSimulationResults获得执行的结果的读写集。最后如果一切OK,则继续签名背书即调用 endorseProposal签名背书。在此函数中,先要检查一下系统链码的合法性,如果为空则直接创建返回escc实例对象。然后调用背书插件传入相关的上下文进行背书。真正的背书代码在插件:
// EndorseWithPlugin endorses the response with a plugin
func (pe *PluginEndorser) EndorseWithPlugin(ctx Context) (*pb.ProposalResponse, error) {endorserLogger.Debug("Entering endorsement for", ctx)if ctx.Response == nil {return nil, errors.New("response is nil")}if ctx.Response.Status >= shim.ERRORTHRESHOLD {return &pb.ProposalResponse{Response: ctx.Response}, nil}plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel)if err != nil {endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)return nil, errors.Errorf("plugin with name %s could not be used: %v", ctx.PluginName, err)}prpBytes, err := proposalResponsePayloadFromContext(ctx)if err != nil {endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)return nil, errors.Wrap(err, "failed assembling proposal response payload")}endorsement, prpBytes, err := plugin.Endorse(prpBytes, ctx.SignedProposal)if err != nil {endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)return nil, errors.WithStack(err)}resp := &pb.ProposalResponse{Version: 1,Endorsement: endorsement,Payload: prpBytes,Response: ctx.Response,}endorserLogger.Debug("Exiting", ctx)return resp, nil
}
func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {signer, err := e.SigningIdentityForRequest(sp)if err != nil {return nil, nil, errors.New(fmt.Sprintf("failed fetching signing identity: %v", err))}// serialize the signing identityidentityBytes, err := signer.Serialize()if err != nil {return nil, nil, errors.New(fmt.Sprintf("could not serialize the signing identity: %v", err))}// sign the concatenation of the proposal response and the serialized endorser identity with this endorser's keysignature, err := signer.Sign(append(prpBytes, identityBytes...))if err != nil {return nil, nil, errors.New(fmt.Sprintf("could not sign the proposal response payload: %v", err))}endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}return endorsement, prpBytes, nil
}
其实仍然是检查条件,如参数等,然后获得背书插件,加载上下文,背书,获得签名并配合策略签名,然后返回到背书插件中构建返回的消息数据。
四、总结
背书中的细节仍然有很多,比如如何启动容器,如何与容器通信,链码的执行等等,还是老样子,边走边看边分析,不要一下子扎到底,收不回来。注重细节,但是不要沉迷于细节,这才是正道。
这篇关于Fabric源码分析之四背书(endoser)节点分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!