以太坊源码之RPC服务

2024-05-11 06:58
文章标签 源码 服务 rpc 以太

本文主要是介绍以太坊源码之RPC服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

搭建过以太坊私链的同学或许都通过console或者Postman第三方工具去查询以太坊主链的相关信息,如通过区块高度查询区块信息。在以太坊的底层,这些都是通过RPC的调用去实现的,今天就来看看以太坊底层RPC启动方面的源码,了解一下RPC的启动过程。
RPC在以太坊源码中的启动步骤如下:

----- geth
-------- startNode
----------- utils.StartNode
-------------- Node.Start()
----------------- Node.startRPC
上述是RPC在以太坊启动过程中的调用路径,下面先来介绍一下与RPC相关的几个重要的数据结构:

type API struct {Namespace string      Version   string   Service   interface{} Public    bool      
}
Namespace:命令空间,对模块进行命名,eg:eth,net,web3
Version   :模块的版本信息
Service   :执行模块的服务函数
Public    :模式是否运行外部调用type Server struct {services serviceRegistryrun      int32codecsMu sync.Mutexcodecs   *set.Set
}run:该服务是否运行的状态codecsMu :sync的互斥锁codecs   :services :是一个map,key是Namespace,value是一个service实例。定义:type serviceRegistry map[string]*service下面看看server的定义:type service struct {name          string        typ           reflect.Type callbacks     callbacks     subscriptions subscriptions 
}
name :模块名称
typ:接收的类型
callbacks     : 是一个map,key是Namespace,value是一个callback 实例。定义: type callbacks map[string]*callback 
subscriptions : 是一个map,key是Namespace,value是一个callback 实例。定义: type subscriptions map[string]*callback下面看看callback的定义:
type callback struct {rcvr        reflect.Value  // receiver of methodmethod      reflect.Method // callbackargTypes    []reflect.Type // input argument typeshasCtx      bool           // method's first argument is a context (not included in argTypes)errPos      int            // err return idx, of -1 when method cannot return errorisSubscribe bool           // indication if the callback is a subscription
}
rcvr:方法的接收者,这是一个反射值类型,其实就是指向了之前的NewPublicEthereumAPI
method:对应rcvr中的函数
argTypes:函数参数的类型列表
hasCtx:标识函数的第一个参数是否是context.Context类型
errPos:错误代码数值
isSubscribe:是否是subscription类型

接下来我们一起看看RPC启动的源码:

源码路径:go-ethereum\node\node.go
func (n *Node) startRPC(services map[reflect.Type]Service) error {//创建apisapis := n.apis()for _, service := range services {apis = append(apis, service.APIs()...)}//基于apis启动服务if err := n.startInProc(apis); err != nil {return err}if err := n.startIPC(apis); err != nil {n.stopInProc()return err}if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {n.stopIPC()n.stopInProc()return err}if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {n.stopHTTP()n.stopIPC()n.stopInProc()return err}// All API endpoints started successfullyn.rpcAPIs = apisreturn nil
}

startInProc:内部调用,没有启动相关Server服务
startIPC:启动IPC服务
startHTTP:启动http服务
startWS:启动WebSocket服务
如上述服务有一项启动失败,整体RPC服务启动失败,报错并退出!下面重点介绍一下httpRPC的启动过程,源码如下:

func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {// http的IP和端口,默认localhost:8545if endpoint == "" {return nil}listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts)if err != nil {return err}n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))// 所有服务启动成功&赋值n.httpEndpoint = endpointn.httpListener = listener    //HTTP RPC listener socket to server API requestsn.httpHandler = handler   //HTTP RPC request handler to process the API requests  //httpHandler   *rpc.Server,上述介绍的Server数据结构return nil
}

下面来具体看看API的注册过程,源码如下:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {// 在本地创建一个白名单whitelist := make(map[string]bool)for _, module := range modules {whitelist[module] = true}// 注册API服务handler := NewServer()for _, api := range apis {if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {if err := handler.RegisterName(api.Namespace, api.Service); err != nil {return nil, nil, err}log.Debug("HTTP registered", "namespace", api.Namespace)}}// 所有服务注册完成,启动tcp监听var (listener net.Listenererr      error)if listener, err = net.Listen("tcp", endpoint); err != nil {return nil, nil, err}go NewHTTPServer(cors, vhosts, handler).Serve(listener)return listener, handler, err
}

接下来分析一下单个API服务的注册过程:

func (s *Server) RegisterName(name string, rcvr interface{}) error {if s.services == nil {s.services = make(serviceRegistry)}//先创建一个service实例,填充它的callbacks和subscriptions字段svc := new(service)svc.typ = reflect.TypeOf(rcvr)rcvrVal := reflect.ValueOf(rcvr)if name == "" {return fmt.Errorf("no service name for type %s", svc.typ.String())}if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())}//suitableCallbacks()函数会检查API定义是否符合标准methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)// API属于同一个Namespace,进行合并if regsvc, present := s.services[name]; present {if len(methods) == 0 && len(subscriptions) == 0 {return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)}for _, m := range methods {regsvc.callbacks[formatName(m.method.Name)] = m}for _, s := range subscriptions {regsvc.subscriptions[formatName(s.method.Name)] = s}return nil}//创建callback实例放入mapsvc.name = namesvc.callbacks, svc.subscriptions = methods, subscriptionsif len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)}s.services[svc.name] = svcreturn nil
}

API注册完成,HTTP将进行TCP监听,创建了一个http.Server实例,并启动一个goroutine调用它的Serve()方法:

func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {// Wrap the CORS-handler within a host-handlerhandler := newCorsHandler(srv, cors)handler = newVHostHandler(vhosts, handler)return &http.Server{Handler: handler}
}这里有一个Handler参数,用到了装饰者模式,其实最终实现还是在rpc.Server中。Handler是一个接口,需要实现它的ServerHTTP()函数来处理网络数据,newCorsHandler代码如下:
func newCorsHandler(srv *Server, allowedOrigins []string) http.Handler {// disable CORS support if user has not specified a custom CORS configurationif len(allowedOrigins) == 0 {return srv}c := cors.New(cors.Options{AllowedOrigins: allowedOrigins,AllowedMethods: []string{http.MethodPost, http.MethodGet},MaxAge:         600,AllowedHeaders: []string{"*"},})return c.Handler(srv)
}Handle接口的实现:
func (c *Cors) Handler(h http.Handler) http.Handler {return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {if r.Method == "OPTIONS" {c.logf("Handler: Preflight request")c.handlePreflight(w, r)if c.optionPassthrough {h.ServeHTTP(w, r)} else {w.WriteHeader(http.StatusOK)}} else {c.logf("Handler: Actual request")c.handleActualRequest(w, r)h.ServeHTTP(w, r)}})
}进行数据处理的主要是h.ServeHTTP,其源码如下(go-ethereum\rpc\http.go):
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {// Permit dumb empty requests for remote health-checks (AWS)if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {return}if code, err := validateRequest(r); err != nil {http.Error(w, err.Error(), code)return}// All checks passed, create a codec that reads direct from the request body// untilEOF and writes the response to w and order the server to process a// single request.ctx := context.Background()ctx = context.WithValue(ctx, "remote", r.RemoteAddr)ctx = context.WithValue(ctx, "scheme", r.Proto)ctx = context.WithValue(ctx, "local", r.Host)//创建reader读取原始数据body := io.LimitReader(r.Body, maxRequestContentLength)//创建一个JSON的编解码器codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})defer codec.Close()w.Header().Set("content-type", contentType)//数据处理srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)
}

接下来重点分析一下数据处理函数srv.ServeSingleRequest(),其源码如下:

func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {s.serveRequest(ctx, codec, true, options)
}func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {var pend sync.WaitGroupdefer func() {if err := recover(); err != nil {const size = 64 << 10buf := make([]byte, size)buf = buf[:runtime.Stack(buf, false)]log.Error(string(buf))}s.codecsMu.Lock()s.codecs.Remove(codec)s.codecsMu.Unlock()}()//	ctx, cancel := context.WithCancel(context.Background())ctx, cancel := context.WithCancel(ctx)defer cancel()// if the codec supports notification include a notifier that callbacks can use// to send notification to clients. It is thight to the codec/connection. If the// connection is closed the notifier will stop and cancels all active subscriptions.if options&OptionSubscriptions == OptionSubscriptions {ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))}s.codecsMu.Lock()if atomic.LoadInt32(&s.run) != 1 { // 服务没有起来直接退出s.codecsMu.Unlock()return &shutdownError{}}s.codecs.Add(codec)s.codecsMu.Unlock()// test if the server is ordered to stopfor atomic.LoadInt32(&s.run) == 1 {reqs, batch, err := s.readRequest(codec)if err != nil {// If a parsing error occurred, send an errorif err.Error() != "EOF" {log.Debug(fmt.Sprintf("read error %v\n", err))codec.Write(codec.CreateErrorResponse(nil, err))}// Error or end of stream, wait for requests and tear downpend.Wait()return nil}// check if server is ordered to shutdown and return an error// telling the client that his request failed.if atomic.LoadInt32(&s.run) != 1 {err = &shutdownError{}if batch {resps := make([]interface{}, len(reqs))for i, r := range reqs {resps[i] = codec.CreateErrorResponse(&r.id, err)}codec.Write(resps)} else {codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))}return nil}// If a single shot request is executing, run and return immediatelyif singleShot {if batch {s.execBatch(ctx, codec, reqs)} else {s.exec(ctx, codec, reqs[0])}return nil}// For multi-shot connections, start a goroutine to serve and loop backpend.Add(1)go func(reqs []*serverRequest, batch bool) {defer pend.Done()if batch {s.execBatch(ctx, codec, reqs)} else {s.exec(ctx, codec, reqs[0])}}(reqs, batch)}return nil
}

可以看到,就是一个循环,每次调用readRequest()解析请求数据,然后调用exec()或者execBatch()执行API调用,下面接着看readRequest()源码:

func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {reqs, batch, err := codec.ReadRequestHeaders()if err != nil {return nil, batch, err}requests := make([]*serverRequest, len(reqs))// verify requestsfor i, r := range reqs {var ok boolvar svc *serviceif r.err != nil {requests[i] = &serverRequest{id: r.id, err: r.err}continue}if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first argif args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {requests[i].args = args} else {requests[i].err = &invalidParamsError{err.Error()}}continue}if svc, ok = s.services[r.service]; !ok { // rpc method isn't availablerequests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}continue}if r.isPubSub { // eth_subscribe, r.method contains the subscription method nameif callb, ok := svc.subscriptions[r.method]; ok {requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}if r.params != nil && len(callb.argTypes) > 0 {argTypes := []reflect.Type{reflect.TypeOf("")}argTypes = append(argTypes, callb.argTypes...)if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {requests[i].args = args[1:] // first one is service.method name which isn't an actual argument} else {requests[i].err = &invalidParamsError{err.Error()}}}} else {requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}}continue}if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC methodrequests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}if r.params != nil && len(callb.argTypes) > 0 {if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {requests[i].args = args} else {requests[i].err = &invalidParamsError{err.Error()}}}continue}requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}}return requests, batch, nil
}

首先codec把原始JSON数据解析为一个rpcRequest数组,然后遍历这个数组,根据Namespace找到对应的service,再从service的callbacks表中查询需要调用的method,最后组装成一个新的数据结构serverRequest
接着就是调用exec()执行这个severRequest指向的API实现了:

func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {var response interface{}var callback func()if req.err != nil {response = codec.CreateErrorResponse(&req.id, req.err)} else {response, callback = s.handle(ctx, codec, req)}if err := codec.Write(response); err != nil {log.Error(fmt.Sprintf("%v\n", err))codec.Close()}// when request was a subscribe request this allows these subscriptions to be activedif callback != nil {callback()}
}

可以看到调用了handle()方法获取响应数据,然后通过codec组装成JSON发送给请求端。

看一下handle()函数:

func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {if req.err != nil {return codec.CreateErrorResponse(&req.id, req.err), nil}if req.isUnsubscribe { // cancel subscription, first param must be the subscription idif len(req.args) >= 1 && req.args[0].Kind() == reflect.String {notifier, supported := NotifierFromContext(ctx)if !supported { // interface doesn't support subscriptions (e.g. http)return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil}subid := ID(req.args[0].String())if err := notifier.unsubscribe(subid); err != nil {return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil}return codec.CreateResponse(req.id, true), nil}return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil}if req.callb.isSubscribe {subid, err := s.createSubscription(ctx, codec, req)if err != nil {return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil}// active the subscription after the sub id was successfully sent to the clientactivateSub := func() {notifier, _ := NotifierFromContext(ctx)notifier.activate(subid, req.svcname)}return codec.CreateResponse(req.id, subid), activateSub}// regular RPC call, prepare argumentsif len(req.args) != len(req.callb.argTypes) {rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",req.svcname, serviceMethodSeparator, req.callb.method.Name,len(req.callb.argTypes), len(req.args))}return codec.CreateErrorResponse(&req.id, rpcErr), nil}arguments := []reflect.Value{req.callb.rcvr}if req.callb.hasCtx {arguments = append(arguments, reflect.ValueOf(ctx))}if len(req.args) > 0 {arguments = append(arguments, req.args...)}// execute RPC method and return resultreply := req.callb.method.Func.Call(arguments)if len(reply) == 0 {return codec.CreateResponse(req.id, nil), nil}if req.callb.errPos >= 0 { // test if method returned an errorif !reply[req.callb.errPos].IsNil() {e := reply[req.callb.errPos].Interface().(error)res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})return res, nil}}return codec.CreateResponse(req.id, reply[0].Interface()), nil
}

首先处理参数列表,如果发现调用的函数需要Context参数则加到最前面。然后就是通过反射调用API了,最后把结果送给codec,按JSON RPC的格式要求组装成响应返回就可以了。
上述就是RPC启动和调用的全部过程。

这篇关于以太坊源码之RPC服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/978816

相关文章

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

基于Java医院药品交易系统详细设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W+,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码+数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多的人  Java精品实战案例《600套》 2023-2025年最值得选择的Java毕业设计选题大全:1000个热

美容美发店营销版微信小程序源码

打造线上生意新篇章 一、引言:微信小程序,开启美容美发行业新纪元 在数字化时代,微信小程序以其便捷、高效的特点,成为了美容美发行业营销的新宠。本文将带您深入了解美容美发营销微信小程序,探讨其独特优势及如何助力商家实现业务增长。 二、微信小程序:美容美发行业的得力助手 拓宽客源渠道:微信小程序基于微信社交平台,轻松实现线上线下融合,帮助商家快速吸引潜在客户,拓宽客源渠道。 提升用户体验:

风水研究会官网源码系统-可展示自己的领域内容-商品售卖等

一款用于展示风水行业,周易测算行业,玄学行业的系统,并支持售卖自己的商品。 整洁大气,非常漂亮,前端内容均可通过后台修改。 大致功能: 支持前端内容通过后端自定义支持开启关闭会员功能,会员等级设置支持对接官方支付支持添加商品类支持添加虚拟下载类支持自定义其他类型字段支持生成虚拟激活卡支持采集其他站点文章支持对接收益广告支持文章评论支持积分功能支持推广功能更多功能,搭建完成自行体验吧! 原文

微服务中RPC的强类型检查与HTTP的弱类型对比

在微服务架构中,服务间的通信是一个至关重要的环节。其中,远程过程调用(RPC)和HTTP是两种最常见的通信方式。虽然它们都能实现服务间的数据交换,但在类型检查方面,RPC的强类型检查和HTTP的弱类型之间有着显著的差异。本文将深入探讨这两种通信方式在类型检查方面的优缺点,以及它们对微服务架构的影响。 一、RPC的强类型检查 RPC的强类型检查是其核心优势之一。在RPC通信中,客户端和服务端都使

中国341城市生态系统服务价值数据集(2000-2020年)

生态系统服务反映了人类直接或者间接从自然生态系统中获得的各种惠益,对支撑和维持人类生存和福祉起着重要基础作用。目前针对全国城市尺度的生态系统服务价值的长期评估还相对较少。我们在Xie等(2017)的静态生态系统服务当量因子表基础上,选取净初级生产力,降水量,生物迁移阻力,土壤侵蚀度和道路密度五个变量,对生态系统供给服务、调节服务、支持服务和文化服务共4大类和11小类的当量因子进行了时空调整,计算了

HTML5文旅文化旅游网站模板源码

文章目录 1.设计来源文旅宣传1.1 登录界面演示1.2 注册界面演示1.3 首页界面演示1.4 文旅之行界面演示1.5 文旅之行文章内容界面演示1.6 关于我们界面演示1.7 文旅博客界面演示1.8 文旅博客文章内容界面演示1.9 联系我们界面演示 2.效果和源码2.1 动态效果2.2 源代码2.3 源码目录 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh

SpringCloud - 微服务

1、微服务介绍         参考: 微服务百度百科 1.1 概念         微服务(或称微服务架构)是一种云原生架构方法,在单个应用中包含众多松散耦合且可单独部署的小型组件或服务。 这些服务通常拥有自己的技术栈,包括数据库和数据管理模型;通过一个REST API、事件流和消息代理组合彼此通信;以及按照业务能力进行组织,具有通常称为有界上下文的服务分隔线。         微服务特

mediasoup 源码分析 (八)分析PlainTransport

mediasoup 源码分析 (六)分析PlainTransport 一、接收裸RTP流二、mediasoup 中udp建立过程 tips 一、接收裸RTP流 PlainTransport 可以接收裸RTP流,也可以接收AES加密的RTP流。源码中提供了一个通过ffmpeg发送裸RTP流到mediasoup的脚本,具体地址为:mediasoup-demo/broadcaste