【Ovirt 笔记】engine 与 vdsm 之间的调用分析 JSON-RPC

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

分析整理的版本为 Ovirt 4.2.3 版本。

1. 简介

  • JSON-RPC(Remote Protocol Call)是一种以 JSON 为协议的远程调用服务,具有开发调试简单,多平台通用的特性。
    • 是一种以 json 为消息格式的远程调用服务,它是一套允许运行在不同操作系统、不同环境的程序实现基于 Internet 过程调用的规范和一系列的实现。
    • 是一种无状态、轻量级远程过程调用(RPC)协议。该协议是与传输方式无关的,其定义内容在同一个处理过程使用时,可通过 sockets、HTTP 或其它任意消息传递环境。
    • 使用了 JSON(RFC 4627) 作为数据格式。
  • JSON-RPC 和 XML-RPC 相比的优点
    • XML-RPC 是以 XML 作为消息格式,XML 具有体积大,格式复杂,传输占用带宽。
    • 程序对 XML 的解析也比较复杂,并且耗费较多服务器资源。JSON 相比 XML 体积小巧,并且解析相对容易很多。
    • JSON-RPC 相对 XML-RPC 在带宽、服务器资源消耗、开发易用性方面要强很多。

1.2 约定

  • MUST,MUST NOT,REQUIRED,SHALL,SHALL NOT,SHOULD,SHOULD NOT,RECOMMENDED,MAY,和 OPTIONAL 这些关键词由 RFC 2119 定义。
  • JSON-RPC 使用了 JSON 格式,所以与 JSON 使用相同的类型系统(参见
    http://www.json.org/ 或者 RFC 4627)。
    • JSON 可以表示四种基础类型(字符串,数字,布尔,和空值)以及两种结构类型(对象和数组)。
  • 客户端和服务端之间交互的所有成员名称都应当是大小写敏感的。
  • 客户端被定义为请求对象的来源和应答对象的处理者,服务端被定义为应答对象的来源和请求对象的处理者。

2. 请求

  • 请求时向服务器传输数据格式如下(基于 JSON 2.0)
{ 
    "jsonrpc" : 2.0,
    "method" : "sayHello", 
    "params" : ["Hello JSON-RPC"], 
    "id" : 1
} 

2.1 请求对象

属性说明
jsonrpc定义 JSON-RPC 版本,必须等于 2.0。
method字符串类型,包含被调用方法的名称。方法名称由 rpc 开头,紧随着一个 . 字符( U+002E 或者 ASCII 46),是 rpc 内部保留的方法和扩展名称,不允许被使用。
params结构类型值,保存着方法调用时的参数值。此成员可省略。若无参数则为 NULL。
id由客户端指定的标识,必须包含一个字符串或数字或 NULL 值。如果对象中不包含此成员,则该对象被认作一个通知(notification)。id 值通常不应该为 NULL。而且数字值不应当包含小数部分。如果包含 id 值,则服务端的响应必须包含与其一样的值。此成员用于维护两个对象之间的关联关系。
  • 不应该使用 NULL 作为 id 的值,因为规范文档使用 NULL 值作为 id 未知时响应对象 id 的值。
    • 1.0 版本中使用 id 为 NULL 值的对象被视作通知,这样做容易在处理时造成混乱。
  • id 不应该包含小数部分,因为许多十进制小数都无法用二进制准确的表示。​

2.2 参数结构

  • RPC 调用包含的参数必须是结构类型的。要么使用数组,按位置区分;要么使用对象,按名称区分。
    • 按位置区分:params 必须是一个数组,以服务端所期望的顺序包含所有参数。
    • 按名称区分:params 必须是一个对象,成员名称与服务端期望的参数名称一致。
      • 如果缺少相应名称的参数,则可能产生错误。
      • 名称必须与方法所期望的参数名称完全匹配,包括大小写。

3. 响应

  • 服务器返回的数据格式也为 JSON,格式如下
{
    "jsonrpc" : 2.0,
    "result" : "Hell JSON-RPC",
    "error" : null,
    "id" : 1
}

3.1 响应对象

属性说明
jsonrpc定义 JSON-RPC 版本,必须等于 2.0。
result方法返回值,如果调用成功则必须包含此成员。如果发生错误则必须不包含此成员。其值由被调用的服务端方法决定。
error此成员在发生错误时是必要的。如果在调用时没有发生错误,则此成员必须不存在。此成员值必须是一个对象。
id此成员是必要的。它必须与对应请求对象的 id 值相同。如果在获取请求对象的 id 时发生错误,那么它必须为 NULL 值。
  • result 成员和 error 成员两者必须包含其一,且不能同时存在。

4. 错误

{
    "code" : 1,
    "message" : "Nothing found",
    "data":null
}

4.1 错误对象

属性说明
code用于指示发生错误的数字。必须是整数。
message用于简短描述该错误的字符串。该消息内容应当是一个简洁明了的单句。
data基础类型或者结构类型值,包含更多关于该错误的信息。此成员是可以省略。

4.2 错误码

代码错误含义
-32700解析错误服务器接收到无效的 JSON,服务器解析 JSON 文本发生错误。
-32600无效的请求发送的 JSON 不是一个有效的请求。
-32601方法未找到方法不存在或不可见。
-36602无效的参数无效的方法参数。
-36603内部错误JSON-RPC 内部错误。
-32000 到 -32099服务器端错误保留给具体实现服务器端错误。

5. 批量调用

  • 客户端可以发送一个请求对象数组来进行批量调用。

    • 当所有的请求都响应完毕后,服务器应以一个数组作为响应,每个请求都应该对应一个请求对象。
    • 服务器可以以任何宽度,以任意顺序,并发的批量处理一个 RPC 调用。
    • 客户端可以通过 id 将请求和响应进行匹配。
  • 请求

[
       {"jsonrpc":"2.0", "method": "sum", "params":[1,2,4], "id": "1"},
       {"jsonrpc":"2.0", "method": "notify_hello","params": [7]},
       {"jsonrpc":"2.0", "method": "subtract", "params":[42,23], "id": "2"},
       {"foo":"boo"},
       {"jsonrpc":"2.0", "method": "foo.get", "params":{"name": "myself"}, "id": "5"},
       {"jsonrpc": "2.0", "method":"get_data", "id": "9"} 
]
  • 响应
[
       {"jsonrpc":"2.0", "result": 7, "id": "1"},
       {"jsonrpc":"2.0", "result": 19, "id": "2"},
       {"jsonrpc":"2.0", "error": {"code": -32600,"message": "Invalid Request"}, "id": null},
       {"jsonrpc":"2.0", "error": {"code": -32601,"message": "Method not found"}, "id":"5"},
       {"jsonrpc":"2.0", "result": ["hello", 5], "id":"9"}
]

6. 用例说明

6.1. 列表形式参数

  • 请求
{
    "jsonrpc":"2.0",
    "method": "subtract",
    "params":[42, 23],
    "id": 1
}
  • 响应
{
    "jsonrpc":"2.0",
    "result": 19,
    "id": 1
}

6.2. key-value 形式参数

  • 请求
{
    "jsonrpc":"2.0",
    "method": "subtract",
    "params":{
                "subtrahend": 23,
                "minuend": 42
             }, 
    "id": 3
}
  • 响应
{
    "jsonrpc":"2.0",
    "result": 19,
    "id": 3
}

6.3 通知(notification)

  • 通知是一个不包含 id 的请求对象。一次通知请求意味着客户端不需要相应的应答对象,因此不需要有应答对象返回给客户端。服务端必须不能回应通知,包括那些在批量请求中的消息。
  • 通知无法通过定义确认,因为它们没有对应返回的应答对象。而且,客户端无法得知任何错误消息(比如:” Invalid params “,” Internal error “)。
{"jsonrpc":"2.0", "method": "update", "params":[1,2,3,4,5]}
{"jsonrpc": "2.0", "method":"foobar"}

6.4 错误的调用

6.4.1 无效的请求对象

  • 请求
{
    "jsonrpc":"2.0", 
    "method": 1, 
    "params": "bar"
}
  • 响应
{
    "jsonrpc": "2.0", 
    "error":{
                "code": -32600, 
                "message": "Invalid Request"
            },
    "id": null
}

6.4.2 无效的方法

  • 请求
{"jsonrpc":"2.0", "method": "foobar", "id":"1"}
  • 响应
{"jsonrpc": "2.0", "error":{"code": -32601, "message": "Method not found"},"id": "1"}

6.4.3 无效的 JSON

  • 请求
{"jsonrpc":"2.0", "method": "foobar, "params":"bar", "baz]
  • 响应
{"jsonrpc": "2.0", "error":{"code": -32700, "message": "Parse error"},"id": null}

7. 应用实例

  • 以虚拟机关机为例,调用 DestroyVDSCommand 命令。
status = getBroker().destroy(getParameters().getVmId().toString());
  • VdsManager 初始化建立与主机的连接。
    • clientTimeOut 定义客户端超时时间。
    • connectionTimeOut 定义连接的时间。
    • heartbeat 定义心跳频率。
    • clientRetries 定义客户端重试连接次数。
@PostConstruct
private void init() {
  ......
  initVdsBroker();
}
int clientTimeOut = Config.<Integer> getValue(ConfigValues.vdsTimeout) * 1000;
int connectionTimeOut = Config.<Integer> getValue(ConfigValues.vdsConnectionTimeout) * 1000;
int heartbeat = Config.<Integer> getValue(ConfigValues.vdsHeartbeatInSeconds) * 1000;
int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries);
vdsProxy = TransportFactory.createVdsServer(
                cachedVds.getHostName(),
                cachedVds.getPort(),
                clientTimeOut,
                connectionTimeOut,
                clientRetries,
                heartbeat,
                resourceManager.getExecutor());
  • 创建 vds 代理对象 JsonRpcVdsServer
public static IVdsServer createVdsServer(
            String hostname, int port, int clientTimeOut, int connectionTimeOut, int clientRetries, int heartbeat, ScheduledExecutorService executorService) {

        HttpClient client = HttpUtils.getConnection(
                connectionTimeOut,
                clientRetries,
                Config.getValue(ConfigValues.VdsMaxConnectionsPerHost),
                Config.getValue(ConfigValues.MaxTotalConnections));

        String eventQueue = Config.getValue(ConfigValues.EventQueueName);
        return new JsonRpcVdsServer(
                JsonRpcUtils.createStompClient(
                        hostname,
                        port,
                        connectionTimeOut,
                        clientTimeOut,
                        clientRetries,
                        heartbeat,
                        Config.getValue(ConfigValues.EncryptHostCommunication),
                        Config.getValue(ConfigValues.VdsmSSLProtocol),
                        Config.getValue(ConfigValues.EventProcessingPoolSize),
                        Config.getValue(ConfigValues.VdsRequestQueueName),
                        Config.getValue(ConfigValues.VdsResponseQueueName),
                        eventQueue,
                        executorService)
                , client);
}
  • 创建 JsonRpcVdsServer 代理对象的同时,设置 engine 与 vdsm 之间的连接属性。
    • 采用了 STOMP 协议。
    • 创建 JSON-RPC 客户端对象实例 JsonRpcClient
public static JsonRpcClient createStompClient(String hostname,
            int port,
            int connectionTimeout,
            int clientTimeout,
            int connectionRetry,
            int heartbeat,
            boolean isSecure,
            String protocol,
            int parallelism,
            String requestQueue,
            String responseQueue,
            String eventQueue,
            ScheduledExecutorService executorService) {
        StompClientPolicy connectionPolicy =
                new StompClientPolicy(connectionTimeout,
                        connectionRetry,
                        heartbeat,
                        IOException.class,
                        requestQueue,
                        responseQueue);
        connectionPolicy.setEventQueue(eventQueue);

        ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class);
        if (Config.getValue(ConfigValues.UseHostNameIdentifier)){
            log.debug(identifierLogMessage, hostname);
            connectionPolicy.setIdentifier(hostname);
        }
        return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol, parallelism, executorService);
}
  • 创建响应器对象 Reactor
final Reactor reactor = ReactorFactory.getReactor(provider, type);
  • 创建客户端响应对象 ReactorClient,将其与客户端对象 JsonRpcClient 进行绑定。
    • ResponseWorker 主要作用是对下发命令的响应进行处理。
    • this.trackerResponseTracker 主要负责下发命令的响应跟踪。
    • 通过 onMessageReceived 事件跟踪命令下发后的响应信息。
ResponseWorker worker = ReactorFactory.getWorker(parallelism);
JsonRpcClient jsonClient = worker.register(client);
public JsonRpcClient register(ReactorClient client) {
        final JsonRpcClient jsonRpcClient = new JsonRpcClient(client, this.tracker);
        client.addEventListener(new MessageListener() {

            @Override
            public void onMessageReceived(byte[] message) {
                queue.add(new MessageContext(jsonRpcClient, message));
            }
        });
        return jsonRpcClient;
}
  • 使用了 JsonRpcVdsServer 代理,下发 vdsm 命令。
    • 这里下发的是虚拟机关机命令。
@Override
public StatusOnlyReturn destroy(String vmId) {
    JsonRpcRequest request = new RequestBuilder("VM.destroy").withParameter("vmID", vmId).build();
    Map<String, Object> response = new FutureMap(this.client, request);
    return new StatusOnlyReturn(response);
}
  • JsonRpcRequest 类组装 JSON-RPC 请求。
    • new RequestBuilder("VM.destroy") 组装请求方法,方法名 VM.destroy
    • withParameter("vmID", vmId) 定义参数。
    • build() 组装请求对象。
public JsonRpcRequest build() {
     final TextNode id = this.parameters.textNode(UUID.randomUUID().toString());
     return new JsonRpcRequest(this.methodName, this.parameters, id);
}
  • 执行 RPC 请求。
Map<String, Object> response = new FutureMap(this.client, request);
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
        final Call call = new Call(req);
        this.tracker.registerCall(req, call);
        retryCall(req, call);
        try {
            this.getClient().sendMessage(jsonToByteArray(req.toJson()));
        } finally {
            retryCall(req, call);
        }
        return call;
}
  • 通过 req.toJson() 方法组装请求的 JSON 对象。
public JsonNode toJson() {
        ObjectNode node = MAPPER.createObjectNode();
        node.put("jsonrpc", "2.0");
        if (getMethod() == null) {
            node.putNull("method");
        } else {
            node.put("method", getMethod());
        }
        if (getParams() == null) {
            node.putNull("params");
        } else {
            node.put("params", getParams());
        }
        if (getId() == null) {
            node.putNull("id");
        } else {
            node.put("id", getId());
        }
        return node;
}
  • 请求对象格式。
[
   {
    "jsonrpc":"2.0",
    "method": "VM.destroy",
    "params":{
                "vmID": <vmId>
             }, 
    "id": <UUID.randomUUID()>
   }
]
  • 将 JSON-RPC 调用放入缓存。
public void registerCall(JsonRpcRequest req, JsonRpcCall call) {
     if (this.runningCalls.putIfAbsent(req.getId(), call) != null) {
          throw new RequestAlreadySentException();
     }
}
  • 下发请求与响应。
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
     final Call call = new Call(req);
     this.tracker.registerCall(req, call);
     retryCall(req, call);
     try {
         this.getClient().sendMessage(jsonToByteArray(req.toJson()));
     } finally {
         retryCall(req, call);
     }
     return call;
}
  • 创建下发命令的连接。
public ReactorClient getClient() throws ClientConnectionException {
        if (this.client.isOpen()) {
            return this.client;
        }
        this.client.connect();
        return this.client;
}
  • 获得 SocketChannel 对象 。
public void connect() throws ClientConnectionException {
        if (isOpen()) {
            return;
        }
        try (LockWrapper wrapper = new LockWrapper(this.lock)) {
            if (isOpen() && isInInit()) {
                getPostConnectCallback().await(policy.getRetryTimeOut(), policy.getTimeUnit());
            }
            if (isOpen()) {
                return;
            }
            final FutureTask<SocketChannel> task = scheduleTask(new Retryable<>(() -> {
                InetAddress address = InetAddress.getByName(hostname);
                log.info("Connecting to " + address);

                final InetSocketAddress addr = new InetSocketAddress(address, port);
                final SocketChannel socketChannel = SocketChannel.open();

                socketChannel.configureBlocking(false);
                socketChannel.connect(addr);

                return socketChannel;
            }, this.policy));
            this.channel = task.get();
......
  • 执行下发的命令。
final long timeout = getTimeout(policy.getRetryTimeOut(), policy.getTimeUnit());
while (!this.channel.finishConnect()) {

        final FutureTask<SocketChannel> connectTask = scheduleTask(new Retryable<>(() -> {
                    if (System.currentTimeMillis() >= timeout) {
                        throw new ConnectException("Connection timeout");
                    }
                    return null;
         }, this.policy));
         connectTask.get();
}
  • 命令下发到 vdsm,建立下发命令与实际执行方法的映射关系(在节点上执行)。
    • vdsm/jsonrpcvdscli.py 文件中定义。
    • VM.destroydestroy 对应。
_COMMAND_CONVERTER = {
......
'destroy': 'VM.destroy',
......
}
  • 解析 JSON 参数对象,执行映射方法(在节点上执行)。
def _callMethod(self, methodName, *args, **kwargs):
        try:
            method = _COMMAND_CONVERTER[methodName]
        except KeyError as e:
            raise Exception("Attempt to call function: %s with "
                            "arguments: %s error: %s" %
                            (methodName, args, e))

        class_name, method_name = method.split('.')
        timeout = kwargs.pop('_transport_timeout', self._default_timeout)
        params = self._prepare_args(class_name, method_name, args, kwargs)

        req = JsonRpcRequest(method, params, reqId=str(uuid4()))

        responses = self._client.call(
            req, timeout=self._timeouts.get(method_name, timeout))
        if responses:
            resp = responses[0]
        else:
            raise JsonRpcNoResponseError(method=method)

        if resp.error is not None:
            return response.error_raw(resp.error.code, str(resp.error))

        if not self._xml_compat:
            return response.success_raw(resp.result)

        if resp.result and resp.result is not True:
            # None is translated to True inside our JSONRPC implementation
            if isinstance(resp.result, list):
                return response.success(items=resp.result)
            elif isinstance(resp.result, six.string_types):
                return response.success(resp.result)
            else:
                return response.success(**resp.result)

        return response.success()
  • 这里执行了断电 destroy 方法,并返回结果(在节点上执行)。
    • vdsm/virt/vm.py 文件中定义。
def destroy(self, gracefulAttempts=1,
                reason=vmexitreason.ADMIN_SHUTDOWN):
        self.log.debug('destroy Called')

        result = self.doDestroy(gracefulAttempts, reason)
        if response.is_error(result):
            return result
        # Clean VM from the system
        self._deleteVm()

        return response.success()
  • 创建命令下发超时,重发命令的参数对象 ResponseTracking
private void retryCall(final JsonRpcRequest request, final JsonRpcCall call) throws ClientConnectionException {
        ResponseTracking tracking = new ResponseTracking(request, call, new RetryContext(policy), getTimeout(this.policy.getRetryTimeOut(), this.policy.getTimeUnit()), this.client, !Objects.equals(request.getMethod(), "Host.ping"));
        this.tracker.registerTrackingRequest(request, tracking);
}
  • 建立请求与 ResponseTracking 的映射关系。
public void registerTrackingRequest(JsonRpcRequest req, ResponseTracking tracking) {
        JsonNode id = req.getId();
        List<JsonNode> nodes = new CopyOnWriteArrayList<>();
        try (LockWrapper wrapper = new LockWrapper(this.lock)) {
            this.map.put(id, tracking);
            this.queue.add(id);
            nodes.add(id);
            nodes = this.hostToId.putIfAbsent(tracking.getClient().getClientId(), nodes);
            if (nodes != null && !nodes.contains(id)) {
                nodes.add(id);
            }
        }
}
  • 响应处理类 ResponseWorker 根据下发命令反馈,解析响应结果。
    • 同时启动 ResponseTracker 线程,用于跟踪命令响应,超时后进行重发。
public void run() {
        MessageContext context = null;
        while (true) {
            try {
                context = this.queue.take();
                if (context.getClient() == null) {
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Message received: " + new String(context.getMessage(), UTF8));
                }
                JsonNode rootNode = MAPPER.readTree(context.getMessage());
                if (!rootNode.isArray()) {
                    processIncomingObject(context.getClient(), rootNode);
                } else {
                    final Iterator<JsonNode> iter = rootNode.getElements();
                    while (iter.hasNext()) {
                        final JsonNode node = iter.next();
                        processIncomingObject(context.getClient(), node);
                    }
                }
            } catch (Exception e) {
                log.warn("Exception thrown during message processing");
                if (log.isDebugEnabled()) {
                    log.debug(e.getMessage(), e);
                }
                continue;
            }
        }
}
private void processIncomingObject(JsonRpcClient client, JsonNode node) {
......
   client.processResponse(JsonRpcResponse.fromJsonNode(node));
}
  • ResponseTracker 命令下发超时后的重发处理。
protected void loop() {
        for (JsonNode id : queue) {
            if (!this.runningCalls.containsKey(id)) {
                removeRequestFromTracking(id);
                continue;
            }
            ResponseTracking tracking = this.map.get(id);
            if (System.currentTimeMillis() >= tracking.getTimeout()) {
                RetryContext context = tracking.getContext();
                context.decreaseAttempts();
                if (context.getNumberOfAttempts() <= 0) {
                    handleFailure(tracking, id);
                    continue;
                }
                try {
                    tracking.getClient().sendMessage(jsonToByteArray(tracking.getRequest().toJson()));
                } catch (ClientConnectionException e) {
                    handleFailure(tracking, id);
                }
                tracking.setTimeout(getTimeout(context.getTimeout(), context.getTimeUnit()));
            }
        }
}
  • 命令未超时,组装响应对象 JsonRpcResponse
public static JsonRpcResponse fromJsonNode(JsonNode node) {
        JsonNode jsonrpcNode = node.get("jsonrpc");
        if (jsonrpcNode == null) {
            throw new IllegalArgumentException(
                    "'jsonrpc' field missing in node");
        }

        String version = jsonrpcNode.asText();
        if (version == null || !version.equals("2.0")) {
            throw new IllegalArgumentException("Only jsonrpc 2.0 is supported");
        }

        final JsonNode id = node.get("id");
        if (id == null) {
            throw new IllegalArgumentException("Response missing id field");
        }

        return new JsonRpcResponse(node.get("result"), node.get("error"), id);
}
  • 响应对象格式。
[
   {
    "jsonrpc":"2.0",
    "result": {
                     "status": {
                                       "code": 0,
                                       "message": "msg"
                                }
              },
    "id": <UUID.randomUUID()>
   }
]
  • 获取 result 部分,转换格式为 Map<String, Object>
call.getCallback().onResponse(new JsonResponseUtil().populate(response))
public FutureMap(JsonRpcClient client, JsonRpcRequest request) {
     try {
         this.response = client.call(request);
         this.client = client;
     } catch (ClientConnectionException e) {
         throw new TransportRunTimeException("Connection issues during send request", e);
     }
}
  • 业务层解析返回的 result 结果。
@Override
public StatusOnlyReturn destroy(String vmId) {
     ......
     return new StatusOnlyReturn(response);
}
@SuppressWarnings("unchecked")
public StatusOnlyReturn(Map<String, Object> innerMap) {
        Map<String, Object> statusMap = (Map<String, Object>) innerMap.get(STATUS);
        status = new Status(statusMap);
}

7.1 实例总结

《【Ovirt 笔记】engine 与 vdsm 之间的调用分析 JSON-RPC》 代码调用流程图

  • 图例说明
    • 其中紫色为重点关注对象。
    • 绿色为缓存和队列。
    • 虚线框为引用对象。
    • 数字标识调用时序。
对象说明
JsonRpcVdsServer用于执行 vdsm 命令的代理服务器,同时创建 JsonRpcClient 对象与之对应。
JsonRpcClientJSON-RPC 客户端对象,向服务端发送和接收 JSON 数据。
JsonRpcRequestJSON-RPC 的请求对象,封装请求的 JSON 数据。
JsonRpcResponseJSON-RPC 的响应对象,封装响应的 JSON 数据。
Call用于记录命令下发的过程,同时包含了请求和响应对象(JsonRpcRequest 和 JsonRpcResponse)。
ReactorClientJSON-RPC 客户端对象反应器,绑定消息反馈事件,当 RPC 请求得到响应回馈时触发事件反应(将结果交由 ResponseWorker 处理)。
ResponseWorker响应工作者,独立的响应处理线程,用于处理响应的消息(由 ReactorClient 获得后放入队列 queue 中)。同时启动 ReponseTracker 线程。
ReponseTracking响应跟踪对象,用于设置和保存,命令超时后的重发命令的参数。
ReponseTracker响应跟踪者,用于处理 ReponseTracking,建立了 ReponseTracking 与 JsonRpcRequest 的映射关系(map 缓存)。同时还建立了 JsonRpcRequest 与 Call 的映射关系(runningCalls 缓存)。同时还处理了命令超时后的重发。
    原文作者:羽杰
    原文地址: https://www.jianshu.com/p/24768f95dfe9
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞