以太坊源码之RPC服务

2024-05-11 06:58
文章标签 源码 服务 rpc 以太

本文主要是介绍以太坊源码之RPC服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

搭建过以太坊私链的同学或许都通过console或者Postman第三方工具去查询以太坊主链的相关信息,如通过区块高度查询区块信息。在以太坊的底层,这些都是通过RPC的调用去实现的,今天就来看看以太坊底层RPC启动方面的源码,了解一下RPC的启动过程。
RPC在以太坊源码中的启动步骤如下:

----- geth
-------- startNode
----------- utils.StartNode
-------------- Node.Start()
----------------- Node.startRPC
上述是RPC在以太坊启动过程中的调用路径,下面先来介绍一下与RPC相关的几个重要的数据结构:

type API struct {Namespace string      Version   string   Service   interface{} Public    bool      
}
Namespace:命令空间,对模块进行命名,eg:eth,net,web3
Version   :模块的版本信息
Service   :执行模块的服务函数
Public    :模式是否运行外部调用type Server struct {services serviceRegistryrun      int32codecsMu sync.Mutexcodecs   *set.Set
}run:该服务是否运行的状态codecsMu :sync的互斥锁codecs   :services :是一个map,key是Namespace,value是一个service实例。定义:type serviceRegistry map[string]*service下面看看server的定义:type service struct {name          string        typ           reflect.Type callbacks     callbacks     subscriptions subscriptions 
}
name :模块名称
typ:接收的类型
callbacks     : 是一个map,key是Namespace,value是一个callback 实例。定义: type callbacks map[string]*callback 
subscriptions : 是一个map,key是Namespace,value是一个callback 实例。定义: type subscriptions map[string]*callback下面看看callback的定义:
type callback struct {rcvr        reflect.Value  // receiver of methodmethod      reflect.Method // callbackargTypes    []reflect.Type // input argument typeshasCtx      bool           // method's first argument is a context (not included in argTypes)errPos      int            // err return idx, of -1 when method cannot return errorisSubscribe bool           // indication if the callback is a subscription
}
rcvr:方法的接收者,这是一个反射值类型,其实就是指向了之前的NewPublicEthereumAPI
method:对应rcvr中的函数
argTypes:函数参数的类型列表
hasCtx:标识函数的第一个参数是否是context.Context类型
errPos:错误代码数值
isSubscribe:是否是subscription类型

接下来我们一起看看RPC启动的源码:

源码路径:go-ethereum\node\node.go
func (n *Node) startRPC(services map[reflect.Type]Service) error {//创建apisapis := n.apis()for _, service := range services {apis = append(apis, service.APIs()...)}//基于apis启动服务if err := n.startInProc(apis); err != nil {return err}if err := n.startIPC(apis); err != nil {n.stopInProc()return err}if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {n.stopIPC()n.stopInProc()return err}if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {n.stopHTTP()n.stopIPC()n.stopInProc()return err}// All API endpoints started successfullyn.rpcAPIs = apisreturn nil
}

startInProc:内部调用,没有启动相关Server服务
startIPC:启动IPC服务
startHTTP:启动http服务
startWS:启动WebSocket服务
如上述服务有一项启动失败,整体RPC服务启动失败,报错并退出!下面重点介绍一下httpRPC的启动过程,源码如下:

func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {// http的IP和端口,默认localhost:8545if endpoint == "" {return nil}listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts)if err != nil {return err}n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))// 所有服务启动成功&赋值n.httpEndpoint = endpointn.httpListener = listener    //HTTP RPC listener socket to server API requestsn.httpHandler = handler   //HTTP RPC request handler to process the API requests  //httpHandler   *rpc.Server,上述介绍的Server数据结构return nil
}

下面来具体看看API的注册过程,源码如下:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {// 在本地创建一个白名单whitelist := make(map[string]bool)for _, module := range modules {whitelist[module] = true}// 注册API服务handler := NewServer()for _, api := range apis {if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {if err := handler.RegisterName(api.Namespace, api.Service); err != nil {return nil, nil, err}log.Debug("HTTP registered", "namespace", api.Namespace)}}// 所有服务注册完成,启动tcp监听var (listener net.Listenererr      error)if listener, err = net.Listen("tcp", endpoint); err != nil {return nil, nil, err}go NewHTTPServer(cors, vhosts, handler).Serve(listener)return listener, handler, err
}

接下来分析一下单个API服务的注册过程:

func (s *Server) RegisterName(name string, rcvr interface{}) error {if s.services == nil {s.services = make(serviceRegistry)}//先创建一个service实例,填充它的callbacks和subscriptions字段svc := new(service)svc.typ = reflect.TypeOf(rcvr)rcvrVal := reflect.ValueOf(rcvr)if name == "" {return fmt.Errorf("no service name for type %s", svc.typ.String())}if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())}//suitableCallbacks()函数会检查API定义是否符合标准methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)// API属于同一个Namespace,进行合并if regsvc, present := s.services[name]; present {if len(methods) == 0 && len(subscriptions) == 0 {return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)}for _, m := range methods {regsvc.callbacks[formatName(m.method.Name)] = m}for _, s := range subscriptions {regsvc.subscriptions[formatName(s.method.Name)] = s}return nil}//创建callback实例放入mapsvc.name = namesvc.callbacks, svc.subscriptions = methods, subscriptionsif len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)}s.services[svc.name] = svcreturn nil
}

API注册完成,HTTP将进行TCP监听,创建了一个http.Server实例,并启动一个goroutine调用它的Serve()方法:

func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {// Wrap the CORS-handler within a host-handlerhandler := newCorsHandler(srv, cors)handler = newVHostHandler(vhosts, handler)return &http.Server{Handler: handler}
}这里有一个Handler参数,用到了装饰者模式,其实最终实现还是在rpc.Server中。Handler是一个接口,需要实现它的ServerHTTP()函数来处理网络数据,newCorsHandler代码如下:
func newCorsHandler(srv *Server, allowedOrigins []string) http.Handler {// disable CORS support if user has not specified a custom CORS configurationif len(allowedOrigins) == 0 {return srv}c := cors.New(cors.Options{AllowedOrigins: allowedOrigins,AllowedMethods: []string{http.MethodPost, http.MethodGet},MaxAge:         600,AllowedHeaders: []string{"*"},})return c.Handler(srv)
}Handle接口的实现:
func (c *Cors) Handler(h http.Handler) http.Handler {return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {if r.Method == "OPTIONS" {c.logf("Handler: Preflight request")c.handlePreflight(w, r)if c.optionPassthrough {h.ServeHTTP(w, r)} else {w.WriteHeader(http.StatusOK)}} else {c.logf("Handler: Actual request")c.handleActualRequest(w, r)h.ServeHTTP(w, r)}})
}进行数据处理的主要是h.ServeHTTP,其源码如下(go-ethereum\rpc\http.go):
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {// Permit dumb empty requests for remote health-checks (AWS)if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {return}if code, err := validateRequest(r); err != nil {http.Error(w, err.Error(), code)return}// All checks passed, create a codec that reads direct from the request body// untilEOF and writes the response to w and order the server to process a// single request.ctx := context.Background()ctx = context.WithValue(ctx, "remote", r.RemoteAddr)ctx = context.WithValue(ctx, "scheme", r.Proto)ctx = context.WithValue(ctx, "local", r.Host)//创建reader读取原始数据body := io.LimitReader(r.Body, maxRequestContentLength)//创建一个JSON的编解码器codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})defer codec.Close()w.Header().Set("content-type", contentType)//数据处理srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)
}

接下来重点分析一下数据处理函数srv.ServeSingleRequest(),其源码如下:

func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {s.serveRequest(ctx, codec, true, options)
}func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {var pend sync.WaitGroupdefer func() {if err := recover(); err != nil {const size = 64 << 10buf := make([]byte, size)buf = buf[:runtime.Stack(buf, false)]log.Error(string(buf))}s.codecsMu.Lock()s.codecs.Remove(codec)s.codecsMu.Unlock()}()//	ctx, cancel := context.WithCancel(context.Background())ctx, cancel := context.WithCancel(ctx)defer cancel()// if the codec supports notification include a notifier that callbacks can use// to send notification to clients. It is thight to the codec/connection. If the// connection is closed the notifier will stop and cancels all active subscriptions.if options&OptionSubscriptions == OptionSubscriptions {ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))}s.codecsMu.Lock()if atomic.LoadInt32(&s.run) != 1 { // 服务没有起来直接退出s.codecsMu.Unlock()return &shutdownError{}}s.codecs.Add(codec)s.codecsMu.Unlock()// test if the server is ordered to stopfor atomic.LoadInt32(&s.run) == 1 {reqs, batch, err := s.readRequest(codec)if err != nil {// If a parsing error occurred, send an errorif err.Error() != "EOF" {log.Debug(fmt.Sprintf("read error %v\n", err))codec.Write(codec.CreateErrorResponse(nil, err))}// Error or end of stream, wait for requests and tear downpend.Wait()return nil}// check if server is ordered to shutdown and return an error// telling the client that his request failed.if atomic.LoadInt32(&s.run) != 1 {err = &shutdownError{}if batch {resps := make([]interface{}, len(reqs))for i, r := range reqs {resps[i] = codec.CreateErrorResponse(&r.id, err)}codec.Write(resps)} else {codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))}return nil}// If a single shot request is executing, run and return immediatelyif singleShot {if batch {s.execBatch(ctx, codec, reqs)} else {s.exec(ctx, codec, reqs[0])}return nil}// For multi-shot connections, start a goroutine to serve and loop backpend.Add(1)go func(reqs []*serverRequest, batch bool) {defer pend.Done()if batch {s.execBatch(ctx, codec, reqs)} else {s.exec(ctx, codec, reqs[0])}}(reqs, batch)}return nil
}

可以看到,就是一个循环,每次调用readRequest()解析请求数据,然后调用exec()或者execBatch()执行API调用,下面接着看readRequest()源码:

func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {reqs, batch, err := codec.ReadRequestHeaders()if err != nil {return nil, batch, err}requests := make([]*serverRequest, len(reqs))// verify requestsfor i, r := range reqs {var ok boolvar svc *serviceif r.err != nil {requests[i] = &serverRequest{id: r.id, err: r.err}continue}if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first argif args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {requests[i].args = args} else {requests[i].err = &invalidParamsError{err.Error()}}continue}if svc, ok = s.services[r.service]; !ok { // rpc method isn't availablerequests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}continue}if r.isPubSub { // eth_subscribe, r.method contains the subscription method nameif callb, ok := svc.subscriptions[r.method]; ok {requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}if r.params != nil && len(callb.argTypes) > 0 {argTypes := []reflect.Type{reflect.TypeOf("")}argTypes = append(argTypes, callb.argTypes...)if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {requests[i].args = args[1:] // first one is service.method name which isn't an actual argument} else {requests[i].err = &invalidParamsError{err.Error()}}}} else {requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}}continue}if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC methodrequests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}if r.params != nil && len(callb.argTypes) > 0 {if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {requests[i].args = args} else {requests[i].err = &invalidParamsError{err.Error()}}}continue}requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}}return requests, batch, nil
}

首先codec把原始JSON数据解析为一个rpcRequest数组,然后遍历这个数组,根据Namespace找到对应的service,再从service的callbacks表中查询需要调用的method,最后组装成一个新的数据结构serverRequest
接着就是调用exec()执行这个severRequest指向的API实现了:

func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {var response interface{}var callback func()if req.err != nil {response = codec.CreateErrorResponse(&req.id, req.err)} else {response, callback = s.handle(ctx, codec, req)}if err := codec.Write(response); err != nil {log.Error(fmt.Sprintf("%v\n", err))codec.Close()}// when request was a subscribe request this allows these subscriptions to be activedif callback != nil {callback()}
}

可以看到调用了handle()方法获取响应数据,然后通过codec组装成JSON发送给请求端。

看一下handle()函数:

func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {if req.err != nil {return codec.CreateErrorResponse(&req.id, req.err), nil}if req.isUnsubscribe { // cancel subscription, first param must be the subscription idif len(req.args) >= 1 && req.args[0].Kind() == reflect.String {notifier, supported := NotifierFromContext(ctx)if !supported { // interface doesn't support subscriptions (e.g. http)return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil}subid := ID(req.args[0].String())if err := notifier.unsubscribe(subid); err != nil {return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil}return codec.CreateResponse(req.id, true), nil}return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil}if req.callb.isSubscribe {subid, err := s.createSubscription(ctx, codec, req)if err != nil {return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil}// active the subscription after the sub id was successfully sent to the clientactivateSub := func() {notifier, _ := NotifierFromContext(ctx)notifier.activate(subid, req.svcname)}return codec.CreateResponse(req.id, subid), activateSub}// regular RPC call, prepare argumentsif len(req.args) != len(req.callb.argTypes) {rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",req.svcname, serviceMethodSeparator, req.callb.method.Name,len(req.callb.argTypes), len(req.args))}return codec.CreateErrorResponse(&req.id, rpcErr), nil}arguments := []reflect.Value{req.callb.rcvr}if req.callb.hasCtx {arguments = append(arguments, reflect.ValueOf(ctx))}if len(req.args) > 0 {arguments = append(arguments, req.args...)}// execute RPC method and return resultreply := req.callb.method.Func.Call(arguments)if len(reply) == 0 {return codec.CreateResponse(req.id, nil), nil}if req.callb.errPos >= 0 { // test if method returned an errorif !reply[req.callb.errPos].IsNil() {e := reply[req.callb.errPos].Interface().(error)res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})return res, nil}}return codec.CreateResponse(req.id, reply[0].Interface()), nil
}

首先处理参数列表,如果发现调用的函数需要Context参数则加到最前面。然后就是通过反射调用API了,最后把结果送给codec,按JSON RPC的格式要求组装成响应返回就可以了。
上述就是RPC启动和调用的全部过程。

这篇关于以太坊源码之RPC服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/978816

相关文章

关于DNS域名解析服务

《关于DNS域名解析服务》:本文主要介绍关于DNS域名解析服务,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录DNS系统的作用及类型DNS使用的协议及端口号DNS系统的分布式数据结构DNS的分布式互联网解析库域名体系结构两种查询方式DNS服务器类型统计构建DNS域

Linux中SSH服务配置的全面指南

《Linux中SSH服务配置的全面指南》作为网络安全工程师,SSH(SecureShell)服务的安全配置是我们日常工作中不可忽视的重要环节,本文将从基础配置到高级安全加固,全面解析SSH服务的各项参... 目录概述基础配置详解端口与监听设置主机密钥配置认证机制强化禁用密码认证禁止root直接登录实现双因素

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

如何搭建并配置HTTPD文件服务及访问权限控制

《如何搭建并配置HTTPD文件服务及访问权限控制》:本文主要介绍如何搭建并配置HTTPD文件服务及访问权限控制的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、安装HTTPD服务二、HTTPD服务目录结构三、配置修改四、服务启动五、基于用户访问权限控制六、

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

linux服务之NIS账户管理服务方式

《linux服务之NIS账户管理服务方式》:本文主要介绍linux服务之NIS账户管理服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、所需要的软件二、服务器配置1、安装 NIS 服务2、设定 NIS 的域名 (NIS domain name)3、修改主

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整