Hyperledger Fabric (3) - Source Code Analysis: The Peer Startup Flow

2024-05-07 13:32


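1. Background

A Fabric node is normally brought up by launching the peer binary from the command line. This article walks through the peer startup flow from the source code.

2. The /fabric/cmd directory

Judging by its name, this directory most likely holds the project's command-line entry points.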

~/workspace/gospace/src/github.com/hyperledger/fabric/cmd $ tree -L 1
.
├── common          // shared code
├── configtxgen     // genesis block and initial channel files (consensus settings, etc.)
├── configtxlator   // conversion between binary and JSON (ledger data, transactions, etc.)
├── cryptogen       // cryptography-related tooling
├── discover        // node discovery service
├── idemixgen
├── orderer         // ordering service (consensus, blocks, etc.)
└── peer            // ledger data, chaincode, containers, MSP, APIs, etc.

8 directories, 0 files

3. Peer startup flow

3.1 Entry point: /fabric/cmd/peer/main.go
// The main command describes the service and
// defaults to printing the help message.
var mainCmd = &cobra.Command{Use: "peer"}

func main() {
	// Covered in the previous article: load settings from environment variables.
	// For environment variables.
	viper.SetEnvPrefix(common.CmdRoot)
	viper.AutomaticEnv()
	replacer := strings.NewReplacer(".", "_")
	viper.SetEnvKeyReplacer(replacer)

	// Set up the flags shared by every peer command.
	// Define command-line flags that are valid for all peer commands and
	// subcommands.
	mainFlags := mainCmd.PersistentFlags()

	mainFlags.String("logging-level", "", "Legacy logging level flag")
	viper.BindPFlag("logging_level", mainFlags.Lookup("logging-level"))
	mainFlags.MarkHidden("logging-level")

	cryptoProvider := factory.GetDefault()

	// Register the subcommands: version, node, chaincode, channel, lifecycle.
	mainCmd.AddCommand(version.Cmd())
	mainCmd.AddCommand(node.Cmd())
	mainCmd.AddCommand(chaincode.Cmd(nil, cryptoProvider))
	mainCmd.AddCommand(channel.Cmd(nil))
	mainCmd.AddCommand(lifecycle.Cmd(cryptoProvider))

	// This is where execution actually happens. cobra does the real work;
	// main only exits with a non-zero status when an error comes back.
	// On failure Cobra prints the usage message and error string, so we only
	// need to exit with a non-0 status
	if mainCmd.Execute() != nil {
		os.Exit(1)
	}
}
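The viper calls at the top of main let configuration keys be overridden from the environment. The sketch below imitates that key mapping with the standard library only; it is not viper's code. It assumes that common.CmdRoot is "core" and that the prefix and key are joined with an underscore and upper-cased. The helpers envKey and lookup, and the address value, are made up for illustration.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envKey turns a dotted config key into an environment variable name:
// prefix + "_" + key, with "." replaced by "_" and the result upper-cased.
// This mirrors the SetEnvPrefix / SetEnvKeyReplacer setup only as an assumption.
func envKey(prefix, key string) string {
	replacer := strings.NewReplacer(".", "_")
	return strings.ToUpper(prefix + "_" + replacer.Replace(key))
}

// lookup returns the environment override for key, if one is set.
func lookup(prefix, key string) (string, bool) {
	return os.LookupEnv(envKey(prefix, key))
}

func main() {
	// Hypothetical value, set here only so the lookup has something to find.
	os.Setenv("CORE_PEER_ADDRESS", "peer0.example.com:7051")

	fmt.Println(envKey("core", "peer.address"))
	if v, ok := lookup("core", "peer.address"); ok {
		fmt.Println(v)
	}
}
```

Under these assumptions, a key such as peer.address in the configuration file would be overridden by an environment variable named CORE_PEER_ADDRESS.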

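The entry point is surprisingly short. That is typical of Go projects; the Ethereum (ETH) client's entry function is just as terse. Short code does not mean little work, though: plenty of services get started behind these commands. Taking node.Cmd as an example, let's see what actually sits behind it, starting with its start subcommand.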

func startCmd() *cobra.Command {
	// Set the flags on the node start command.
	flags := nodeStartCmd.Flags()
	flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false, "start peer in chaincode development mode")
	return nodeStartCmd
}

// startCmd returns the struct instance below.
var nodeStartCmd = &cobra.Command{
	Use:   "start",
	Short: "Starts the node.",
	Long:  `Starts a node that interacts with the network.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		if len(args) != 0 {
			return fmt.Errorf("trailing args detected")
		}
		// Parsing of the command line is done so silence cmd usage
		cmd.SilenceUsage = true
		return serve(args)
	},
}
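To show how registration and lookup fit together, here is a minimal sketch of a command tree built with the standard library only. It is not cobra's implementation: the Command type, Find, Execute and newPeerTree are hypothetical stand-ins that mimic the peer -> node -> start chain and the "trailing args detected" check above.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a hypothetical, stripped-down stand-in for cobra.Command.
type Command struct {
	Use      string
	RunE     func(args []string) error
	children []*Command
}

// AddCommand registers sub as a child of c.
func (c *Command) AddCommand(sub *Command) {
	c.children = append(c.children, sub)
}

// Find walks args down the tree and returns the deepest matching command
// together with the arguments that were not consumed as command names.
func (c *Command) Find(args []string) (*Command, []string) {
	cur := c
	for len(args) > 0 {
		var next *Command
		for _, child := range cur.children {
			if child.Use == args[0] {
				next = child
				break
			}
		}
		if next == nil {
			break
		}
		cur, args = next, args[1:]
	}
	return cur, args
}

// Execute resolves args and runs the matching command's RunE.
func (c *Command) Execute(args []string) error {
	cmd, rest := c.Find(args)
	if cmd.RunE == nil {
		return errors.New("command is not runnable: " + cmd.Use)
	}
	return cmd.RunE(rest)
}

// newPeerTree builds peer -> node -> start; serve stands in for the real serve().
func newPeerTree(serve func() error) *Command {
	start := &Command{Use: "start", RunE: func(args []string) error {
		if len(args) != 0 {
			return errors.New("trailing args detected")
		}
		return serve()
	}}
	node := &Command{Use: "node"}
	node.AddCommand(start)
	root := &Command{Use: "peer"}
	root.AddCommand(node)
	return root
}

func main() {
	root := newPeerTree(func() error {
		fmt.Println("serve called")
		return nil
	})
	fmt.Println(root.Execute([]string{"node", "start"}))
	fmt.Println(root.Execute([]string{"node", "start", "extra"}))
}
```

The point of the sketch is that registering a command only stores a pointer in the tree. Nothing runs until Execute resolves the arguments to a leaf and calls its RunE.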

That covers how the commands are registered. The code that actually executes a command lives in cobra:

// Execute uses the args (os.Args[1:] by default)
// and run through the command tree finding appropriate matches
// for commands and then corresponding flags.
func (c *Command) Execute() error {
	_, err := c.ExecuteC()
	return err
}

// ExecuteC executes the command.
func (c *Command) ExecuteC() (cmd *Command, err error) {
	// Regardless of what command execute is called on, run on Root only
	// (execution always starts from the root command)
	if c.HasParent() {
		return c.Root().ExecuteC()
	}

	// windows hook
	if preExecHookFn != nil {
		preExecHookFn(c)
	}

	// initialize help as the last point possible to allow for user
	// overriding
	// (set up the default help command)
	c.InitDefaultHelpCmd()

	args := c.args

	// Workaround FAIL with "go test -v" or "cobra.test -test.v", see #155
	if c.args == nil && filepath.Base(os.Args[0]) != "cobra.test" {
		args = os.Args[1:]
	}

	var flags []string
	if c.TraverseChildren {
		cmd, flags, err = c.Traverse(args)
	} else {
		cmd, flags, err = c.Find(args)
	}
	if err != nil {
		// If found parse to a subcommand and then failed, talk about the subcommand
		if cmd != nil {
			c = cmd
		}
		if !c.SilenceErrors {
			c.Println("Error:", err.Error())
			c.Printf("Run '%v --help' for usage.\n", c.CommandPath())
		}
		return c, err
	}

	cmd.commandCalledAs.called = true
	if cmd.commandCalledAs.name == "" {
		cmd.commandCalledAs.name = cmd.Name()
	}

	// This is the key call.
	err = cmd.execute(flags)
	...
	return cmd, err
}

func (c *Command) execute(a []string) (err error) {
	...
	// initialize help and version flag at the last point possible to allow for user
	// overriding
	// (set up the default help and version flags)
	c.InitDefaultHelpFlag()
	c.InitDefaultVersionFlag()

	// Parse the flags that were passed in.
	err = c.ParseFlags(a)
	if err != nil {
		return c.FlagErrorFunc()(c, err)
	}

	// If help is called, regardless of other flags, return we want help.
	// Also say we need help if the command isn't runnable.
	helpVal, err := c.Flags().GetBool("help")
	if err != nil {
		// should be impossible to get here as we always declare a help
		// flag in InitDefaultHelpFlag()
		c.Println("\"help\" flag declared as non-bool. Please correct your code")
		return err
	}

	if helpVal {
		return flag.ErrHelp
	}

	// for back-compat, only add version flag behavior if version is defined
	if c.Version != "" {
		versionVal, err := c.Flags().GetBool("version")
		if err != nil {
			c.Println("\"version\" flag declared as non-bool. Please correct your code")
			return err
		}
		if versionVal {
			err := tmpl(c.OutOrStdout(), c.VersionTemplate(), c)
			if err != nil {
				c.Println(err)
			}
			return err
		}
	}

	if !c.Runnable() {
		return flag.ErrHelp
	}

	c.preRun()

	// Collect the positional arguments.
	argWoFlags := c.Flags().Args()
	if c.DisableFlagParsing {
		argWoFlags = a
	}

	if err := c.ValidateArgs(argWoFlags); err != nil {
		return err
	}

	// Run the hooks: parent and child commands, and the E and non-E variants,
	// are each handled by the matching branch.
	for p := c; p != nil; p = p.Parent() {
		if p.PersistentPreRunE != nil {
			if err := p.PersistentPreRunE(c, argWoFlags); err != nil {
				return err
			}
			break
		} else if p.PersistentPreRun != nil {
			p.PersistentPreRun(c, argWoFlags)
			break
		}
	}
	if c.PreRunE != nil {
		if err := c.PreRunE(c, argWoFlags); err != nil {
			return err
		}
	} else if c.PreRun != nil {
		c.PreRun(c, argWoFlags)
	}

	if err := c.validateRequiredFlags(); err != nil {
		return err
	}
	if c.RunE != nil {
		if err := c.RunE(c, argWoFlags); err != nil {
			return err
		}
	} else {
		c.Run(c, argWoFlags)
	}
	if c.PostRunE != nil {
		if err := c.PostRunE(c, argWoFlags); err != nil {
			return err
		}
	} else if c.PostRun != nil {
		c.PostRun(c, argWoFlags)
	}
	for p := c; p != nil; p = p.Parent() {
		if p.PersistentPostRunE != nil {
			if err := p.PersistentPostRunE(c, argWoFlags); err != nil {
				return err
			}
			break
		} else if p.PersistentPostRun != nil {
			p.PersistentPostRun(c, argWoFlags)
			break
		}
	}

	return nil
}

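The long if/else chain in execute dispatches to whichever hooks were set on the command when it was defined: PersistentPreRun, PreRun, Run or RunE, PostRun and PersistentPostRun. For nodeStartCmd only RunE is set, so an invocation such as peer node start xxx is resolved to that command and ends up in its RunE.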
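The hook order in execute is easier to see in a reduced form. The sketch below uses a hypothetical Cmd type with one function slot per hook and leaves out the E variants, flag parsing and error handling. It only reproduces the ordering and the "nearest persistent hook wins" rule visible in the excerpt.

```go
package main

import "fmt"

// Hook is a hypothetical stand-in for cobra's Run-style callbacks.
type Hook func()

// Cmd is a simplified command carrying the five hook slots discussed above.
type Cmd struct {
	Name              string
	Parent            *Cmd
	PersistentPreRun  Hook
	PreRun            Hook
	Run               Hook
	PostRun           Hook
	PersistentPostRun Hook
}

// execute runs the hooks in the same order as the excerpt: the nearest
// persistent pre-run hook found while walking up the parents, then PreRun,
// Run, PostRun, and finally the nearest persistent post-run hook.
func (c *Cmd) execute() {
	for p := c; p != nil; p = p.Parent {
		if p.PersistentPreRun != nil {
			p.PersistentPreRun()
			break // only the closest one runs
		}
	}
	if c.PreRun != nil {
		c.PreRun()
	}
	if c.Run != nil {
		c.Run()
	}
	if c.PostRun != nil {
		c.PostRun()
	}
	for p := c; p != nil; p = p.Parent {
		if p.PersistentPostRun != nil {
			p.PersistentPostRun()
			break
		}
	}
}

// trace builds a root/child pair whose hooks append their names to a log,
// executes the child, and returns the log.
func trace() []string {
	var log []string
	note := func(s string) Hook {
		return func() { log = append(log, s) }
	}
	root := &Cmd{
		Name:              "peer",
		PersistentPreRun:  note("root.PersistentPreRun"),
		PersistentPostRun: note("root.PersistentPostRun"),
	}
	start := &Cmd{
		Name:    "start",
		Parent:  root,
		PreRun:  note("start.PreRun"),
		Run:     note("start.Run"),
		PostRun: note("start.PostRun"),
	}
	start.execute()
	return log
}

func main() {
	for _, step := range trace() {
		fmt.Println(step)
	}
}
```

Because the child defines no persistent hooks of its own, the walk up the parent chain picks up the root's hooks, which is the behaviour of the two for loops in the excerpt.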

3.2 Service startup

With the command flow covered, let's look at what happens once the command returned by startCmd() in the node package calls serve(args).

func serve(args []string) error {
	// currently the peer only works with the standard MSP
	// because in certain scenarios the MSP has to make sure
	// that from a single credential you only have a single 'identity'.
	// Idemix does not support this *YET* but it can be easily
	// fixed to support it. For now, we just make sure that
	// the peer only comes up with the standard MSP
	// (MSP handling)
	mspType := mgmt.GetLocalMSP(factory.GetDefault()).GetType()
	if mspType != msp.FABRIC {
		panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
	}

	// Trace RPCs with the golang.org/x/net/trace package. This was moved out of
	// the deliver service connection factory as it has process wide implications
	// and was racy with respect to initialization of gRPC clients and servers.
	grpc.EnableTracing = true

	logger.Infof("Starting %s", version.GetInfo())

	//obtain coreConfiguration
	coreConfig, err := peer.GlobalConfig()
	if err != nil {
		return err
	}

	platformRegistry := platforms.NewRegistry(platforms.SupportedPlatforms...)

	// Returns the MSP manager for a given channel.
	identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
		return mgmt.GetManagerForChain(chainID)
	}

	// Operations system: holds the related addresses, ports, certificates, TLS settings, etc.
	opsSystem := newOperationsSystem(coreConfig)
	err = opsSystem.Start()
	if err != nil {
		return errors.WithMessage(err, "failed to initialize operations subsystems")
	}
	defer opsSystem.Stop()

	// Metrics: observe and record information about the node.
	metricsProvider := opsSystem.Provider
	logObserver := floggingmetrics.NewObserver(metricsProvider)
	flogging.SetObserver(logObserver)

	// Keeps membership information about connected members for later use.
	membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)

	// Read the MSP ID from the configuration.
	mspID := coreConfig.LocalMSPID

	// Chaincode install path.
	chaincodeInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "lifecycle", "chaincodes")
	ccStore := persistence.NewStore(chaincodeInstallPath)
	ccPackageParser := &persistence.ChaincodePackageParser{
		MetadataProvider: ccprovider.PersistenceAdapter(ccprovider.MetadataAsTarEntries),
	}

	// Peer host.
	peerHost, _, err := net.SplitHostPort(coreConfig.PeerAddress)
	if err != nil {
		return fmt.Errorf("peer address is not in the format of host:port: %v", err)
	}

	// Listen address from the configuration.
	listenAddr := coreConfig.ListenAddress
	serverConfig, err := peer.GetServerConfig()
	if err != nil {
		logger.Fatalf("Error loading secure config for peer (%s)", err)
	}

	// Fill in the server configuration.
	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
	serverConfig.ServerStatsHandler = comm.NewServerStatsHandler(metricsProvider)
	serverConfig.UnaryInterceptors = append(
		serverConfig.UnaryInterceptors,
		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
	)
	serverConfig.StreamInterceptors = append(
		serverConfig.StreamInterceptors,
		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
	)

	cs := comm.NewCredentialSupport()
	if serverConfig.SecOpts.UseTLS {
		logger.Info("Starting peer with TLS enabled")
		cs = comm.NewCredentialSupport(serverConfig.SecOpts.ServerRootCAs...)

		// set the cert to use if client auth is requested by remote endpoints
		clientCert, err := peer.GetClientCertificate()
		if err != nil {
			logger.Fatalf("Failed to set TLS client certificate (%s)", err)
		}
		cs.SetClientCertificate(clientCert)
	}

	// Create the peer's gRPC server.
	peerServer, err := comm.NewGRPCServer(listenAddr, serverConfig)
	if err != nil {
		logger.Fatalf("Failed to create peer server (%s)", err)
	}

	transientStoreProvider, err := transientstore.NewStoreProvider(
		filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "transientstore"),
	)
	if err != nil {
		return errors.WithMessage(err, "failed to open transient store")
	}

	deliverServiceConfig := deliverservice.GlobalConfig()

	peerInstance := &peer.Peer{
		Server:                   peerServer,
		ServerConfig:             serverConfig,
		CredentialSupport:        cs,
		StoreProvider:            transientStoreProvider,
		CryptoProvider:           factory.GetDefault(),
		OrdererEndpointOverrides: deliverServiceConfig.OrdererEndpointOverrides,
	}

	// Get the local signing identity, which covers the peer's roles such as
	// endorsement and validation.
	localMSP := mgmt.GetLocalMSP(factory.GetDefault())
	signingIdentity, err := localMSP.GetDefaultSigningIdentity()
	if err != nil {
		logger.Panicf("Could not get the default signing identity from the local MSP: [%+v]", err)
	}

	signingIdentityBytes, err := signingIdentity.Serialize()
	if err != nil {
		logger.Panicf("Failed to serialize the signing identity: %v", err)
	}

	expirationLogger := flogging.MustGetLogger("certmonitor")
	crypto.TrackExpiration(
		serverConfig.SecOpts.UseTLS,
		serverConfig.SecOpts.Certificate,
		cs.GetClientCertificate().Certificate,
		signingIdentityBytes,
		expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
		time.Now(),
		time.AfterFunc)

	policyMgr := policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager)

	deliverGRPCClient, err := comm.NewGRPCClient(comm.ClientConfig{
		Timeout: deliverServiceConfig.ConnectionTimeout,
		KaOpts:  deliverServiceConfig.KeepaliveOptions,
		SecOpts: deliverServiceConfig.SecOpts,
	})
	if err != nil {
		logger.Panicf("Could not create the deliver grpc client: [%+v]", err)
	}

	// FIXME: Creating the gossip service has the side effect of starting a bunch
	// of go routines and registration with the grpc server.
	// (initialize the gossip service)
	gossipService, err := initGossipService(
		policyMgr,
		metricsProvider,
		peerServer,
		signingIdentity,
		cs,
		coreConfig.PeerAddress,
		deliverGRPCClient,
		deliverServiceConfig,
	)
	if err != nil {
		return errors.WithMessage(err, "failed to initialize gossip service")
	}
	defer gossipService.Stop()

	peerInstance.GossipService = gossipService

	policyChecker := policy.NewPolicyChecker(
		policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager),
		mgmt.GetLocalMSP(factory.GetDefault()),
		mgmt.NewLocalMSPPrincipalGetter(factory.GetDefault()),
	)

	//startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
	//Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
	aclProvider := aclmgmt.NewACLProvider(
		aclmgmt.ResourceGetter(peerInstance.GetStableChannelConfig),
		policyChecker,
	)

	// TODO, unfortunately, the lifecycle initialization is very unclean at the
	// moment. This is because ccprovider.SetChaincodePath only works after
	// ledgermgmt.Initialize, but ledgermgmt.Initialize requires a reference to
	// lifecycle.  Finally, lscc requires a reference to the system chaincode
	// provider in order to be created, which requires chaincode support to be
	// up, which also requires, you guessed it, lifecycle. Once we remove the
	// v1.0 lifecycle, we should be good to collapse all of the init of lifecycle
	// to this point.
	lifecycleResources := &lifecycle.Resources{
		Serializer:          &lifecycle.Serializer{},
		ChannelConfigSource: peerInstance,
		ChaincodeStore:      ccStore,
		PackageParser:       ccPackageParser,
	}

	lifecycleValidatorCommitter := &lifecycle.ValidatorCommitter{
		Resources:                    lifecycleResources,
		LegacyDeployedCCInfoProvider: &lscc.DeployedCCInfoProvider{},
	}

	ccInfoFSImpl := &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()}

	// legacyMetadataManager collects metadata information from the legacy
	// lifecycle (lscc). This is expected to disappear with FAB-15061.
	legacyMetadataManager, err := cclifecycle.NewMetadataManager(
		cclifecycle.EnumerateFunc(
			func() ([]ccdef.InstalledChaincode, error) {
				return ccInfoFSImpl.ListInstalledChaincodes(ccInfoFSImpl.GetChaincodeInstallPath(), ioutil.ReadDir, ccprovider.LoadPackage)
			},
		),
	)
	if err != nil {
		logger.Panicf("Failed creating LegacyMetadataManager: +%v", err)
	}

	// metadataManager aggregates metadata information from _lifecycle and
	// the legacy lifecycle (lscc).
	metadataManager := lifecycle.NewMetadataManager()

	// the purpose of these two managers is to feed per-channel chaincode data
	// into gossip owing to the fact that we are transitioning from lscc to
	// _lifecycle, we still have two providers of such information until v2.1,
	// in which we will remove the legacy.
	//
	// the flow of information is the following
	//
	// gossip <-- metadataManager <-- lifecycleCache  (for _lifecycle)
	//                             \
	//                              - legacyMetadataManager (for lscc)
	//
	// FAB-15061 tracks the work necessary to remove LSCC, at which point we
	// will be able to simplify the flow to simply be
	//
	// gossip <-- lifecycleCache

	chaincodeCustodian := lifecycle.NewChaincodeCustodian()

	externalBuilderOutput := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "externalbuilder", "builds")
	if err := os.MkdirAll(externalBuilderOutput, 0700); err != nil {
		logger.Panicf("could not create externalbuilder build output dir: %s", err)
	}

	ebMetadataProvider := &externalbuilder.MetadataProvider{
		DurablePath: externalBuilderOutput,
	}

	lifecycleCache := lifecycle.NewCache(lifecycleResources, mspID, metadataManager, chaincodeCustodian, ebMetadataProvider)

	txProcessors := map[common.HeaderType]ledger.CustomTxProcessor{
		common.HeaderType_CONFIG: &peer.ConfigTxProcessor{},
	}

	peerInstance.LedgerMgr = ledgermgmt.NewLedgerMgr(
		&ledgermgmt.Initializer{
			CustomTxProcessors:              txProcessors,
			DeployedChaincodeInfoProvider:   lifecycleValidatorCommitter,
			MembershipInfoProvider:          membershipInfoProvider,
			ChaincodeLifecycleEventProvider: lifecycleCache,
			MetricsProvider:                 metricsProvider,
			HealthCheckRegistry:             opsSystem,
			StateListeners:                  []ledger.StateListener{lifecycleCache},
			Config:                          ledgerConfig(),
			Hasher:                          factory.GetDefault(),
			EbMetadataProvider:              ebMetadataProvider,
		},
	)

	// Configure CC package storage
	lsccInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "chaincodes")
	ccprovider.SetChaincodesPath(lsccInstallPath)

	if err := lifecycleCache.InitializeLocalChaincodes(); err != nil {
		return errors.WithMessage(err, "could not initialize local chaincodes")
	}

	// Parameter overrides must be processed before any parameters are
	// cached. Failures to cache cause the server to terminate immediately.
	if chaincodeDevMode {
		logger.Info("Running in chaincode development mode")
		logger.Info("Disable loading validity system chaincode")

		viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
	}

	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
		return func(env *cb.Envelope, channelID string) error {
			return aclProvider.CheckACL(resourceName, channelID, env)
		}
	}

	metrics := deliver.NewMetrics(metricsProvider)
	abServer := &peer.DeliverServer{
		DeliverHandler: deliver.NewHandler(
			&peer.DeliverChainManager{Peer: peerInstance},
			coreConfig.AuthenticationTimeWindow,
			mutualTLS,
			metrics,
			false,
		),
		PolicyCheckerProvider: policyCheckerProvider,
	}
	pb.RegisterDeliverServer(peerServer.Server(), abServer)

	// Create a self-signed CA for chaincode service
	ca, err := tlsgen.NewCA()
	if err != nil {
		logger.Panic("Failed creating authentication layer:", err)
	}
	ccSrv, ccEndpoint, err := createChaincodeServer(coreConfig, ca, peerHost)
	if err != nil {
		logger.Panicf("Failed to create chaincode server: %s", err)
	}

	//get user mode
	userRunsCC := chaincode.IsDevMode()
	tlsEnabled := coreConfig.PeerTLSEnabled

	// create chaincode specific tls CA
	authenticator := accesscontrol.NewAuthenticator(ca)

	chaincodeHandlerRegistry := chaincode.NewHandlerRegistry(userRunsCC)
	lifecycleTxQueryExecutorGetter := &chaincode.TxQueryExecutorGetter{
		CCID:            scc.ChaincodeID(lifecycle.LifecycleNamespace),
		HandlerRegistry: chaincodeHandlerRegistry,
	}

	if coreConfig.VMEndpoint == "" && len(coreConfig.ExternalBuilders) == 0 {
		logger.Panic("VMEndpoint not set and no ExternalBuilders defined")
	}

	chaincodeConfig := chaincode.GlobalConfig()

	var client *docker.Client
	var dockerVM *dockercontroller.DockerVM
	if coreConfig.VMEndpoint != "" {
		client, err = createDockerClient(coreConfig)
		if err != nil {
			logger.Panicf("cannot create docker client: %s", err)
		}

		dockerVM = &dockercontroller.DockerVM{
			PeerID:        coreConfig.PeerID,
			NetworkID:     coreConfig.NetworkID,
			BuildMetrics:  dockercontroller.NewBuildMetrics(opsSystem.Provider),
			Client:        client,
			AttachStdOut:  coreConfig.VMDockerAttachStdout,
			HostConfig:    getDockerHostConfig(),
			ChaincodePull: coreConfig.ChaincodePull,
			NetworkMode:   coreConfig.VMNetworkMode,
			PlatformBuilder: &platforms.Builder{
				Registry: platformRegistry,
				Client:   client,
			},
			// This field is superfluous for chaincodes built with v2.0+ binaries
			// however, to prevent users from being forced to rebuild leaving for now
			// but it should be removed in the future.
			LoggingEnv: []string{
				"CORE_CHAINCODE_LOGGING_LEVEL=" + chaincodeConfig.LogLevel,
				"CORE_CHAINCODE_LOGGING_SHIM=" + chaincodeConfig.ShimLogLevel,
				"CORE_CHAINCODE_LOGGING_FORMAT=" + chaincodeConfig.LogFormat,
			},
			MSPID: mspID,
		}
		if err := opsSystem.RegisterChecker("docker", dockerVM); err != nil {
			logger.Panicf("failed to register docker health check: %s", err)
		}
	}

	externalVM := &externalbuilder.Detector{
		Builders:    externalbuilder.CreateBuilders(coreConfig.ExternalBuilders, mspID),
		DurablePath: externalBuilderOutput,
	}

	buildRegistry := &container.BuildRegistry{}

	containerRouter := &container.Router{
		DockerBuilder:   dockerVM,
		ExternalBuilder: externalVMAdapter{externalVM},
		PackageProvider: &persistence.FallbackPackageLocator{
			ChaincodePackageLocator: &persistence.ChaincodePackageLocator{
				ChaincodeDir: chaincodeInstallPath,
			},
			LegacyCCPackageLocator: &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()},
		},
	}

	builtinSCCs := map[string]struct{}{
		"lscc":       {},
		"qscc":       {},
		"cscc":       {},
		"_lifecycle": {},
	}

	lsccInst := &lscc.SCC{
		BuiltinSCCs: builtinSCCs,
		Support: &lscc.SupportImpl{
			GetMSPIDs: peerInstance.GetMSPIDs,
		},
		SCCProvider:        &lscc.PeerShim{Peer: peerInstance},
		ACLProvider:        aclProvider,
		GetMSPIDs:          peerInstance.GetMSPIDs,
		PolicyChecker:      policyChecker,
		BCCSP:              factory.GetDefault(),
		BuildRegistry:      buildRegistry,
		ChaincodeBuilder:   containerRouter,
		EbMetadataProvider: ebMetadataProvider,
	}

	chaincodeEndorsementInfo := &lifecycle.ChaincodeEndorsementInfoSource{
		LegacyImpl:  lsccInst,
		Resources:   lifecycleResources,
		Cache:       lifecycleCache,
		BuiltinSCCs: builtinSCCs,
	}

	containerRuntime := &chaincode.ContainerRuntime{
		BuildRegistry:   buildRegistry,
		ContainerRouter: containerRouter,
	}

	lifecycleFunctions := &lifecycle.ExternalFunctions{
		Resources:                 lifecycleResources,
		InstallListener:           lifecycleCache,
		InstalledChaincodesLister: lifecycleCache,
		ChaincodeBuilder:          containerRouter,
		BuildRegistry:             buildRegistry,
	}

	lifecycleSCC := &lifecycle.SCC{
		Dispatcher: &dispatcher.Dispatcher{
			Protobuf: &dispatcher.ProtobufImpl{},
		},
		DeployedCCInfoProvider: lifecycleValidatorCommitter,
		QueryExecutorProvider:  lifecycleTxQueryExecutorGetter,
		Functions:              lifecycleFunctions,
		OrgMSPID:               mspID,
		ChannelConfigSource:    peerInstance,
		ACLProvider:            aclProvider,
	}

	chaincodeLauncher := &chaincode.RuntimeLauncher{
		Metrics:           chaincode.NewLaunchMetrics(opsSystem.Provider),
		Registry:          chaincodeHandlerRegistry,
		Runtime:           containerRuntime,
		StartupTimeout:    chaincodeConfig.StartupTimeout,
		CertGenerator:     authenticator,
		CACert:            ca.CertBytes(),
		PeerAddress:       ccEndpoint,
		ConnectionHandler: &extcc.ExternalChaincodeRuntime{},
	}

	// Keep TestQueries working
	if !chaincodeConfig.TLSEnabled {
		chaincodeLauncher.CertGenerator = nil
	}

	chaincodeSupport := &chaincode.ChaincodeSupport{
		ACLProvider:            aclProvider,
		AppConfig:              peerInstance,
		DeployedCCInfoProvider: lifecycleValidatorCommitter,
		ExecuteTimeout:         chaincodeConfig.ExecuteTimeout,
		InstallTimeout:         chaincodeConfig.InstallTimeout,
		HandlerRegistry:        chaincodeHandlerRegistry,
		HandlerMetrics:         chaincode.NewHandlerMetrics(opsSystem.Provider),
		Keepalive:              chaincodeConfig.Keepalive,
		Launcher:               chaincodeLauncher,
		Lifecycle:              chaincodeEndorsementInfo,
		Peer:                   peerInstance,
		Runtime:                containerRuntime,
		BuiltinSCCs:            builtinSCCs,
		TotalQueryLimit:        chaincodeConfig.TotalQueryLimit,
		UserRunsCC:             userRunsCC,
	}

	custodianLauncher := custodianLauncherAdapter{
		launcher:      chaincodeLauncher,
		streamHandler: chaincodeSupport,
	}
	go chaincodeCustodian.Work(buildRegistry, containerRouter, custodianLauncher)

	ccSupSrv := pb.ChaincodeSupportServer(chaincodeSupport)
	if tlsEnabled {
		ccSupSrv = authenticator.Wrap(ccSupSrv)
	}

	csccInst := cscc.New(
		aclProvider,
		lifecycleValidatorCommitter,
		lsccInst,
		lifecycleValidatorCommitter,
		policyChecker,
		peerInstance,
		factory.GetDefault(),
	)
	qsccInst := scc.SelfDescribingSysCC(qscc.New(aclProvider, peerInstance))
	if maxConcurrency := coreConfig.LimitsConcurrencyQSCC; maxConcurrency != 0 {
		qsccInst = scc.Throttle(maxConcurrency, qsccInst)
	}

	pb.RegisterChaincodeSupportServer(ccSrv.Server(), ccSupSrv)

	// start the chaincode specific gRPC listening service
	go ccSrv.Start()

	logger.Debugf("Running peer")

	libConf, err := library.LoadConfig()
	if err != nil {
		return errors.WithMessage(err, "could not decode peer handlers configuration")
	}

	reg := library.InitRegistry(libConf)

	authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
	endorserSupport := &endorser.SupportImpl{
		SignerSerializer: signingIdentity,
		Peer:             peerInstance,
		ChaincodeSupport: chaincodeSupport,
		ACLProvider:      aclProvider,
		BuiltinSCCs:      builtinSCCs,
	}
	endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
	validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
	signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
	channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
	pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
	pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
		ChannelStateRetriever:   channelStateRetriever,
		TransientStoreRetriever: peerInstance,
		PluginMapper:            pluginMapper,
		SigningIdentityFetcher:  signingIdentityFetcher,
	})
	endorserSupport.PluginEndorser = pluginEndorser
	channelFetcher := endorserChannelAdapter{
		peer: peerInstance,
	}
	serverEndorser := &endorser.Endorser{
		PrivateDataDistributor: gossipService,
		ChannelFetcher:         channelFetcher,
		LocalMSP:               localMSP,
		Support:                endorserSupport,
		Metrics:                endorser.NewMetrics(metricsProvider),
	}

	// deploy system chaincodes
	for _, cc := range []scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC} {
		if enabled, ok := chaincodeConfig.SCCWhitelist[cc.Name()]; !ok || !enabled {
			logger.Infof("not deploying chaincode %s as it is not enabled", cc.Name())
			continue
		}
		scc.DeploySysCC(cc, chaincodeSupport)
	}

	logger.Infof("Deployed system chaincodes")

	// register the lifecycleMetadataManager to get updates from the legacy
	// chaincode; lifecycleMetadataManager will aggregate these updates with
	// the ones from the new lifecycle and deliver both
	// this is expected to disappear with FAB-15061
	legacyMetadataManager.AddListener(metadataManager)

	// register gossip as a listener for updates from lifecycleMetadataManager
	metadataManager.AddListener(lifecycle.HandleMetadataUpdateFunc(func(channel string, chaincodes ccdef.MetadataSet) {
		gossipService.UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChannelID(channel))
	}))

	// this brings up all the channels
	peerInstance.Initialize(
		func(cid string) {
			// initialize the metadata for this channel.
			// This call will pre-populate chaincode information for this
			// channel but it won't fire any updates to its listeners
			lifecycleCache.InitializeMetadata(cid)

			// initialize the legacyMetadataManager for this channel.
			// This call will pre-populate chaincode information from
			// the legacy lifecycle for this channel; it will also fire
			// the listener, which will cascade to metadataManager
			// and eventually to gossip to pre-populate data structures.
			// this is expected to disappear with FAB-15061
			sub, err := legacyMetadataManager.NewChannelSubscription(cid, cclifecycle.QueryCreatorFunc(func() (cclifecycle.Query, error) {
				return peerInstance.GetLedger(cid).NewQueryExecutor()
			}))
			if err != nil {
				logger.Panicf("Failed subscribing to chaincode lifecycle updates")
			}

			// register this channel's legacyMetadataManager (sub) to get ledger updates
			// this is expected to disappear with FAB-15061
			cceventmgmt.GetMgr().Register(cid, sub)
		},
		plugin.MapBasedMapper(validationPluginsByName),
		lifecycleValidatorCommitter,
		lsccInst,
		lifecycleValidatorCommitter,
		coreConfig.ValidatorPoolSize,
	)

	if coreConfig.DiscoveryEnabled {
		registerDiscoveryService(
			coreConfig,
			peerInstance,
			peerServer,
			policyMgr,
			lifecycle.NewMetadataProvider(
				lifecycleCache,
				legacyMetadataManager,
				peerInstance,
			),
			gossipService,
		)
	}

	logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)

	// Get configuration before starting go routines to avoid
	// racing in tests
	profileEnabled := coreConfig.ProfileEnabled
	profileListenAddress := coreConfig.ProfileListenAddress

	// Start the grpc server. Done in a goroutine so we can deploy the
	// genesis block if needed.
	serve := make(chan error)

	// Start profiling http endpoint if enabled
	if profileEnabled {
		go func() {
			logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
			if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
				logger.Errorf("Error starting profiler: %s", profileErr)
			}
		}()
	}

	// Handle OS signals.
	go handleSignals(addPlatformSignals(map[os.Signal]func(){
		syscall.SIGINT:  func() { serve <- nil },
		syscall.SIGTERM: func() { serve <- nil },
	}))

	logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)

	// get a list of ledger IDs and load preResetHeight files for these ledger IDs
	ledgerIDs, err := peerInstance.LedgerMgr.GetLedgerIDs()
	if err != nil {
		return errors.WithMessage(err, "failed to get ledger IDs")
	}

	// check to see if the peer ledgers have been reset
	rootFSPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "ledgersData")
	// (check whether the ledgers have been reset)
	preResetHeights, err := kvledger.LoadPreResetHeight(rootFSPath, ledgerIDs)
	if err != nil {
		return fmt.Errorf("error loading prereset height: %s", err)
	}

	for cid, height := range preResetHeights {
		logger.Infof("Ledger rebuild: channel [%s]: preresetHeight: [%d]", cid, height)
	}

	if len(preResetHeights) > 0 {
		logger.Info("Ledger rebuild: Entering loop to check if current ledger heights surpass prereset ledger heights. Endorsement request processing will be disabled.")
		resetFilter := &reset{
			reject: true,
		}
		authFilters = append(authFilters, resetFilter)
		go resetLoop(resetFilter, preResetHeights, ledgerIDs, peerInstance.GetLedger, 10*time.Second)
	}

	// The peer service is started below.
	// start the peer server
	auth := authHandler.ChainFilters(serverEndorser, authFilters...)
	// Register the Endorser server
	pb.RegisterEndorserServer(peerServer.Server(), auth)

	go func() {
		var grpcErr error
		if grpcErr = peerServer.Start(); grpcErr != nil {
			grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
		}
		serve <- grpcErr
	}()

	// Block until grpc server exits
	return <-serve
}
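Near the end of serve, the endorser is wrapped by authHandler.ChainFilters(serverEndorser, authFilters...), and a reset filter with reject: true is appended while ledgers are being rebuilt. The sketch below shows the general chain-of-filters pattern that this suggests. It is an illustration only: Handler, Filter, chain, endorser and resetFilter are hypothetical types defined here, and the assumption that each filter forwards to the next one, with the endorser last, is not taken from the excerpt.

```go
package main

import (
	"errors"
	"fmt"
)

// Handler is a hypothetical stand-in for the endorser service.
type Handler interface {
	Process(proposal string) (string, error)
}

// Filter is a Handler that forwards to a next Handler set through Init.
type Filter interface {
	Handler
	Init(next Handler)
}

// chain links filters so each forwards to the next and the last forwards to
// final. It returns the head of the chain (assumed behaviour, see lead-in).
func chain(final Handler, filters ...Filter) Handler {
	if len(filters) == 0 {
		return final
	}
	for i := 0; i < len(filters)-1; i++ {
		filters[i].Init(filters[i+1])
	}
	filters[len(filters)-1].Init(final)
	return filters[0]
}

// endorser is the final handler in this sketch.
type endorser struct{}

func (endorser) Process(p string) (string, error) {
	return "endorsed:" + p, nil
}

// resetFilter rejects every request while reject is true, modelling
// "endorsement request processing will be disabled" during a ledger rebuild.
type resetFilter struct {
	reject bool
	next   Handler
}

func (r *resetFilter) Init(next Handler) { r.next = next }

func (r *resetFilter) Process(p string) (string, error) {
	if r.reject {
		return "", errors.New("endorsement is disabled while ledgers are being rebuilt")
	}
	return r.next.Process(p)
}

func main() {
	rf := &resetFilter{reject: true}
	h := chain(endorser{}, rf)

	_, err := h.Process("tx1")
	fmt.Println(err)

	rf.reject = false // what a reset loop would do once the heights catch up
	out, _ := h.Process("tx1")
	fmt.Println(out)
}
```

The sketch is single-threaded. In serve the flag is flipped from a separate goroutine (resetLoop), so a real filter would need to guard it with a lock or an atomic value.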

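The function is long enough to make your head spin. It is easier to follow when read alongside the business flow. The main steps are:

1. Get the MSP type
2. Load the global configuration
3. Create the gRPC server: PeerServer
4. Create the GossipService and register it with PeerServer
5. Set up the resource access policies via aclmgmt.NewACLProvider
6. Create the chaincode service
7. Create the Endorser (endorsement) service
8. Deploy the system chaincodes
9. Initialize the channels
10. Start the gRPC server
Note: if profiling is enabled, an HTTP profiling listener is started as well.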
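Step 10 relies on a small concurrency pattern: the server runs in a goroutine, signal handlers and the server both report into one channel, and serve blocks on that channel until something arrives. A minimal sketch of the pattern, using the standard library only, follows. The run function, its start and shutdown parameters and errListenerClosed are hypothetical; the shutdown channel stands in for SIGINT/SIGTERM, and the channel is buffered here (the original is unbuffered) so that neither sender can block forever.

```go
package main

import (
	"errors"
	"fmt"
)

// errListenerClosed is a made-up error used to simulate a server failure.
var errListenerClosed = errors.New("listener closed")

// run starts the server in a goroutine and blocks until either the server
// exits or a shutdown request arrives, mirroring the serve channel above.
func run(start func() error, shutdown <-chan struct{}) error {
	serve := make(chan error, 2) // buffered: both goroutines can always send

	go func() {
		if err := start(); err != nil {
			serve <- fmt.Errorf("grpc server exited with error: %w", err)
			return
		}
		serve <- nil
	}()

	go func() {
		<-shutdown // stands in for SIGINT/SIGTERM
		serve <- nil
	}()

	// Block until the server exits or shutdown is requested.
	return <-serve
}

func main() {
	shutdown := make(chan struct{})
	err := run(func() error { return errListenerClosed }, shutdown)
	fmt.Println(err)
}
```

Whichever event happens first decides the return value: a clean shutdown request yields nil, while a server failure yields the wrapped error.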

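4. Summary

The peer code is one of the most important parts of Fabric, arguably its foundation. It provides the operational commands and the communication machinery, all of which deserve a more detailed look in later articles.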




