以太坊源码阅读-网络处理-RPC

server.go 实现了RPC服务端的核心逻辑,包括注册、读取请求、处理请求、发送回应等逻辑。
// Server is an RPC server. It keeps the set of registered services and keeps
// track of every codec (client connection) that is currently being served.
type Server struct {
	services serviceRegistry // registered services, looked up by service name

	run      int32      // 1 while the server is running; accessed with sync/atomic
	codecsMu sync.Mutex // guards codecs
	codecs   mapset.Set // codecs that are currently being served
}

// callback describes one method that was registered with the server, along
// with the metadata required to invoke it through reflection.
type callback struct {
	rcvr        reflect.Value  // receiver the method is invoked on
	method      reflect.Method // the method to call
	argTypes    []reflect.Type // types of the input arguments
	hasCtx      bool           // the method's first argument is a context (not included in argTypes)
	errPos      int            // index of the error return value, or -1 when the method cannot return an error
	isSubscribe bool           // set when the callback is a subscription
}

// service is an object that was registered with the server, together with
// the callbacks and subscriptions discovered on it.
type service struct {
	name          string        // name the service is registered under
	typ           reflect.Type  // type of the receiver
	callbacks     callbacks     // registered RPC handlers
	subscriptions subscriptions // available subscriptions/notifications
}

Server创建的时候通过调用RegisterName注册自己的实例。
// NewServer returns a running server whose only registered service is the
// built-in metadata service.
func NewServer() *Server {
	srv := new(Server)
	srv.services = make(serviceRegistry)
	srv.codecs = mapset.NewSet()
	srv.run = 1

	// The default service exposes meta information about the RPC server
	// itself, such as the services and methods it offers.
	srv.RegisterName(MetadataApi, &RPCService{srv})

	return srv
}

服务注册RegisterName,该方法会通过传入的参数创建一个service对象,如果没有找到合适的方法则返回错误,如果没有错误,则将创建的service实例加入serviceRegistry。
// RegisterName exposes the suitable methods and subscriptions of rcvr under
// the given service name. It returns an error when name is empty, when the
// receiver type is not exported, or when rcvr has no method that qualifies as
// an RPC method or subscription. If a service with this name already exists,
// the new callbacks are merged into it; otherwise a new service is added to
// the collection this server serves.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
	if s.services == nil {
		s.services = make(serviceRegistry)
	}

	rcvrType := reflect.TypeOf(rcvr)
	rcvrVal := reflect.ValueOf(rcvr)

	if name == "" {
		return fmt.Errorf("no service name for type %s", rcvrType.String())
	}
	if typeName := reflect.Indirect(rcvrVal).Type().Name(); !isExported(typeName) {
		return fmt.Errorf("%s is not exported", typeName)
	}

	methods, subs := suitableCallbacks(rcvrVal, rcvrType)
	if len(methods)+len(subs) == 0 {
		return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
	}

	existing, registered := s.services[name]
	if !registered {
		s.services[name] = &service{
			name:          name,
			typ:           rcvrType,
			callbacks:     methods,
			subscriptions: subs,
		}
		return nil
	}

	// A service is already registered under this name: merge the new
	// methods and subscriptions into it.
	for _, cb := range methods {
		existing.callbacks[formatName(cb.method.Name)] = cb
	}
	for _, sub := range subs {
		existing.subscriptions[formatName(sub.method.Name)] = sub
	}
	return nil
}

suitableCallbacks()遍历这个类型的所有方法,筛选出满足RPC回调或订阅回调条件的方法,分别放入callbacks和subscriptions两个集合并返回。
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
callbacks := make(callbacks)
subscriptions := make(subscriptions)

METHODS:
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := formatName(method.Name)
if method.PkgPath != “” { // method must be exported
continue
}

    var h callback
    h.isSubscribe = isPubSub(mtype)
    h.rcvr = rcvr
    h.method = method
    h.errPos = -1

    firstArg := 1
    numIn := mtype.NumIn()
    if numIn >= 2 && mtype.In(1) == contextType {
        h.hasCtx = true
        firstArg = 2
    }

    if h.isSubscribe {
        h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
        for i := firstArg; i < numIn; i++ {
            argType := mtype.In(i)
            if isExportedOrBuiltinType(argType) {
                h.argTypes[i-firstArg] = argType
            } else {
                continue METHODS
            }
        }

        subscriptions[mname] = &h
        continue METHODS
    }

    // determine method arguments, ignore first arg since it's the receiver type
    // Arguments must be exported or builtin types
    h.argTypes = make([]reflect.Type, numIn-firstArg)
    for i := firstArg; i < numIn; i++ {
        argType := mtype.In(i)
        if !isExportedOrBuiltinType(argType) {
            continue METHODS
        }
        h.argTypes[i-firstArg] = argType
    }

    // check that all returned values are exported or builtin types
    for i := 0; i < mtype.NumOut(); i++ {
        if !isExportedOrBuiltinType(mtype.Out(i)) {
            continue METHODS
        }
    }

    // when a method returns an error it must be the last returned value
    h.errPos = -1
    for i := 0; i < mtype.NumOut(); i++ {
        if isErrorType(mtype.Out(i)) {
            h.errPos = i
            break
        }
    }

    if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
        continue METHODS
    }

    switch mtype.NumOut() {
    case 0, 1, 2:
        if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
            continue METHODS
        }
        callbacks[mname] = &h
    }
}

return callbacks, subscriptions

}

server的启动和服务:在ipc.go中可以看到,每Accept()到一个连接,就启动一个goroutine调用srv.ServeCodec来为该连接提供服务。
func (srv *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
log.Trace(fmt.Sprint(“accepted conn”, conn.RemoteAddr()))
go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
}
}

// ServeCodec serves requests read from codec until serveRequest returns, and
// closes the codec afterwards. The requests are served in multi-shot mode
// (singleShot is false) with the given options.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
	defer codec.Close()
	// serveRequest takes a context as its first argument; a codec served this
	// way has no parent context, so start from the background context.
	s.serveRequest(context.Background(), codec, false, options)
}

serveRequest从codec读取请求,调用对应的方法,并把结果写回codec。其中sync.WaitGroup用来等待所有正在处理中的请求结束,context用来做上下文管理(取消和传值)。
// serveRequest reads requests from codec, executes them and writes the
// responses back to codec. It keeps doing so while the server is running
// (s.run == 1).
//
// When singleShot is true the first request (or batch) is executed on the
// calling goroutine and the function returns. Otherwise every request (or
// batch) is executed on its own goroutine and the loop continues reading.
//
// It returns a shutdownError when the server is already stopped on entry and
// nil in every other case. A panic raised in this goroutine is recovered and
// its stack trace logged. The codec is removed from s.codecs on return.
func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
// pend counts the request goroutines that are still executing.
var pend sync.WaitGroup

defer func() {
    if err := recover(); err != nil {
        // Log the stack of the panicking goroutine, capped at 64 KiB.
        const size = 64 << 10
        buf := make([]byte, size)
        buf = buf[:runtime.Stack(buf, false)]
        log.Error(string(buf))
    }
    s.codecsMu.Lock()
    s.codecs.Remove(codec)
    s.codecsMu.Unlock()
}()

// Derive a cancellable context from the caller's ctx; it is cancelled when
// this function returns.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
    // If the codec supports it, callbacks can send messages to the client
    // through an object called the notifier.
// if the codec supports notification include a notifier that callbacks can use
// to send notification to clients. It is tied to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
    ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
// Track the codec, unless the server has been stopped in the meantime.
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
    s.codecsMu.Unlock()
    return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()

// test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
    reqs, batch, err := s.readRequest(codec)
    if err != nil {
        // If a parsing error occurred, send an error
        if err.Error() != "EOF" {
            log.Debug(fmt.Sprintf("read error %v\n", err))
            codec.Write(codec.CreateErrorResponse(nil, err))
        }
        // Error or end of stream, wait for requests and tear down
        pend.Wait()
        return nil
    }

    // check if server is ordered to shutdown and return an error
    // telling the client that his request failed.
    if atomic.LoadInt32(&s.run) != 1 {
        err = &shutdownError{}
        if batch {
            resps := make([]interface{}, len(reqs))
            for i, r := range reqs {
                resps[i] = codec.CreateErrorResponse(&r.id, err)
            }
            codec.Write(resps)
        } else {
            codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
        }
        return nil
    }
    // If a single shot request is executing, run and return immediately
    if singleShot {
        if batch {
            s.execBatch(ctx, codec, reqs)
        } else {
            s.exec(ctx, codec, reqs[0])
        }
        return nil
    }
    // For multi-shot connections, start a goroutine to serve and loop back
    pend.Add(1)

    // reqs and batch are passed as arguments so the goroutine keeps its own
    // copies while the loop reads the next request.
    go func(reqs []*serverRequest, batch bool) {
        defer pend.Done()
        if batch {
            s.execBatch(ctx, codec, reqs)
        } else {
            s.exec(ctx, codec, reqs[0])
        }
    }(reqs, batch)
}
return nil

}

readRequest方法首先通过codec.ReadRequestHeaders从codec读取请求,每个请求被解析成一个rpcRequest。
// rpcRequest is a single request as handed to the server by the codec,
// before it has been matched against a registered callback.
type rpcRequest struct {
	service  string      // name of the service the request addresses
	method   string      // name of the requested method
	id       interface{} // request id
	isPubSub bool        // set for publish/subscribe requests
	params   interface{} // request parameters, not parsed yet
	err      Error       // invalid batch element
}
然后返回serverRequest
// serverRequest is an incoming request after it has been matched against the
// registered services and its arguments have been parsed.
type serverRequest struct {
	id            interface{}     // request id
	svcname       string          // name of the service that handles the request
	callb         *callback       // callback to invoke
	args          []reflect.Value // parsed call arguments
	isUnsubscribe bool            // set when the request cancels a subscription
	err           Error           // set when the request cannot be executed
}

readRequest方法,从codec读取请求,对请求进行处理生成serverRequest对象返回。
// readRequest reads the next request or batch of requests from codec and
// turns every entry into a serverRequest.
//
// The returned slice has one element per request read. An entry that cannot
// be served (invalid batch element, unknown service or method, arguments that
// fail to parse) is still returned, with its err field set. The boolean
// reports whether the requests were sent as a batch. A non-nil Error is
// returned only when reading the request headers from the codec fails.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
	reqs, batch, err := codec.ReadRequestHeaders()
	if err != nil {
		return nil, batch, err
	}

	requests := make([]*serverRequest, len(reqs))

	// Build one serverRequest per incoming request, verifying it on the way.
	for i, r := range reqs {
		var ok bool
		var svc *service

		if r.err != nil {
			requests[i] = &serverRequest{id: r.id, err: r.err}
			continue
		}

		// Pub/sub request whose method name carries the unsubscribe suffix.
		if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
			requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
			argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
			if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
				requests[i].args = args
			} else {
				requests[i].err = &invalidParamsError{err.Error()}
			}
			continue
		}

		// No service is registered under the requested name.
		if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
			requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
			continue
		}

		// Subscribe request: look up the subscription callback.
		if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
			if callb, ok := svc.subscriptions[r.method]; ok {
				requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
				if r.params != nil && len(callb.argTypes) > 0 {
					argTypes := []reflect.Type{reflect.TypeOf("")}
					argTypes = append(argTypes, callb.argTypes...)
					if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
						requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
					} else {
						requests[i].err = &invalidParamsError{err.Error()}
					}
				}
			} else {
				requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
			}
			continue
		}

		if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
			requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
			if r.params != nil && len(callb.argTypes) > 0 {
				if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
					requests[i].args = args
				} else {
					requests[i].err = &invalidParamsError{err.Error()}
				}
			}
			continue
		}

		requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
	}

	return requests, batch, nil
}

exec&execBatch方法,调用s.handle方法对request进行处理。
// exec runs a single request and writes its response to the codec. If the
// write fails, the error is logged and the codec is closed.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
	var (
		response interface{}
		activate func()
	)
	if req.err == nil {
		response, activate = s.handle(ctx, codec, req)
	} else {
		response = codec.CreateErrorResponse(&req.id, req.err)
	}

	if writeErr := codec.Write(response); writeErr != nil {
		log.Error(fmt.Sprintf("%v\n", writeErr))
		codec.Close()
	}

	// For a subscribe request, handle returns a function that activates the
	// subscription; it is only called once the response has been written.
	if activate != nil {
		activate()
	}
}

// execBatch executes the given requests and writes the results back using
// the codec. The responses are written in a single call, after the last
// request has been processed. If the write fails, the error is logged and
// the codec is closed.
func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) {
	responses := make([]interface{}, len(requests))
	var callbacks []func()
	for i, req := range requests {
		if req.err != nil {
			responses[i] = codec.CreateErrorResponse(&req.id, req.err)
			continue
		}
		var callback func()
		if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
			callbacks = append(callbacks, callback)
		}
	}

	if err := codec.Write(responses); err != nil {
		log.Error(fmt.Sprintf("%v\n", err))
		codec.Close()
	}

	// When the batch holds one or more subscribe requests, this activates
	// their subscriptions after the responses have been written.
	for _, c := range callbacks {
		c()
	}
}

handle方法,执行一个request,然后返回response。
// handle executes one request and returns the response to send to the
// client. The second return value is non-nil only for a successful subscribe
// request: it is a function that activates the new subscription and is meant
// to be called after the response has been written.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
	// fail builds the error response for this request.
	fail := func(rpcErr Error) (interface{}, func()) {
		return codec.CreateErrorResponse(&req.id, rpcErr), nil
	}

	if req.err != nil {
		return fail(req.err)
	}

	// Unsubscribe request: the first argument must be the subscription id and
	// the notifier is taken from the context via NotifierFromContext.
	if req.isUnsubscribe {
		if len(req.args) < 1 || req.args[0].Kind() != reflect.String {
			return fail(&invalidParamsError{"Expected subscription id as first argument"})
		}
		notifier, supported := NotifierFromContext(ctx)
		if !supported { // interface doesn't support subscriptions (e.g. http)
			return fail(&callbackError{ErrNotificationsUnsupported.Error()})
		}
		if err := notifier.unsubscribe(ID(req.args[0].String())); err != nil {
			return fail(&callbackError{err.Error()})
		}
		return codec.CreateResponse(req.id, true), nil
	}

	cb := req.callb

	// Subscribe request: create the subscription now and hand back a function
	// that activates it.
	if cb.isSubscribe {
		subid, err := s.createSubscription(ctx, codec, req)
		if err != nil {
			return fail(&callbackError{err.Error()})
		}
		activate := func() {
			notifier, _ := NotifierFromContext(ctx)
			notifier.activate(subid, req.svcname)
		}
		return codec.CreateResponse(req.id, subid), activate
	}

	// Regular RPC call: the number of arguments must match the callback.
	if want, got := len(cb.argTypes), len(req.args); got != want {
		msg := fmt.Sprintf("%s%s%s expects %d parameters, got %d",
			req.svcname, serviceMethodSeparator, cb.method.Name,
			want, got)
		return fail(&invalidParamsError{msg})
	}

	// Call arguments are: receiver, optional context, request arguments.
	callArgs := make([]reflect.Value, 0, len(req.args)+2)
	callArgs = append(callArgs, cb.rcvr)
	if cb.hasCtx {
		callArgs = append(callArgs, reflect.ValueOf(ctx))
	}
	callArgs = append(callArgs, req.args...)

	out := cb.method.Func.Call(callArgs)
	if len(out) == 0 {
		return codec.CreateResponse(req.id, nil), nil
	}
	if pos := cb.errPos; pos >= 0 && !out[pos].IsNil() { // method returned an error
		callErr := out[pos].Interface().(error)
		return fail(&callbackError{callErr.Error()})
	}
	return codec.CreateResponse(req.id, out[0].Interface()), nil
}

subscription.go发布订阅模式
在服务一个客户端连接时候,调用newNotifier方法创建了一个notifier对象存储到ctx中。可以观察到Notifier对象保存了codec的实例,也就是说Notifier对象保存了网络连接,用来在需要的时候发送数据。
// newNotifier creates a new notifier that can be used to send subscription
// notifications to the client.
func newNotifier(codec ServerCodec) Notifier {
return &Notifier{
codec: codec,
active: make(map[ID]
Subscription),
inactive: make(map[ID]*Subscription),
}
}

createSubscription方法会调用指定的注册上来的方法,并得到回应。

// createSubscription will call the subscription callback and returns the subscription id or error.
func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args…)
reply := req.callb.method.Func.Call(args)

if !reply[1].IsNil() { // subscription creation failed
    return "", reply[1].Interface().(error)
}

return reply[0].Interface().(*Subscription).ID, nil

}

再来看看activate方法,这个方法用来激活subscription。subscription在subscription ID被发送给客户端之后才被激活,避免客户端还没有收到subscription ID的时候就收到了subscription信息。

// activate moves the subscription with the given id from the inactive set to
// the active set and records its namespace. An id that is not in the
// inactive set is ignored. The RPC server calls this after the subscription
// id has been sent to the client, so that no notification reaches the client
// before it knows the id.
func (n *Notifier) activate(id ID, namespace string) {
	n.subMu.Lock()
	defer n.subMu.Unlock()

	sub, pending := n.inactive[id]
	if !pending {
		return
	}
	sub.namespace = namespace
	n.active[id] = sub
	delete(n.inactive, id)
}

取消订阅的函数
// unsubscribe removes the active subscription with the given id and closes
// its err channel. ErrSubscriptionNotFound is returned when no active
// subscription has that id.
func (n *Notifier) unsubscribe(id ID) error {
	n.subMu.Lock()
	defer n.subMu.Unlock()

	sub, ok := n.active[id]
	if !ok {
		return ErrSubscriptionNotFound
	}
	close(sub.err)
	delete(n.active, id)
	return nil
}

最后是一个发送订阅的函数,调用这个函数把数据发送到客户端, 这个也比较简单。
// Notify sends data to the client as a notification for the subscription
// with the given id. Nothing is sent, and nil is returned, when the id does
// not belong to an active subscription. When writing the notification fails,
// the codec is closed and the write error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
	n.subMu.RLock()
	defer n.subMu.RUnlock()

	sub, ok := n.active[id]
	if !ok {
		return nil
	}

	err := n.codec.Write(n.codec.CreateNotification(string(id), sub.namespace, data))
	if err != nil {
		n.codec.Close()
	}
	return err
}

client.go
先来看看客户端的数据结构
// Client represents a connection to an RPC server.
type Client struct {
idCounter uint32
connectFunc func(ctx context.Context) (net.Conn, error)
isHTTP bool
// writeConn is only safe to access outside dispatch, with the
// write lock held. The write lock is taken by sending on
// requestOp and released by sending on sendDone.
writeConn net.Conn
// for dispatch
close chan struct{}
didQuit chan struct{} // closed when client quits
reconnected chan net.Conn // where write/reconnect sends the new connection
readErr chan error // errors from read
readResp chan []jsonrpcMessage // valid messages from read
requestOp chan requestOp // for registering response IDs
sendDone chan error // signals write completion, releases write lock
respWait map[string]
requestOp // active requests
subs map[string]
ClientSubscription // active subscriptions
}

newClient,新建一个客户端。通过调用connectFunc方法来获取一个网络连接,如果网络连接是httpConn对象的话,那么isHTTP设置为true。然后是对象的初始化,如果是HTTP连接的话,直接返回,否则就启动一个goroutine调用dispatch方法。dispatch方法是整个client的指挥中心,通过上面提到的channel来和其他的goroutine进行通信,获取信息,并根据信息做出各种决策。后续会详细介绍dispatch。因为HTTP的调用方式非常简单,这里先对HTTP的方式做一个简单的阐述。
func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (Client, error) {
//调用connectFunc方法来获取一个网络连接
conn, err := connectFunc(initctx)
if err != nil {
return nil, err
}
_, isHTTP := conn.(
httpConn)
c := &Client{
writeConn: conn,
isHTTP: isHTTP,
connectFunc: connectFunc,
close: make(chan struct{}),
didQuit: make(chan struct{}),
reconnected: make(chan net.Conn),
readErr: make(chan error),
readResp: make(chan []jsonrpcMessage),
requestOp: make(chan requestOp),
sendDone: make(chan error, 1),
respWait: make(map[string]
requestOp),
subs: make(map[string]
ClientSubscription),
}
if !isHTTP {
go c.dispatch(conn)
}
return c, nil
}

请求通过调用client的call方法来进行RPC调用
// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Call(result interface{}, method string, args …interface{}) error {
ctx := context.Background()
return c.CallContext(ctx, result, method, args…)
}

func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args …interface{}) error {
msg, err := c.newMessage(method, args…)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}

sendHTTP,这个方法直接调用doRequest方法进行请求拿到回应。然后写入到resp队列就返回了。

// sendHTTP performs msg as an HTTP request on the client's *httpConn,
// decodes the response body as a single JSON-RPC message and delivers it on
// op.resp. It returns the request or decoding error, if any.
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
	hc := c.writeConn.(*httpConn)
	respBody, err := hc.doRequest(ctx, msg)
	if err != nil {
		return err
	}
	defer respBody.Close()

	var respmsg jsonrpcMessage
	if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
		return err
	}
	op.resp <- &respmsg
	return nil
}

再看看上面的另一个方法op.wait(),这个方法会同时等待两个队列(ctx.Done()和op.resp)。如果是http,那么从resp队列获取到回应就会直接返回。这样整个HTTP的请求过程就完成了。中间没有涉及到多线程问题,都在一个线程内部完成了。
// wait blocks until a response is received on op.resp or ctx is done. In the
// first case it returns the received message together with op.err; in the
// second it returns nil and ctx.Err().
func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case resp := <-op.resp:
		return resp, op.err
	}
}

// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.requestOp <- op:
log.Trace(“”, “msg”, log.Lazy{Fn: func() string {
return fmt.Sprint(“sending “, msg)
}})
err := c.write(ctx, msg)
c.sendDone <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.didQuit:
//已经退出,可能被调用了Close
return ErrClientQuit
}
}

dispatch方法
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
//
// The loop runs until a value is received on (or the close of) c.close. On
// exit it fails all pending request ops with ErrClientQuit, closes the
// connection, drains the read channels until the read loop reports an error,
// and finally closes c.didQuit.
func (c *Client) dispatch(conn net.Conn) {
// Spawn the initial read loop.
go c.read(conn)
var (
lastOp *requestOp // tracks last send operation
requestOpLock = c.requestOp // nil while the send lock is held
reading = true // if true, a read loop is running
)
// Deferred calls run last-in first-out: the cleanup below runs before
// didQuit is closed.
defer close(c.didQuit)
defer func() {
c.closeRequestOps(ErrClientQuit)
conn.Close()
if reading {
// Empty read channels until read is dead.
for {
select {
case <-c.readResp:
case <-c.readErr:
return
}
}
}
}()

for {
    select {
    case <-c.close:
        return

    // Read path.
    case batch := <-c.readResp:
        for _, msg := range batch {
            switch {
            case msg.isNotification():
                log.Trace("", "msg", log.Lazy{Fn: func() string {
                    return fmt.Sprint("<-readResp: notification ", msg)
                }})
                c.handleNotification(msg)
            case msg.isResponse():
                log.Trace("", "msg", log.Lazy{Fn: func() string {
                    return fmt.Sprint("<-readResp: response ", msg)
                }})
                c.handleResponse(msg)
            default:
                log.Debug("", "msg", log.Lazy{Fn: func() string {
                    return fmt.Sprint("<-readResp: dropping weird message", msg)
                }})
                // TODO: maybe close
            }
        }

    // The read loop failed: fail all pending ops with the read error and
    // close the connection.
    case err := <-c.readErr:
        log.Debug("<-readErr", "err", err)
        c.closeRequestOps(err)
        conn.Close()
        reading = false

    // A new connection is available: switch over to it and start a read
    // loop on it.
    case newconn := <-c.reconnected:
        log.Debug("<-reconnected", "reading", reading, "remote", conn.RemoteAddr())
        if reading {
            // Wait for the previous read loop to exit. This is a rare case.
            conn.Close()
            <-c.readErr
        }
        go c.read(newconn)
        reading = true
        conn = newconn

    // Send path.
    case op := <-requestOpLock:
        // Stop listening for further send ops until the current one is done.
        // Receiving from a nil channel blocks forever, so this case stays
        // disabled until sendDone is received.
        requestOpLock = nil
        lastOp = op
        for _, id := range op.ids {
            c.respWait[string(id)] = op
        }

    case err := <-c.sendDone:
        if err != nil {
            // Remove response handlers for the last send. We remove those here
            // because the error is already handled in Call or BatchCall. When the
            // read loop goes down, it will signal all other current operations.
            for _, id := range lastOp.ids {
                delete(c.respWait, string(id))
            }
        }
        // Listen for send ops again.
        requestOpLock = c.requestOp
        lastOp = nil
    }
}

}

客户端-订阅模式的特殊处理
以太坊的RPC框架支持发布和订阅的模式。
//Subscribe会使用传入的参数调用”<namespace>_subscribe”方法来订阅指定的消息。
//服务器的通知会写入channel参数指定的队列。 channel参数必须和返回的类型相同。
//ctx参数可以用来取消RPC的请求,但是如果订阅已经完成就不会有效果了。
//处理速度太慢的订阅者的消息会被删除,每个客户端有8000个消息的缓存。
func (c Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args …interface{}) (ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic(“first argument to Subscribe must be a writable channel”)
}
if chanVal.IsNil() {
panic(“channel given to Subscribe must not be nil”)
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}

msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
    return nil, err
}
//requestOp的参数和Call调用的不一样。 多了一个参数sub.
op := &requestOp{
    ids:  []json.RawMessage{msg.ID},
    resp: make(chan *jsonrpcMessage),
    sub:  newClientSubscription(c, namespace, chanVal),
}

// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
    return nil, err
}
if _, err := op.wait(ctx); err != nil {
    return nil, err
}
return op.sub, nil

}
newClientSubscription方法,这个方法创建了一个新的ClientSubscription对象,这个对象把传入的channel参数保存起来,然后自己又创建了三个chan对象。后续会详细介绍这三个chan对象。

// newClientSubscription builds the client-side state of a subscription. It
// stores the user's channel together with its element type and creates the
// quit, err and in channels of the subscription.
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
	return &ClientSubscription{
		client:    c,
		namespace: namespace,
		etype:     channel.Type().Elem(),
		channel:   channel,
		quit:      make(chan struct{}),
		err:       make(chan error, 1),
		in:        make(chan json.RawMessage),
	}
}
从上面的代码可以看出,订阅过程跟Call过程差不多:构建一个订阅请求,调用send发送到网络上,然后等待返回。我们通过dispatch对返回结果的处理来看看订阅和Call的不同。

func (c *Client) handleResponse(msg *jsonrpcMessage) {
op := c.respWait[string(msg.ID)]
if op == nil {
log.Debug(fmt.Sprintf(“unsolicited response %v”, msg))
return
}
delete(c.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
如果op.sub是nil,普通的RPC请求,这个字段的值是空白的,只有订阅请求才有值。
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
//启动一个新的goroutine 并把op.sub.subid记录起来。
go op.sub.start()
c.subs[op.sub.subid] = op.sub
}
}

op.sub.start方法,专门用来处理订阅消息。主要的功能是从in队列里面获取订阅消息,然后把订阅消息放到buffer里面。 如果数据能够发送。就从buffer里面发送一些数据给用户传入的那个channel。 如果buffer超过指定的大小,就丢弃。
// start runs the forwarding loop of the subscription and, once it returns,
// passes its result on to quitWithError.
func (sub *ClientSubscription) start() {
	err, unsubscribeServer := sub.forward()
	sub.quitWithError(err, unsubscribeServer)
}

// forward moves notifications from sub.in to the user's channel through an
// in-memory buffer, so that receiving from sub.in does not have to wait for
// the user's channel to be ready.
//
// It returns (nil, false) when sub.quit is signalled. It returns the error
// and true when a notification cannot be unmarshalled, and
// (ErrSubscriptionQueueOverflow, true) when a notification arrives while the
// buffer already holds maxClientSubscriptionBuffer items.
func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
// The case indices are used by the switch below: 0 = quit, 1 = in,
// 2 = send to the user's channel.
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
buffer := list.New()
// Empty the buffer when the loop exits.
defer buffer.Init()
for {
var chosen int
var recv reflect.Value
if buffer.Len() == 0 {
// Idle, omit send case.
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}

    switch chosen {
    case 0: // <-sub.quit
        return nil, false
    case 1: // <-sub.in
        val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
        if err != nil {
            return err, true
        }
        // The buffer is full: give up on the subscription.
        if buffer.Len() == maxClientSubscriptionBuffer {
            return ErrSubscriptionQueueOverflow, true
        }
        buffer.PushBack(val)
    case 2: // sub.channel<-
        cases[2].Send = reflect.Value{} // Don't hold onto the value.
        // The front item was just sent, so drop it from the buffer.
        buffer.Remove(buffer.Front())
    }
}

}
当接收到一条Notification消息的时候会调用handleNotification方法。会把消息传送给in队列。
[图片上传失败…(image-b9d024-1535549234468)]
func (c *Client) handleNotification(msg *jsonrpcMessage) {
if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
log.Debug(fmt.Sprint(“dropping non-subscription message: “, msg))
return
}
var subResult struct {
ID string json:"subscription"
Result json.RawMessage json:"result"
}
if err := json.Unmarshal(msg.Params, &subResult); err != nil {
log.Debug(fmt.Sprint(“dropping invalid subscription message: “, msg))
return
}
if c.subs[subResult.ID] != nil {
c.subs[subResult.ID].deliver(subResult.Result)
}
}
// deliver hands result to the subscription's in channel. It blocks until the
// send succeeds, in which case ok is true, or until sub.quit is signalled, in
// which case ok is false.
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
	select {
	case <-sub.quit:
		ok = false
	case sub.in <- result:
		ok = true
	}
	return ok
}

    原文作者:justheone
    原文地址: https://www.jianshu.com/p/780b8372221c
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞