Hyperledger Fabric (3): Source Code Analysis of the Peer Startup Flow
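1. Background
A blockchain node is usually brought up by launching peer from the command line. This article analyzes the peer startup flow from the code's perspective.
2. The /fabric/cmd directory
Judging by its name, this directory most likely holds the project's command-line code.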
~/workspace/gospace/src/github.com/hyperledger/fabric/cmd $ tree -L 1
.
├── common          // shared code
├── configtxgen     // generates the genesis block and channel bootstrap files; relates to containers, consensus, etc.
├── configtxlator   // converts between binary and JSON; relates to the ledger, transactions, etc.
├── cryptogen       // cryptography-related services
├── discover        // node discovery service
├── idemixgen
├── orderer         // ordering service; relates to consensus, blocks, etc.
└── peer            // relates to ledger data, chaincode, containers, MSP, interfaces, etc.

8 directories, 0 files
3. Peer startup flow
3.1 Entry point: /fabric/cmd/peer/main.go
// The main command describes the service and
// defaults to printing the help message.
var mainCmd = &cobra.Command{Use: "peer"}

func main() {
	// Covered in the previous article: load the environment variables.
	// For environment variables.
	viper.SetEnvPrefix(common.CmdRoot)
	viper.AutomaticEnv()
	replacer := strings.NewReplacer(".", "_")
	viper.SetEnvKeyReplacer(replacer)

	// Set up the command-line flags shared by all peer commands.
	// Define command-line flags that are valid for all peer commands and
	// subcommands.
	mainFlags := mainCmd.PersistentFlags()

	mainFlags.String("logging-level", "", "Legacy logging level flag")
	viper.BindPFlag("logging_level", mainFlags.Lookup("logging-level"))
	mainFlags.MarkHidden("logging-level")

	cryptoProvider := factory.GetDefault()

	// Register the subcommands: version, node, chaincode, channel, etc.
	mainCmd.AddCommand(version.Cmd())
	mainCmd.AddCommand(node.Cmd())
	mainCmd.AddCommand(chaincode.Cmd(nil, cryptoProvider))
	mainCmd.AddCommand(channel.Cmd(nil))
	mainCmd.AddCommand(lifecycle.Cmd(cryptoProvider))

	// This is where execution actually happens. cobra does the main work;
	// here we only exit with a non-zero status when an error occurs.
	// On failure Cobra prints the usage message and error string, so we only
	// need to exit with a non-0 status
	if mainCmd.Execute() != nil {
		os.Exit(1)
	}
}
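The code is surprisingly short. Go projects tend to share this trait; the entry function of Ethereum (ETH), for example, is also extremely concise. A short entry point does not mean that little happens, though: plenty of services are started behind it. Taking node.Cmd as an example, and specifically its start subcommand, let's look at what actually happens behind the scenes.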
func startCmd() *cobra.Command {
	// Set the flags on the node start command.
	flags := nodeStartCmd.Flags()
	flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false, "start peer in chaincode development mode")
	return nodeStartCmd
}

// startCmd returns the struct instance below.
var nodeStartCmd = &cobra.Command{
	Use:   "start",
	Short: "Starts the node.",
	Long:  `Starts a node that interacts with the network.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		if len(args) != 0 {
			return fmt.Errorf("trailing args detected")
		}
		// Parsing of the command line is done so silence cmd usage
		cmd.SilenceUsage = true
		return serve(args)
	},
}
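Registration calls such as mainCmd.AddCommand(node.Cmd()) build a tree of commands, and a command line like peer node start is resolved by walking that tree. The sketch below illustrates the idea with a hypothetical, stripped-down command type. It is not cobra's implementation; the names command, addCommand, find and newPeerTree are invented for this example.

```go
package main

import "fmt"

// command is a hypothetical, stripped-down stand-in for cobra.Command.
type command struct {
	use      string
	children []*command
	run      func(args []string) error
}

// addCommand attaches sub as a child, in the spirit of AddCommand.
func (c *command) addCommand(sub *command) {
	c.children = append(c.children, sub)
}

// find walks the tree, consuming one argument per matching child, and
// returns the deepest matching command plus the arguments left over.
func (c *command) find(args []string) (*command, []string) {
	if len(args) == 0 {
		return c, args
	}
	for _, child := range c.children {
		if child.use == args[0] {
			return child.find(args[1:])
		}
	}
	return c, args
}

// newPeerTree builds the small tree peer -> node -> start.
func newPeerTree() *command {
	root := &command{use: "peer"}
	node := &command{use: "node"}
	start := &command{use: "start", run: func(args []string) error {
		if len(args) != 0 {
			return fmt.Errorf("trailing args detected")
		}
		return nil // the real command calls serve(args) at this point
	}}
	node.addCommand(start)
	root.addCommand(node)
	return root
}

func main() {
	cmd, rest := newPeerTree().find([]string{"node", "start"})
	err := cmd.run(rest)
	fmt.Println(cmd.use, rest, err)
}
```

With this shape, a stray argument after start is handed to the start command's run function as a leftover, which is where the "trailing args detected" check in nodeStartCmd would fire.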
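The above is how the commands are registered. The actual execution of a command, which takes place inside the cobra library, looks like this: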
// Execute uses the args (os.Args[1:] by default)
// and run through the command tree finding appropriate matches
// for commands and then corresponding flags.
func (c *Command) Execute() error {
	_, err := c.ExecuteC()
	return err
}

// ExecuteC executes the command.
func (c *Command) ExecuteC() (cmd *Command, err error) {
	// Regardless of what command execute is called on, run on Root only
	// (only the root command is run)
	if c.HasParent() {
		return c.Root().ExecuteC()
	}

	// windows hook
	if preExecHookFn != nil {
		preExecHookFn(c)
	}

	// initialize help as the last point possible to allow for user
	// overriding
	// (initialize the default help command)
	c.InitDefaultHelpCmd()

	args := c.args

	// Workaround FAIL with "go test -v" or "cobra.test -test.v", see #155
	if c.args == nil && filepath.Base(os.Args[0]) != "cobra.test" {
		args = os.Args[1:]
	}

	var flags []string
	if c.TraverseChildren {
		cmd, flags, err = c.Traverse(args)
	} else {
		cmd, flags, err = c.Find(args)
	}
	if err != nil {
		// If found parse to a subcommand and then failed, talk about the subcommand
		if cmd != nil {
			c = cmd
		}
		if !c.SilenceErrors {
			c.Println("Error:", err.Error())
			c.Printf("Run '%v --help' for usage.\n", c.CommandPath())
		}
		return c, err
	}

	cmd.commandCalledAs.called = true
	if cmd.commandCalledAs.name == "" {
		cmd.commandCalledAs.name = cmd.Name()
	}

	// The key step is here
	err = cmd.execute(flags)
	...
	return cmd, err
}

func (c *Command) execute(a []string) (err error) {
	...
	// initialize help and version flag at the last point possible to allow for user
	// overriding
	// (initialize the default help and version flags)
	c.InitDefaultHelpFlag()
	c.InitDefaultVersionFlag()

	// Parse the flags that were passed in
	err = c.ParseFlags(a)
	if err != nil {
		return c.FlagErrorFunc()(c, err)
	}

	// If help is called, regardless of other flags, return we want help.
	// Also say we need help if the command isn't runnable.
	helpVal, err := c.Flags().GetBool("help")
	if err != nil {
		// should be impossible to get here as we always declare a help
		// flag in InitDefaultHelpFlag()
		c.Println("\"help\" flag declared as non-bool. Please correct your code")
		return err
	}

	if helpVal {
		return flag.ErrHelp
	}

	// for back-compat, only add version flag behavior if version is defined
	if c.Version != "" {
		versionVal, err := c.Flags().GetBool("version")
		if err != nil {
			c.Println("\"version\" flag declared as non-bool. Please correct your code")
			return err
		}
		if versionVal {
			err := tmpl(c.OutOrStdout(), c.VersionTemplate(), c)
			if err != nil {
				c.Println(err)
			}
			return err
		}
	}

	if !c.Runnable() {
		return flag.ErrHelp
	}

	c.preRun()

	// Collect the positional arguments
	argWoFlags := c.Flags().Args()
	if c.DisableFlagParsing {
		argWoFlags = a
	}

	if err := c.ValidateArgs(argWoFlags); err != nil {
		return err
	}

	// Run the hooks. This covers parent and child commands, and calls
	// whichever form of each hook was set on the command.
	for p := c; p != nil; p = p.Parent() {
		if p.PersistentPreRunE != nil {
			if err := p.PersistentPreRunE(c, argWoFlags); err != nil {
				return err
			}
			break
		} else if p.PersistentPreRun != nil {
			p.PersistentPreRun(c, argWoFlags)
			break
		}
	}
	if c.PreRunE != nil {
		if err := c.PreRunE(c, argWoFlags); err != nil {
			return err
		}
	} else if c.PreRun != nil {
		c.PreRun(c, argWoFlags)
	}

	if err := c.validateRequiredFlags(); err != nil {
		return err
	}
	if c.RunE != nil {
		if err := c.RunE(c, argWoFlags); err != nil {
			return err
		}
	} else {
		c.Run(c, argWoFlags)
	}
	if c.PostRunE != nil {
		if err := c.PostRunE(c, argWoFlags); err != nil {
			return err
		}
	} else if c.PostRun != nil {
		c.PostRun(c, argWoFlags)
	}
	for p := c; p != nil; p = p.Parent() {
		if p.PersistentPostRunE != nil {
			if err := p.PersistentPostRunE(c, argWoFlags); err != nil {
				return err
			}
			break
		} else if p.PersistentPostRun != nil {
			p.PersistentPostRun(c, argWoFlags)
			break
		}
	}

	return nil
}
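The long if…else chain in the code above dispatches to the hooks that were set on the command when it was defined, which for nodeStartCmd is RunE.
Executing a command such as peer node start xxx is therefore resolved here.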
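The dispatch order in that chain can be condensed into a small sketch: pre-run, then run, then post-run, where the error-returning variant of a stage takes precedence when it is set and the first error stops everything after it. The hooks type and the execute and trace functions below are hypothetical miniatures written for illustration. They leave out the persistent hooks and flag handling and are not cobra's code.

```go
package main

import (
	"errors"
	"fmt"
)

// hooks is a hypothetical miniature of the hook fields on a command.
type hooks struct {
	preRunE  func() error
	preRun   func()
	runE     func() error
	run      func()
	postRunE func() error
	postRun  func()
}

// execute follows the order used in the listing: pre-run, run, post-run.
// For each stage the error-returning variant is used when it is set, and
// the first error aborts the remaining stages.
func execute(h hooks) error {
	if h.preRunE != nil {
		if err := h.preRunE(); err != nil {
			return err
		}
	} else if h.preRun != nil {
		h.preRun()
	}
	if h.runE != nil {
		if err := h.runE(); err != nil {
			return err
		}
	} else if h.run != nil {
		h.run()
	}
	if h.postRunE != nil {
		return h.postRunE()
	}
	if h.postRun != nil {
		h.postRun()
	}
	return nil
}

// trace records which hooks ran, optionally making the run stage fail.
func trace(failRun bool) ([]string, error) {
	var calls []string
	h := hooks{
		preRun: func() { calls = append(calls, "preRun") },
		runE: func() error {
			calls = append(calls, "runE")
			if failRun {
				return errors.New("run failed")
			}
			return nil
		},
		run:     func() { calls = append(calls, "run") }, // skipped because runE is set
		postRun: func() { calls = append(calls, "postRun") },
	}
	err := execute(h)
	return calls, err
}

func main() {
	calls, err := trace(false)
	fmt.Println(calls, err)
	calls, err = trace(true)
	fmt.Println(calls, err)
}
```

For peer node start this means that an error returned by RunE, and therefore by serve, propagates back up through Execute to main, which exits with a non-zero status.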
3.2 Service startup
Having traced how a command runs, let's continue with what happens after the command returned by node.startCmd() finally calls serve(args).
func serve(args []string) error {
	// currently the peer only works with the standard MSP
	// because in certain scenarios the MSP has to make sure
	// that from a single credential you only have a single 'identity'.
	// Idemix does not support this *YET* but it can be easily
	// fixed to support it. For now, we just make sure that
	// the peer only comes up with the standard MSP
	// (MSP check)
	mspType := mgmt.GetLocalMSP(factory.GetDefault()).GetType()
	if mspType != msp.FABRIC {
		panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
	}

	// Trace RPCs with the golang.org/x/net/trace package. This was moved out of
	// the deliver service connection factory as it has process wide implications
	// and was racy with respect to initialization of gRPC clients and servers.
	grpc.EnableTracing = true

	logger.Infof("Starting %s", version.GetInfo())

	//obtain coreConfiguration
	coreConfig, err := peer.GlobalConfig()
	if err != nil {
		return err
	}

	platformRegistry := platforms.NewRegistry(platforms.SupportedPlatforms...)

	// Get the MSP manager for a channel
	identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
		return mgmt.GetManagerForChain(chainID)
	}

	// Set up the operations subsystem, which holds the related addresses,
	// ports, certificates, TLS settings, etc.
	opsSystem := newOperationsSystem(coreConfig)
	err = opsSystem.Start()
	if err != nil {
		return errors.WithMessage(err, "failed to initialize operations subsystems")
	}
	defer opsSystem.Stop()

	// Metrics: monitor node information and record it
	metricsProvider := opsSystem.Provider
	logObserver := floggingmetrics.NewObserver(metricsProvider)
	flogging.SetObserver(logObserver)

	// Keep the membership information for later use
	membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)

	// Read the MSP ID from the configuration
	mspID := coreConfig.LocalMSPID

	// Chaincode install path
	chaincodeInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "lifecycle", "chaincodes")
	ccStore := persistence.NewStore(chaincodeInstallPath)
	ccPackageParser := &persistence.ChaincodePackageParser{
		MetadataProvider: ccprovider.PersistenceAdapter(ccprovider.MetadataAsTarEntries),
	}

	// Peer host
	peerHost, _, err := net.SplitHostPort(coreConfig.PeerAddress)
	if err != nil {
		return fmt.Errorf("peer address is not in the format of host:port: %v", err)
	}

	// Listen address from the configuration
	listenAddr := coreConfig.ListenAddress
	serverConfig, err := peer.GetServerConfig()
	if err != nil {
		logger.Fatalf("Error loading secure config for peer (%s)", err)
	}

	// Configure the server
	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
	serverConfig.ServerStatsHandler = comm.NewServerStatsHandler(metricsProvider)
	serverConfig.UnaryInterceptors = append(serverConfig.UnaryInterceptors,
		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
	)
	serverConfig.StreamInterceptors = append(serverConfig.StreamInterceptors,
		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
	)

	cs := comm.NewCredentialSupport()
	if serverConfig.SecOpts.UseTLS {
		logger.Info("Starting peer with TLS enabled")
		cs = comm.NewCredentialSupport(serverConfig.SecOpts.ServerRootCAs...)

		// set the cert to use if client auth is requested by remote endpoints
		clientCert, err := peer.GetClientCertificate()
		if err != nil {
			logger.Fatalf("Failed to set TLS client certificate (%s)", err)
		}
		cs.SetClientCertificate(clientCert)
	}

	// Create the peer's gRPC server
	peerServer, err := comm.NewGRPCServer(listenAddr, serverConfig)
	if err != nil {
		logger.Fatalf("Failed to create peer server (%s)", err)
	}

	transientStoreProvider, err := transientstore.NewStoreProvider(
		filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "transientstore"),
	)
	if err != nil {
		return errors.WithMessage(err, "failed to open transient store")
	}

	deliverServiceConfig := deliverservice.GlobalConfig()

	peerInstance := &peer.Peer{
		Server: peerServer,
		ServerConfig: serverConfig,
		CredentialSupport: cs,
		StoreProvider: transientStoreProvider,
		CryptoProvider: factory.GetDefault(),
		OrdererEndpointOverrides: deliverServiceConfig.OrdererEndpointOverrides,
	}

	// Get the local signing identity, which covers the peer's functions
	// such as endorsement and validation
	localMSP := mgmt.GetLocalMSP(factory.GetDefault())
	signingIdentity, err := localMSP.GetDefaultSigningIdentity()
	if err != nil {
		logger.Panicf("Could not get the default signing identity from the local MSP: [%+v]", err)
	}

	signingIdentityBytes, err := signingIdentity.Serialize()
	if err != nil {
		logger.Panicf("Failed to serialize the signing identity: %v", err)
	}

	expirationLogger := flogging.MustGetLogger("certmonitor")
	crypto.TrackExpiration(
		serverConfig.SecOpts.UseTLS,
		serverConfig.SecOpts.Certificate,
		cs.GetClientCertificate().Certificate,
		signingIdentityBytes,
		expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
		time.Now(),
		time.AfterFunc)

	policyMgr := policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager)

	deliverGRPCClient, err := comm.NewGRPCClient(comm.ClientConfig{
		Timeout: deliverServiceConfig.ConnectionTimeout,
		KaOpts: deliverServiceConfig.KeepaliveOptions,
		SecOpts: deliverServiceConfig.SecOpts,
	})
	if err != nil {
		logger.Panicf("Could not create the deliver grpc client: [%+v]", err)
	}

	// FIXME: Creating the gossip service has the side effect of starting a bunch
	// of go routines and registration with the grpc server.
	// (initialize the gossip service)
	gossipService, err := initGossipService(
		policyMgr,
		metricsProvider,
		peerServer,
		signingIdentity,
		cs,
		coreConfig.PeerAddress,
		deliverGRPCClient,
		deliverServiceConfig,
	)
	if err != nil {
		return errors.WithMessage(err, "failed to initialize gossip service")
	}
	defer gossipService.Stop()

	peerInstance.GossipService = gossipService

	policyChecker := policy.NewPolicyChecker(
		policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager),
		mgmt.GetLocalMSP(factory.GetDefault()),
		mgmt.NewLocalMSPPrincipalGetter(factory.GetDefault()),
	)

	//startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
	//Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
	aclProvider := aclmgmt.NewACLProvider(
		aclmgmt.ResourceGetter(peerInstance.GetStableChannelConfig),
		policyChecker,
	)

	// TODO, unfortunately, the lifecycle initialization is very unclean at the
	// moment. This is because ccprovider.SetChaincodePath only works after
	// ledgermgmt.Initialize, but ledgermgmt.Initialize requires a reference to
	// lifecycle. Finally, lscc requires a reference to the system chaincode
	// provider in order to be created, which requires chaincode support to be
	// up, which also requires, you guessed it, lifecycle. Once we remove the
	// v1.0 lifecycle, we should be good to collapse all of the init of lifecycle
	// to this point.
	lifecycleResources := &lifecycle.Resources{
		Serializer: &lifecycle.Serializer{},
		ChannelConfigSource: peerInstance,
		ChaincodeStore: ccStore,
		PackageParser: ccPackageParser,
	}

	lifecycleValidatorCommitter := &lifecycle.ValidatorCommitter{
		Resources: lifecycleResources,
		LegacyDeployedCCInfoProvider: &lscc.DeployedCCInfoProvider{},
	}

	ccInfoFSImpl := &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()}

	// legacyMetadataManager collects metadata information from the legacy
	// lifecycle (lscc). This is expected to disappear with FAB-15061.
	legacyMetadataManager, err := cclifecycle.NewMetadataManager(
		cclifecycle.EnumerateFunc(
			func() ([]ccdef.InstalledChaincode, error) {
				return ccInfoFSImpl.ListInstalledChaincodes(ccInfoFSImpl.GetChaincodeInstallPath(), ioutil.ReadDir, ccprovider.LoadPackage)
			},
		),
	)
	if err != nil {
		logger.Panicf("Failed creating LegacyMetadataManager: +%v", err)
	}

	// metadataManager aggregates metadata information from _lifecycle and
	// the legacy lifecycle (lscc).
	metadataManager := lifecycle.NewMetadataManager()

	// the purpose of these two managers is to feed per-channel chaincode data
	// into gossip owing to the fact that we are transitioning from lscc to
	// _lifecycle, we still have two providers of such information until v2.1,
	// in which we will remove the legacy.
	//
	// the flow of information is the following
	//
	// gossip <-- metadataManager <-- lifecycleCache (for _lifecycle)
	//                             \
	//                              - legacyMetadataManager (for lscc)
	//
	// FAB-15061 tracks the work necessary to remove LSCC, at which point we
	// will be able to simplify the flow to simply be
	//
	// gossip <-- lifecycleCache

	chaincodeCustodian := lifecycle.NewChaincodeCustodian()

	externalBuilderOutput := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "externalbuilder", "builds")
	if err := os.MkdirAll(externalBuilderOutput, 0700); err != nil {
		logger.Panicf("could not create externalbuilder build output dir: %s", err)
	}

	ebMetadataProvider := &externalbuilder.MetadataProvider{
		DurablePath: externalBuilderOutput,
	}

	lifecycleCache := lifecycle.NewCache(lifecycleResources, mspID, metadataManager, chaincodeCustodian, ebMetadataProvider)

	txProcessors := map[common.HeaderType]ledger.CustomTxProcessor{
		common.HeaderType_CONFIG: &peer.ConfigTxProcessor{},
	}

	peerInstance.LedgerMgr = ledgermgmt.NewLedgerMgr(
		&ledgermgmt.Initializer{
			CustomTxProcessors: txProcessors,
			DeployedChaincodeInfoProvider: lifecycleValidatorCommitter,
			MembershipInfoProvider: membershipInfoProvider,
			ChaincodeLifecycleEventProvider: lifecycleCache,
			MetricsProvider: metricsProvider,
			HealthCheckRegistry: opsSystem,
			StateListeners: []ledger.StateListener{lifecycleCache},
			Config: ledgerConfig(),
			Hasher: factory.GetDefault(),
			EbMetadataProvider: ebMetadataProvider,
		},
	)

	// Configure CC package storage
	lsccInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "chaincodes")
	ccprovider.SetChaincodesPath(lsccInstallPath)

	if err := lifecycleCache.InitializeLocalChaincodes(); err != nil {
		return errors.WithMessage(err, "could not initialize local chaincodes")
	}

	// Parameter overrides must be processed before any parameters are
	// cached. Failures to cache cause the server to terminate immediately.
	if chaincodeDevMode {
		logger.Info("Running in chaincode development mode")
		logger.Info("Disable loading validity system chaincode")

		viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
	}

	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
		return func(env *cb.Envelope, channelID string) error {
			return aclProvider.CheckACL(resourceName, channelID, env)
		}
	}

	metrics := deliver.NewMetrics(metricsProvider)
	abServer := &peer.DeliverServer{
		DeliverHandler: deliver.NewHandler(
			&peer.DeliverChainManager{Peer: peerInstance},
			coreConfig.AuthenticationTimeWindow,
			mutualTLS,
			metrics,
			false,
		),
		PolicyCheckerProvider: policyCheckerProvider,
	}
	pb.RegisterDeliverServer(peerServer.Server(), abServer)

	// Create a self-signed CA for chaincode service
	ca, err := tlsgen.NewCA()
	if err != nil {
		logger.Panic("Failed creating authentication layer:", err)
	}
	ccSrv, ccEndpoint, err := createChaincodeServer(coreConfig, ca, peerHost)
	if err != nil {
		logger.Panicf("Failed to create chaincode server: %s", err)
	}

	//get user mode
	userRunsCC := chaincode.IsDevMode()
	tlsEnabled := coreConfig.PeerTLSEnabled

	// create chaincode specific tls CA
	authenticator := accesscontrol.NewAuthenticator(ca)

	chaincodeHandlerRegistry := chaincode.NewHandlerRegistry(userRunsCC)
	lifecycleTxQueryExecutorGetter := &chaincode.TxQueryExecutorGetter{
		CCID: scc.ChaincodeID(lifecycle.LifecycleNamespace),
		HandlerRegistry: chaincodeHandlerRegistry,
	}

	if coreConfig.VMEndpoint == "" && len(coreConfig.ExternalBuilders) == 0 {
		logger.Panic("VMEndpoint not set and no ExternalBuilders defined")
	}

	chaincodeConfig := chaincode.GlobalConfig()

	var client *docker.Client
	var dockerVM *dockercontroller.DockerVM
	if coreConfig.VMEndpoint != "" {
		client, err = createDockerClient(coreConfig)
		if err != nil {
			logger.Panicf("cannot create docker client: %s", err)
		}

		dockerVM = &dockercontroller.DockerVM{
			PeerID: coreConfig.PeerID,
			NetworkID: coreConfig.NetworkID,
			BuildMetrics: dockercontroller.NewBuildMetrics(opsSystem.Provider),
			Client: client,
			AttachStdOut: coreConfig.VMDockerAttachStdout,
			HostConfig: getDockerHostConfig(),
			ChaincodePull: coreConfig.ChaincodePull,
			NetworkMode: coreConfig.VMNetworkMode,
			PlatformBuilder: &platforms.Builder{
				Registry: platformRegistry,
				Client: client,
			},
			// This field is superfluous for chaincodes built with v2.0+ binaries
			// however, to prevent users from being forced to rebuild leaving for now
			// but it should be removed in the future.
			LoggingEnv: []string{
				"CORE_CHAINCODE_LOGGING_LEVEL=" + chaincodeConfig.LogLevel,
				"CORE_CHAINCODE_LOGGING_SHIM=" + chaincodeConfig.ShimLogLevel,
				"CORE_CHAINCODE_LOGGING_FORMAT=" + chaincodeConfig.LogFormat,
			},
			MSPID: mspID,
		}
		if err := opsSystem.RegisterChecker("docker", dockerVM); err != nil {
			logger.Panicf("failed to register docker health check: %s", err)
		}
	}

	externalVM := &externalbuilder.Detector{
		Builders: externalbuilder.CreateBuilders(coreConfig.ExternalBuilders, mspID),
		DurablePath: externalBuilderOutput,
	}

	buildRegistry := &container.BuildRegistry{}

	containerRouter := &container.Router{
		DockerBuilder: dockerVM,
		ExternalBuilder: externalVMAdapter{externalVM},
		PackageProvider: &persistence.FallbackPackageLocator{
			ChaincodePackageLocator: &persistence.ChaincodePackageLocator{
				ChaincodeDir: chaincodeInstallPath,
			},
			LegacyCCPackageLocator: &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()},
		},
	}

	builtinSCCs := map[string]struct{}{
		"lscc": {},
		"qscc": {},
		"cscc": {},
		"_lifecycle": {},
	}

	lsccInst := &lscc.SCC{
		BuiltinSCCs: builtinSCCs,
		Support: &lscc.SupportImpl{
			GetMSPIDs: peerInstance.GetMSPIDs,
		},
		SCCProvider: &lscc.PeerShim{Peer: peerInstance},
		ACLProvider: aclProvider,
		GetMSPIDs: peerInstance.GetMSPIDs,
		PolicyChecker: policyChecker,
		BCCSP: factory.GetDefault(),
		BuildRegistry: buildRegistry,
		ChaincodeBuilder: containerRouter,
		EbMetadataProvider: ebMetadataProvider,
	}

	chaincodeEndorsementInfo := &lifecycle.ChaincodeEndorsementInfoSource{
		LegacyImpl: lsccInst,
		Resources: lifecycleResources,
		Cache: lifecycleCache,
		BuiltinSCCs: builtinSCCs,
	}

	containerRuntime := &chaincode.ContainerRuntime{
		BuildRegistry: buildRegistry,
		ContainerRouter: containerRouter,
	}

	lifecycleFunctions := &lifecycle.ExternalFunctions{
		Resources: lifecycleResources,
		InstallListener: lifecycleCache,
		InstalledChaincodesLister: lifecycleCache,
		ChaincodeBuilder: containerRouter,
		BuildRegistry: buildRegistry,
	}

	lifecycleSCC := &lifecycle.SCC{
		Dispatcher: &dispatcher.Dispatcher{
			Protobuf: &dispatcher.ProtobufImpl{},
		},
		DeployedCCInfoProvider: lifecycleValidatorCommitter,
		QueryExecutorProvider: lifecycleTxQueryExecutorGetter,
		Functions: lifecycleFunctions,
		OrgMSPID: mspID,
		ChannelConfigSource: peerInstance,
		ACLProvider: aclProvider,
	}

	chaincodeLauncher := &chaincode.RuntimeLauncher{
		Metrics: chaincode.NewLaunchMetrics(opsSystem.Provider),
		Registry: chaincodeHandlerRegistry,
		Runtime: containerRuntime,
		StartupTimeout: chaincodeConfig.StartupTimeout,
		CertGenerator: authenticator,
		CACert: ca.CertBytes(),
		PeerAddress: ccEndpoint,
		ConnectionHandler: &extcc.ExternalChaincodeRuntime{},
	}

	// Keep TestQueries working
	if !chaincodeConfig.TLSEnabled {
		chaincodeLauncher.CertGenerator = nil
	}

	chaincodeSupport := &chaincode.ChaincodeSupport{
		ACLProvider: aclProvider,
		AppConfig: peerInstance,
		DeployedCCInfoProvider: lifecycleValidatorCommitter,
		ExecuteTimeout: chaincodeConfig.ExecuteTimeout,
		InstallTimeout: chaincodeConfig.InstallTimeout,
		HandlerRegistry: chaincodeHandlerRegistry,
		HandlerMetrics: chaincode.NewHandlerMetrics(opsSystem.Provider),
		Keepalive: chaincodeConfig.Keepalive,
		Launcher: chaincodeLauncher,
		Lifecycle: chaincodeEndorsementInfo,
		Peer: peerInstance,
		Runtime: containerRuntime,
		BuiltinSCCs: builtinSCCs,
		TotalQueryLimit: chaincodeConfig.TotalQueryLimit,
		UserRunsCC: userRunsCC,
	}

	custodianLauncher := custodianLauncherAdapter{
		launcher: chaincodeLauncher,
		streamHandler: chaincodeSupport,
	}
	go chaincodeCustodian.Work(buildRegistry, containerRouter, custodianLauncher)

	ccSupSrv := pb.ChaincodeSupportServer(chaincodeSupport)
	if tlsEnabled {
		ccSupSrv = authenticator.Wrap(ccSupSrv)
	}

	csccInst := cscc.New(
		aclProvider,
		lifecycleValidatorCommitter,
		lsccInst,
		lifecycleValidatorCommitter,
		policyChecker,
		peerInstance,
		factory.GetDefault(),
	)
	qsccInst := scc.SelfDescribingSysCC(qscc.New(aclProvider, peerInstance))
	if maxConcurrency := coreConfig.LimitsConcurrencyQSCC; maxConcurrency != 0 {
		qsccInst = scc.Throttle(maxConcurrency, qsccInst)
	}

	pb.RegisterChaincodeSupportServer(ccSrv.Server(), ccSupSrv)

	// start the chaincode specific gRPC listening service
	go ccSrv.Start()

	logger.Debugf("Running peer")

	libConf, err := library.LoadConfig()
	if err != nil {
		return errors.WithMessage(err, "could not decode peer handlers configuration")
	}

	reg := library.InitRegistry(libConf)

	authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
	endorserSupport := &endorser.SupportImpl{
		SignerSerializer: signingIdentity,
		Peer: peerInstance,
		ChaincodeSupport: chaincodeSupport,
		ACLProvider: aclProvider,
		BuiltinSCCs: builtinSCCs,
	}
	endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
	validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
	signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
	channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
	pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
	pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
		ChannelStateRetriever: channelStateRetriever,
		TransientStoreRetriever: peerInstance,
		PluginMapper: pluginMapper,
		SigningIdentityFetcher: signingIdentityFetcher,
	})
	endorserSupport.PluginEndorser = pluginEndorser
	channelFetcher := endorserChannelAdapter{
		peer: peerInstance,
	}
	serverEndorser := &endorser.Endorser{
		PrivateDataDistributor: gossipService,
		ChannelFetcher: channelFetcher,
		LocalMSP: localMSP,
		Support: endorserSupport,
		Metrics: endorser.NewMetrics(metricsProvider),
	}

	// deploy system chaincodes
	for _, cc := range []scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC} {
		if enabled, ok := chaincodeConfig.SCCWhitelist[cc.Name()]; !ok || !enabled {
			logger.Infof("not deploying chaincode %s as it is not enabled", cc.Name())
			continue
		}
		scc.DeploySysCC(cc, chaincodeSupport)
	}

	logger.Infof("Deployed system chaincodes")

	// register the lifecycleMetadataManager to get updates from the legacy
	// chaincode; lifecycleMetadataManager will aggregate these updates with
	// the ones from the new lifecycle and deliver both
	// this is expected to disappear with FAB-15061
	legacyMetadataManager.AddListener(metadataManager)

	// register gossip as a listener for updates from lifecycleMetadataManager
	metadataManager.AddListener(lifecycle.HandleMetadataUpdateFunc(func(channel string, chaincodes ccdef.MetadataSet) {
		gossipService.UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChannelID(channel))
	}))

	// this brings up all the channels
	peerInstance.Initialize(
		func(cid string) {
			// initialize the metadata for this channel.
			// This call will pre-populate chaincode information for this
			// channel but it won't fire any updates to its listeners
			lifecycleCache.InitializeMetadata(cid)

			// initialize the legacyMetadataManager for this channel.
			// This call will pre-populate chaincode information from
			// the legacy lifecycle for this channel; it will also fire
			// the listener, which will cascade to metadataManager
			// and eventually to gossip to pre-populate data structures.
			// this is expected to disappear with FAB-15061
			sub, err := legacyMetadataManager.NewChannelSubscription(cid, cclifecycle.QueryCreatorFunc(func() (cclifecycle.Query, error) {
				return peerInstance.GetLedger(cid).NewQueryExecutor()
			}))
			if err != nil {
				logger.Panicf("Failed subscribing to chaincode lifecycle updates")
			}

			// register this channel's legacyMetadataManager (sub) to get ledger updates
			// this is expected to disappear with FAB-15061
			cceventmgmt.GetMgr().Register(cid, sub)
		},
		plugin.MapBasedMapper(validationPluginsByName),
		lifecycleValidatorCommitter,
		lsccInst,
		lifecycleValidatorCommitter,
		coreConfig.ValidatorPoolSize,
	)

	if coreConfig.DiscoveryEnabled {
		registerDiscoveryService(
			coreConfig,
			peerInstance,
			peerServer,
			policyMgr,
			lifecycle.NewMetadataProvider(
				lifecycleCache,
				legacyMetadataManager,
				peerInstance,
			),
			gossipService,
		)
	}

	logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)

	// Get configuration before starting go routines to avoid
	// racing in tests
	profileEnabled := coreConfig.ProfileEnabled
	profileListenAddress := coreConfig.ProfileListenAddress

	// Start the grpc server. Done in a goroutine so we can deploy the
	// genesis block if needed.
	serve := make(chan error)

	// Start profiling http endpoint if enabled
	if profileEnabled {
		go func() {
			logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
			if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
				logger.Errorf("Error starting profiler: %s", profileErr)
			}
		}()
	}

	// Handle OS signals
	go handleSignals(addPlatformSignals(map[os.Signal]func(){
		syscall.SIGINT: func() { serve <- nil },
		syscall.SIGTERM: func() { serve <- nil },
	}))

	logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)

	// get a list of ledger IDs and load preResetHeight files for these ledger IDs
	ledgerIDs, err := peerInstance.LedgerMgr.GetLedgerIDs()
	if err != nil {
		return errors.WithMessage(err, "failed to get ledger IDs")
	}

	// check to see if the peer ledgers have been reset
	rootFSPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "ledgersData")
	// Check whether the ledgers have been reset
	preResetHeights, err := kvledger.LoadPreResetHeight(rootFSPath, ledgerIDs)
	if err != nil {
		return fmt.Errorf("error loading prereset height: %s", err)
	}

	for cid, height := range preResetHeights {
		logger.Infof("Ledger rebuild: channel [%s]: preresetHeight: [%d]", cid, height)
	}

	if len(preResetHeights) > 0 {
		logger.Info("Ledger rebuild: Entering loop to check if current ledger heights surpass prereset ledger heights. Endorsement request processing will be disabled.")
		resetFilter := &reset{
			reject: true,
		}
		authFilters = append(authFilters, resetFilter)
		go resetLoop(resetFilter, preResetHeights, ledgerIDs, peerInstance.GetLedger, 10*time.Second)
	}

	// Start the peer service below
	// start the peer server
	auth := authHandler.ChainFilters(serverEndorser, authFilters...)
	// Register the Endorser server
	pb.RegisterEndorserServer(peerServer.Server(), auth)

	go func() {
		var grpcErr error
		if grpcErr = peerServer.Start(); grpcErr != nil {
			grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
		}
		serve <- grpcErr
	}()

	// Block until grpc server exits
	return <-serve
}
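Near the end of serve, the endorser is wrapped by authHandler.ChainFilters(serverEndorser, authFilters...), and a reset filter with reject set to true is appended to the filters while ledgers are being rebuilt. The sketch below shows one way such a chain could be wired. The endorser and filter interfaces, chainFilters, resetFilter and echoEndorser are simplified, hypothetical types that use strings in place of the real proposal messages, so this should be read as an illustration of the pattern and not as Fabric's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// endorser is a hypothetical, simplified stand-in for the endorser server.
type endorser interface {
	ProcessProposal(proposal string) (string, error)
}

// filter is an endorser that forwards to a next endorser once initialized.
type filter interface {
	endorser
	Init(next endorser)
}

// chainFilters links the filters so that each forwards to the next and the
// last forwards to the real endorser. It returns the head of the chain.
func chainFilters(e endorser, filters ...filter) endorser {
	if len(filters) == 0 {
		return e
	}
	for i := 0; i < len(filters)-1; i++ {
		filters[i].Init(filters[i+1])
	}
	filters[len(filters)-1].Init(e)
	return filters[0]
}

// resetFilter rejects every proposal while reject is true.
type resetFilter struct {
	mu     sync.RWMutex
	next   endorser
	reject bool
}

func (r *resetFilter) Init(next endorser) { r.next = next }

// setReject flips the flag; a background loop would call this once the
// ledger heights have caught up.
func (r *resetFilter) setReject(v bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.reject = v
}

func (r *resetFilter) ProcessProposal(p string) (string, error) {
	r.mu.RLock()
	rejecting := r.reject
	r.mu.RUnlock()
	if rejecting {
		return "", errors.New("endorsement is disabled while ledgers are being rebuilt")
	}
	return r.next.ProcessProposal(p)
}

// echoEndorser is a trivial endorser used to terminate the chain.
type echoEndorser struct{}

func (echoEndorser) ProcessProposal(p string) (string, error) {
	return "endorsed:" + p, nil
}

func main() {
	rf := &resetFilter{reject: true}
	auth := chainFilters(echoEndorser{}, rf)

	_, err := auth.ProcessProposal("tx1")
	fmt.Println("while rebuilding:", err)

	rf.setReject(false)
	out, err := auth.ProcessProposal("tx1")
	fmt.Println("after rebuild:", out, err)
}
```

The appeal of this design is that the gRPC registration only ever sees one endorser value, while behaviors such as temporarily rejecting requests can be added or removed by changing the filter list.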
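The serve function is very long and a little dizzying to read. It is best read side by side with the business flow, which makes it clearer and easier to understand. The main flow is as follows:
1. Get the MSP type
2. Read the global configuration
3. Create the gRPC server: PeerServer
4. Create the GossipService and register it on PeerServer
5. Set up the resource access policy: aclmgmt.NewACLProvider
6. Create the chaincode service
7. Create the Endorser (endorsement) service
8. Deploy the system chaincodes
9. Initialize the channels
10. Start the gRPC server
Note: if profiling is enabled, an HTTP profiling listener is started as well.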
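4. Summary
The peer code is a very important part of Fabric, arguably its foundation. It provides a set of operational commands as well as the details of communication, all of which call for more detailed analysis in later articles.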