This article walks through the RPC service in the Ethereum (go-ethereum) source code, as a reference for developers who work with it.
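If you have set up a private Ethereum chain, you have probably used the console or a third-party tool such as Postman to query chain information, for example looking up a block by its height. Under the hood, all of these queries are served through RPC calls. This article reads the source code that starts the RPC services in Ethereum, to see how the startup works.
The RPC services are started through the following call path: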
----- geth
-------- startNode
----------- utils.StartNode
-------------- Node.Start()
----------------- Node.startRPC
This is the call path that leads to RPC startup. Before reading the startup code, here are the key RPC-related data structures:
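type API struct {
	Namespace string
	Version   string
	Service   interface{}
	Public    bool
}
Namespace: the namespace under which the module's methods are exposed, e.g. eth, net, web3
Version: the version of the module
Service: the object whose methods implement the module
Public: whether the module may be called externally
type Server struct {
	services serviceRegistry
	run      int32
	codecsMu sync.Mutex
	codecs   *set.Set
}
run: whether the server is running
codecsMu: a sync.Mutex that protects codecs
codecs: the set of codecs currently being served (serveRequest adds and removes entries)
services: a map whose key is the Namespace and whose value is a service instance. Definition: type serviceRegistry map[string]*service
Next, the definition of service:
type service struct {
	name          string
	typ           reflect.Type
	callbacks     callbacks
	subscriptions subscriptions
}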
name: the name of the module
typ: the type of the receiver
callbacks: a map whose key is the method name and whose value is a callback instance. Definition: type callbacks map[string]*callback
subscriptions: a map whose key is the subscription method name and whose value is a callback instance. Definition: type subscriptions map[string]*callback
Next, the definition of callback:
type callback struct {
	rcvr        reflect.Value  // receiver of method
	method      reflect.Method // callback
	argTypes    []reflect.Type // input argument types
	hasCtx      bool           // method's first argument is a context (not included in argTypes)
	errPos      int            // err return idx, or -1 when method cannot return error
	isSubscribe bool           // indication if the callback is a subscription
}
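rcvr: the receiver of the method as a reflect.Value, i.e. the service object that was registered (for example, the object returned by NewPublicEthereumAPI)
method: the method of rcvr to call
argTypes: the types of the method's arguments
hasCtx: whether the method's first argument is a context.Context
errPos: the index of the error among the method's return values, or -1 if the method cannot return an error
isSubscribe: whether the callback is a subscription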
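To make the callback fields more concrete, here is a minimal sketch of how a method signature could be inspected with reflection to derive argTypes, hasCtx and errPos. CalcService, callbackInfo and describe are hypothetical names invented for this illustration; this is not the go-ethereum implementation.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// CalcService is a hypothetical API service used only for illustration.
type CalcService struct{}

// Add takes a context as its first argument and returns (result, error).
func (s *CalcService) Add(ctx context.Context, a, b int) (int, error) {
	return a + b, nil
}

// callbackInfo mirrors a subset of the callback fields described above.
type callbackInfo struct {
	rcvr     reflect.Value
	method   reflect.Method
	argTypes []reflect.Type
	hasCtx   bool
	errPos   int
}

var (
	contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errorType   = reflect.TypeOf((*error)(nil)).Elem()
)

// describe inspects the named method of rcvr and fills in a callbackInfo.
func describe(rcvr interface{}, name string) (*callbackInfo, bool) {
	m, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return nil, false
	}
	cb := &callbackInfo{rcvr: reflect.ValueOf(rcvr), method: m, errPos: -1}
	first := 1 // input 0 of a method's func type is the receiver itself
	if m.Type.NumIn() > 1 && m.Type.In(1) == contextType {
		cb.hasCtx = true
		first = 2 // the context is not counted in argTypes
	}
	for i := first; i < m.Type.NumIn(); i++ {
		cb.argTypes = append(cb.argTypes, m.Type.In(i))
	}
	for i := 0; i < m.Type.NumOut(); i++ {
		if m.Type.Out(i) == errorType {
			cb.errPos = i
		}
	}
	return cb, true
}

func main() {
	cb, _ := describe(&CalcService{}, "Add")
	fmt.Println(cb.hasCtx, len(cb.argTypes), cb.errPos) // true 2 1
}
```

For Add, the context is detected and excluded from argTypes, leaving the two int arguments, and the error is the second return value (index 1).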
Now let's look at the source code that starts RPC:
Source path: go-ethereum\node\node.go
func (n *Node) startRPC(services map[reflect.Type]Service) error {
	// Gather the APIs: the node's own plus those of every service
	apis := n.apis()
	for _, service := range services {
		apis = append(apis, service.APIs()...)
	}
	// Start the endpoints based on apis
	if err := n.startInProc(apis); err != nil {
		return err
	}
	if err := n.startIPC(apis); err != nil {
		n.stopInProc()
		return err
	}
	if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {
		n.stopIPC()
		n.stopInProc()
		return err
	}
	if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
		n.stopHTTP()
		n.stopIPC()
		n.stopInProc()
		return err
	}
	// All API endpoints started successfully
	n.rpcAPIs = apis
	return nil
}
startInProc: serves in-process calls; no network server is started
startIPC: starts the IPC service
startHTTP: starts the HTTP service
startWS: starts the WebSocket service
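The error handling in startRPC follows a start-in-order, roll-back-in-reverse pattern: when one endpoint fails, the ones already started are stopped. A minimal sketch of that pattern follows; the endpoint type and the startAll and demo functions are hypothetical and only illustrate the idea.

```go
package main

import (
	"errors"
	"fmt"
)

// endpoint is a hypothetical pair of start/stop hooks.
type endpoint struct {
	name  string
	start func() error
	stop  func()
}

// startAll starts the endpoints in order. If one fails, it stops the ones
// already started, in reverse order, and returns the error.
func startAll(eps []endpoint) error {
	for i, ep := range eps {
		if err := ep.start(); err != nil {
			for j := i - 1; j >= 0; j-- {
				eps[j].stop()
			}
			return fmt.Errorf("%s: %w", ep.name, err)
		}
	}
	return nil
}

// demo simulates a failing HTTP endpoint and records the order of stop calls.
func demo() (string, error) {
	var stopped string
	mk := func(name string, fail bool) endpoint {
		return endpoint{
			name: name,
			start: func() error {
				if fail {
					return errors.New("listen failed")
				}
				return nil
			},
			stop: func() { stopped += name + " " },
		}
	}
	err := startAll([]endpoint{
		mk("inproc", false), mk("ipc", false), mk("http", true), mk("ws", false),
	})
	return stopped, err
}

func main() {
	stopped, err := demo()
	fmt.Printf("stopped: %q, err: %v\n", stopped, err)
}
```

In the simulated run, the HTTP endpoint fails, so ipc and then inproc are stopped, which matches the order of the stopIPC() and stopInProc() calls in startRPC. The ws endpoint is never started.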
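If any of these endpoints fails to start, startRPC stops the ones that were already started and returns the error, so RPC startup as a whole fails. The rest of this article focuses on how the HTTP RPC endpoint starts. The source is as follows: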
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {
	// endpoint is the HTTP IP and port (default localhost:8545); nothing to start if it is empty
	if endpoint == "" {
		return nil
	}
	listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts)
	if err != nil {
		return err
	}
	n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))
	// Everything started successfully, record the results
	n.httpEndpoint = endpoint
	n.httpListener = listener // HTTP RPC listener socket to server API requests
	n.httpHandler = handler   // HTTP RPC request handler to process the API requests (a *rpc.Server, the Server structure described above)
	return nil
}
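Once the HTTP endpoint is open, a client such as Postman talks to it by POSTing JSON-RPC 2.0 requests. As a rough illustration, the sketch below builds the request body for looking up a block by height with eth_getBlockByNumber. The rpcRequest type and the buildRequest and call helpers are hypothetical, and the URL in the usage note assumes a node listening on the default localhost:8545.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// rpcRequest is a hypothetical client-side view of a JSON-RPC 2.0 request.
type rpcRequest struct {
	JSONRPC string        `json:"jsonrpc"`
	ID      int           `json:"id"`
	Method  string        `json:"method"`
	Params  []interface{} `json:"params"`
}

// buildRequest encodes a JSON-RPC 2.0 request body.
func buildRequest(id int, method string, params ...interface{}) (string, error) {
	if params == nil {
		params = []interface{}{} // encode as [] rather than null
	}
	b, err := json.Marshal(rpcRequest{JSONRPC: "2.0", ID: id, Method: method, Params: params})
	return string(b), err
}

// call POSTs the body to the endpoint and returns the raw response.
func call(url, body string) (string, error) {
	resp, err := http.Post(url, "application/json", bytes.NewBufferString(body))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	out, err := io.ReadAll(resp.Body)
	return string(out), err
}

func main() {
	// Block 1, with full transaction objects (second parameter true).
	body, err := buildRequest(1, "eth_getBlockByNumber", "0x1", true)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
}
```

With a node running, call("http://localhost:8545", body) would send the request; main only prints the body so the example runs without a node.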
Next, look at how the APIs are registered. The source is as follows:
func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {
	// Build a whitelist from the configured modules
	whitelist := make(map[string]bool)
	for _, module := range modules {
		whitelist[module] = true
	}
	// Register the whitelisted APIs, or all public ones if there is no whitelist
	handler := NewServer()
	for _, api := range apis {
		if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
			if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
				return nil, nil, err
			}
			log.Debug("HTTP registered", "namespace", api.Namespace)
		}
	}
	// All APIs registered, start the TCP listener
	var (
		listener net.Listener
		err      error
	)
	if listener, err = net.Listen("tcp", endpoint); err != nil {
		return nil, nil, err
	}
	go NewHTTPServer(cors, vhosts, handler).Serve(listener)
	return listener, handler, err
}
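The whitelist condition in StartHTTPEndpoint is easy to misread, so here is a small standalone sketch of it. The API type below is a local copy of the fields shown earlier, and exposed is a hypothetical helper that returns the namespaces that would be registered.

```go
package main

import "fmt"

// API is a local copy of the rpc.API fields shown earlier.
type API struct {
	Namespace string
	Version   string
	Service   interface{}
	Public    bool
}

// exposed applies the same condition as StartHTTPEndpoint: a namespace is
// registered if it is whitelisted, or if there is no whitelist and it is public.
func exposed(apis []API, modules []string) []string {
	whitelist := make(map[string]bool)
	for _, m := range modules {
		whitelist[m] = true
	}
	var out []string
	for _, api := range apis {
		if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
			out = append(out, api.Namespace)
		}
	}
	return out
}

func main() {
	apis := []API{
		{Namespace: "eth", Version: "1.0", Public: true},
		{Namespace: "net", Version: "1.0", Public: true},
		{Namespace: "admin", Version: "1.0", Public: false},
	}
	fmt.Println(exposed(apis, nil))                      // [eth net]
	fmt.Println(exposed(apis, []string{"admin", "eth"})) // [eth admin]
}
```

Note that an explicit whitelist overrides the Public flag: a non-public namespace such as admin is exposed once it is listed in modules, while a public namespace that is not listed (net here) is dropped.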
Next, the registration of a single API service:
func (s *Server) RegisterName(name string, rcvr interface{}) error {
	if s.services == nil {
		s.services = make(serviceRegistry)
	}
	// Create a service instance; its callbacks and subscriptions fields are filled in below
	svc := new(service)
	svc.typ = reflect.TypeOf(rcvr)
	rcvrVal := reflect.ValueOf(rcvr)
	if name == "" {
		return fmt.Errorf("no service name for type %s", svc.typ.String())
	}
	if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
		return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
	}
	// suitableCallbacks() collects the methods whose definitions meet the RPC criteria
	methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
	// A service already exists under this Namespace: merge into it
	if regsvc, present := s.services[name]; present {
		if len(methods) == 0 && len(subscriptions) == 0 {
			return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
		}
		for _, m := range methods {
			regsvc.callbacks[formatName(m.method.Name)] = m
		}
		for _, s := range subscriptions {
			regsvc.subscriptions[formatName(s.method.Name)] = s
		}
		return nil
	}
	// First service under this Namespace: store it in the map
	svc.name = name
	svc.callbacks, svc.subscriptions = methods, subscriptions
	if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
		return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
	}
	s.services[svc.name] = svc
	return nil
}
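RegisterName merges the methods of several receivers into one namespace, keyed by formatName(method name). The sketch below imitates that merge with plain strings. It assumes formatName lowercases the first letter of the Go method name, which is not shown in the excerpt above; the registry type and the owner names are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
	"unicode"
)

// formatName lowercases the first letter of a Go method name
// (assumed behaviour, see the lead-in).
func formatName(name string) string {
	r := []rune(name)
	if len(r) > 0 {
		r[0] = unicode.ToLower(r[0])
	}
	return string(r)
}

// registry maps namespace -> method name -> owner of the method.
type registry map[string]map[string]string

// register adds methods under a namespace, merging with existing entries.
func (r registry) register(namespace, owner string, methods ...string) {
	if r[namespace] == nil {
		r[namespace] = make(map[string]string)
	}
	for _, m := range methods {
		r[namespace][formatName(m)] = owner
	}
}

// methods lists the full RPC method names of a namespace in sorted order.
func (r registry) methods(namespace string) []string {
	var out []string
	for m := range r[namespace] {
		out = append(out, namespace+"_"+m)
	}
	sort.Strings(out)
	return out
}

func main() {
	reg := make(registry)
	reg.register("eth", "PublicEthereumAPI", "GasPrice", "ProtocolVersion")
	reg.register("eth", "PublicBlockChainAPI", "BlockNumber")
	fmt.Println(reg.methods("eth")) // [eth_blockNumber eth_gasPrice eth_protocolVersion]
}
```

Two receivers registered under eth end up in a single method table, which is why a client sees one flat eth_* namespace regardless of how the Go code is split across types.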
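Once the APIs are registered, StartHTTPEndpoint opens the TCP listener, creates an http.Server instance, and starts a goroutine that calls its Serve() method:
func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
	// Wrap the CORS-handler within a host-handler
	handler := newCorsHandler(srv, cors)
	handler = newVHostHandler(vhosts, handler)
	return &http.Server{Handler: handler}
}
The Handler field here uses the decorator pattern: the CORS and virtual-host handlers wrap the rpc.Server, which does the actual work. Handler is an interface; an implementation must provide the ServeHTTP() function to process incoming requests. The code of newCorsHandler is as follows: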
func newCorsHandler(srv *Server, allowedOrigins []string) http.Handler {
	// disable CORS support if user has not specified a custom CORS configuration
	if len(allowedOrigins) == 0 {
		return srv
	}
	c := cors.New(cors.Options{
		AllowedOrigins: allowedOrigins,
		AllowedMethods: []string{http.MethodPost, http.MethodGet},
		MaxAge:         600,
		AllowedHeaders: []string{"*"},
	})
	return c.Handler(srv)
}
The Handler method of Cors is implemented as follows:
func (c *Cors) Handler(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "OPTIONS" {
			c.logf("Handler: Preflight request")
			c.handlePreflight(w, r)
			if c.optionPassthrough {
				h.ServeHTTP(w, r)
			} else {
				w.WriteHeader(http.StatusOK)
			}
		} else {
			c.logf("Handler: Actual request")
			c.handleActualRequest(w, r)
			h.ServeHTTP(w, r)
		}
	})
}
The actual data processing happens in h.ServeHTTP. Its source is as follows (go-ethereum\rpc\http.go):
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Permit dumb empty requests for remote health-checks (AWS)
	if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
		return
	}
	if code, err := validateRequest(r); err != nil {
		http.Error(w, err.Error(), code)
		return
	}
	// All checks passed, create a codec that reads direct from the request body
	// until EOF and writes the response to w and order the server to process a
	// single request.
	ctx := context.Background()
	ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
	ctx = context.WithValue(ctx, "scheme", r.Proto)
	ctx = context.WithValue(ctx, "local", r.Host)
	// Create a reader over the raw request body, capped at maxRequestContentLength
	body := io.LimitReader(r.Body, maxRequestContentLength)
	// Create a JSON codec
	codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})
	defer codec.Close()
	w.Header().Set("content-type", contentType)
	// Process the request
	srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)
}
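The handler chain above (CORS wrapper, virtual-host wrapper, then the RPC server's ServeHTTP) is an instance of the decorator pattern over http.Handler. The following sketch shows the pattern in isolation. core, withHostCheck and status are hypothetical helpers, and the host check is a simplified stand-in rather than the logic of newVHostHandler.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// core stands in for the RPC server: the innermost handler that does the work.
func core() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "handled")
	})
}

// withHostCheck is a hypothetical decorator that rejects requests whose Host
// header is not in the allowed set and otherwise delegates to next.
func withHostCheck(allowed map[string]bool, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !allowed[r.Host] {
			http.Error(w, "invalid host specified", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// status sends a POST for the given host through h and returns the status code.
func status(h http.Handler, host string) int {
	req := httptest.NewRequest(http.MethodPost, "http://"+host+"/", nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := withHostCheck(map[string]bool{"localhost": true}, core())
	fmt.Println(status(h, "localhost"))    // 200
	fmt.Println(status(h, "evil.example")) // 403
}
```

Because every wrapper is itself an http.Handler, wrappers can be stacked in any order and the innermost handler does not need to know about them, which is how NewHTTPServer composes its handlers.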
Next, a closer look at the data-processing function srv.ServeSingleRequest(). Its source is as follows:
func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {
	s.serveRequest(ctx, codec, true, options)
}
func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
	var pend sync.WaitGroup
	defer func() {
		if err := recover(); err != nil {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			log.Error(string(buf))
		}
		s.codecsMu.Lock()
		s.codecs.Remove(codec)
		s.codecsMu.Unlock()
	}()
	// ctx, cancel := context.WithCancel(context.Background())
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// if the codec supports notification include a notifier that callbacks can use
	// to send notification to clients. It is tied to the codec/connection. If the
	// connection is closed the notifier will stop and cancels all active subscriptions.
	if options&OptionSubscriptions == OptionSubscriptions {
		ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
	}
	s.codecsMu.Lock()
	if atomic.LoadInt32(&s.run) != 1 { // server is not running, return immediately
		s.codecsMu.Unlock()
		return &shutdownError{}
	}
	s.codecs.Add(codec)
	s.codecsMu.Unlock()
	// test if the server is ordered to stop
	for atomic.LoadInt32(&s.run) == 1 {
		reqs, batch, err := s.readRequest(codec)
		if err != nil {
			// If a parsing error occurred, send an error
			if err.Error() != "EOF" {
				log.Debug(fmt.Sprintf("read error %v\n", err))
				codec.Write(codec.CreateErrorResponse(nil, err))
			}
			// Error or end of stream, wait for requests and tear down
			pend.Wait()
			return nil
		}
		// check if server is ordered to shutdown and return an error
		// telling the client that his request failed.
		if atomic.LoadInt32(&s.run) != 1 {
			err = &shutdownError{}
			if batch {
				resps := make([]interface{}, len(reqs))
				for i, r := range reqs {
					resps[i] = codec.CreateErrorResponse(&r.id, err)
				}
				codec.Write(resps)
			} else {
				codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
			}
			return nil
		}
		// If a single shot request is executing, run and return immediately
		if singleShot {
			if batch {
				s.execBatch(ctx, codec, reqs)
			} else {
				s.exec(ctx, codec, reqs[0])
			}
			return nil
		}
		// For multi-shot connections, start a goroutine to serve and loop back
		pend.Add(1)
		go func(reqs []*serverRequest, batch bool) {
			defer pend.Done()
			if batch {
				s.execBatch(ctx, codec, reqs)
			} else {
				s.exec(ctx, codec, reqs[0])
			}
		}(reqs, batch)
	}
	return nil
}
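As shown, serveRequest is a loop: each iteration calls readRequest() to parse the request data and then calls exec() or execBatch() to perform the API call. A single-shot request, which is what ServeSingleRequest issues for HTTP, returns after the first iteration. Next, the source of readRequest():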
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
	reqs, batch, err := codec.ReadRequestHeaders()
	if err != nil {
		return nil, batch, err
	}
	requests := make([]*serverRequest, len(reqs))
	// verify requests
	for i, r := range reqs {
		var ok bool
		var svc *service
		if r.err != nil {
			requests[i] = &serverRequest{id: r.id, err: r.err}
			continue
		}
		if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
			requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
			argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
			if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
				requests[i].args = args
			} else {
				requests[i].err = &invalidParamsError{err.Error()}
			}
			continue
		}
		if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
			requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
			continue
		}
		if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
			if callb, ok := svc.subscriptions[r.method]; ok {
				requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
				if r.params != nil && len(callb.argTypes) > 0 {
					argTypes := []reflect.Type{reflect.TypeOf("")}
					argTypes = append(argTypes, callb.argTypes...)
					if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
						requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
					} else {
						requests[i].err = &invalidParamsError{err.Error()}
					}
				}
			} else {
				requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
			}
			continue
		}
		if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
			requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
			if r.params != nil && len(callb.argTypes) > 0 {
				if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
					requests[i].args = args
				} else {
					requests[i].err = &invalidParamsError{err.Error()}
				}
			}
			continue
		}
		requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
	}
	return requests, batch, nil
}
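First, the codec parses the raw JSON into an array of rpcRequest values. readRequest then iterates over the array, finds the service for each request by its Namespace, looks up the requested method in the service's callbacks table (or its subscriptions table for a subscription), and finally assembles a new data structure, serverRequest.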
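The lookup described above can be sketched without the real codec. The example below decodes a single request or a batch and resolves each method name against a two-level table. It assumes the namespace and method are joined by an underscore, as in eth_blockNumber; the request type and the parse and resolve helpers are hypothetical and far simpler than the actual codec.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// request is a hypothetical, reduced view of an incoming JSON-RPC request.
type request struct {
	ID     json.RawMessage `json:"id"`
	Method string          `json:"method"`
	Params json.RawMessage `json:"params"`
}

// parse decodes a single request or a batch; batch reports which one it was.
func parse(raw string) ([]request, bool, error) {
	trimmed := strings.TrimSpace(raw)
	if strings.HasPrefix(trimmed, "[") {
		var reqs []request
		err := json.Unmarshal([]byte(trimmed), &reqs)
		return reqs, true, err
	}
	var single request
	err := json.Unmarshal([]byte(trimmed), &single)
	return []request{single}, false, err
}

// resolve splits "namespace_method" and looks the method up in the table.
func resolve(services map[string]map[string]bool, method string) (string, string, bool) {
	parts := strings.SplitN(method, "_", 2)
	if len(parts) != 2 {
		return "", "", false
	}
	// Indexing a missing namespace yields a nil map, which reads as false.
	return parts[0], parts[1], services[parts[0]][parts[1]]
}

func main() {
	services := map[string]map[string]bool{"eth": {"blockNumber": true}}
	raw := `[{"jsonrpc":"2.0","id":1,"method":"eth_blockNumber","params":[]},
	         {"jsonrpc":"2.0","id":2,"method":"foo_bar"}]`
	reqs, batch, err := parse(raw)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("batch:", batch)
	for _, r := range reqs {
		ns, m, ok := resolve(services, r.Method)
		fmt.Printf("id=%s namespace=%s method=%s found=%v\n", r.ID, ns, m, ok)
	}
}
```

A request whose namespace or method is missing from the table corresponds to the methodNotFoundError branches in readRequest.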
Next, exec() is called to run the API implementation that the serverRequest points to:
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
	var response interface{}
	var callback func()
	if req.err != nil {
		response = codec.CreateErrorResponse(&req.id, req.err)
	} else {
		response, callback = s.handle(ctx, codec, req)
	}
	if err := codec.Write(response); err != nil {
		log.Error(fmt.Sprintf("%v\n", err))
		codec.Close()
	}
	// when request was a subscribe request this allows these subscriptions to be activated
	if callback != nil {
		callback()
	}
}
As shown, exec() calls handle() to obtain the response, which the codec then encodes as JSON and sends back to the client.
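For reference, the JSON that reaches the client follows the JSON-RPC 2.0 response layout: a result member on success, or an error member with a code and a message on failure. The sketch below encodes both cases. The response and rpcError types and the success and failure helpers are hypothetical and are not the codec's own types; the example values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcError is the error object of a JSON-RPC 2.0 response.
type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// response is a hypothetical JSON-RPC 2.0 response; exactly one of Result
// and Error is expected to be set.
type response struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      int         `json:"id"`
	Result  interface{} `json:"result,omitempty"`
	Error   *rpcError   `json:"error,omitempty"`
}

// success encodes a response that carries a result.
func success(id int, result interface{}) string {
	b, _ := json.Marshal(response{JSONRPC: "2.0", ID: id, Result: result})
	return string(b)
}

// failure encodes a response that carries an error object.
func failure(id, code int, msg string) string {
	b, _ := json.Marshal(response{JSONRPC: "2.0", ID: id, Error: &rpcError{Code: code, Message: msg}})
	return string(b)
}

func main() {
	fmt.Println(success(1, "0x4b7"))
	// -32601 is the JSON-RPC 2.0 code for "method not found".
	fmt.Println(failure(2, -32601, "method not found"))
}
```

The id in the response echoes the id of the request, which is how a client matches responses to requests, especially within a batch.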
Here is the handle() function:
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
	if req.err != nil {
		return codec.CreateErrorResponse(&req.id, req.err), nil
	}
	if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
		if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
			notifier, supported := NotifierFromContext(ctx)
			if !supported { // interface doesn't support subscriptions (e.g. http)
				return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
			}
			subid := ID(req.args[0].String())
			if err := notifier.unsubscribe(subid); err != nil {
				return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
			}
			return codec.CreateResponse(req.id, true), nil
		}
		return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
	}
	if req.callb.isSubscribe {
		subid, err := s.createSubscription(ctx, codec, req)
		if err != nil {
			return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
		}
		// active the subscription after the sub id was successfully sent to the client
		activateSub := func() {
			notifier, _ := NotifierFromContext(ctx)
			notifier.activate(subid, req.svcname)
		}
		return codec.CreateResponse(req.id, subid), activateSub
	}
	// regular RPC call, prepare arguments
	if len(req.args) != len(req.callb.argTypes) {
		rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
			req.svcname, serviceMethodSeparator, req.callb.method.Name,
			len(req.callb.argTypes), len(req.args))}
		return codec.CreateErrorResponse(&req.id, rpcErr), nil
	}
	arguments := []reflect.Value{req.callb.rcvr}
	if req.callb.hasCtx {
		arguments = append(arguments, reflect.ValueOf(ctx))
	}
	if len(req.args) > 0 {
		arguments = append(arguments, req.args...)
	}
	// execute RPC method and return result
	reply := req.callb.method.Func.Call(arguments)
	if len(reply) == 0 {
		return codec.CreateResponse(req.id, nil), nil
	}
	if req.callb.errPos >= 0 { // test if method returned an error
		if !reply[req.callb.errPos].IsNil() {
			e := reply[req.callb.errPos].Interface().(error)
			res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
			return res, nil
		}
	}
	return codec.CreateResponse(req.id, reply[0].Interface()), nil
}
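For a regular call, handle() first builds the argument list: the receiver comes first, followed by the Context if the method takes one, and then the request arguments. It then calls the API method through reflection and hands the result to the codec, which assembles the response in the format required by JSON-RPC.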
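The reflective call at the end of handle() can be tried in isolation. The sketch below invokes a method by name with the receiver, a context and the request arguments, and then checks the error return value. Calc, invoke and run are hypothetical, and invoke assumes the method takes a context and returns (value, error), whereas the real code reads this from hasCtx and errPos.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// Calc is a hypothetical service used only for illustration.
type Calc struct{}

// Div returns a / b, or an error when b is zero.
func (c *Calc) Div(ctx context.Context, a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("division by zero")
	}
	return a / b, nil
}

// invoke calls the named method on rcvr. The argument order is: receiver,
// context, then the request arguments. It assumes a (value, error) result.
func invoke(ctx context.Context, rcvr interface{}, name string, args ...interface{}) (interface{}, error) {
	m, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return nil, fmt.Errorf("method %s not found", name)
	}
	in := []reflect.Value{reflect.ValueOf(rcvr), reflect.ValueOf(ctx)}
	for _, a := range args {
		in = append(in, reflect.ValueOf(a))
	}
	if len(in) != m.Type.NumIn() {
		return nil, fmt.Errorf("%s expects %d parameters, got %d", name, m.Type.NumIn()-2, len(args))
	}
	out := m.Func.Call(in)
	if errVal := out[len(out)-1]; !errVal.IsNil() {
		return nil, errVal.Interface().(error)
	}
	return out[0].Interface(), nil
}

// run is a small convenience wrapper around invoke for the Div method.
func run(a, b int) (interface{}, error) {
	return invoke(context.Background(), &Calc{}, "Div", a, b)
}

func main() {
	fmt.Println(run(10, 2)) // 5 <nil>
	fmt.Println(run(1, 0))  // <nil> division by zero
}
```

Checking the argument count before Call matters because reflect panics on a mismatch, which is why handle() compares len(req.args) with len(req.callb.argTypes) first.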
That is the whole process of RPC startup and invocation.