chuck-lua中的RPC

chuck-lua内置了基于coroutine的RPC支持,所有的远程方法调用都跟调用本地方法一样简单.下面先来看一个简单的示例。

rpcserver.lua

local Task   = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Socket = require("distri.socket")
local chuck  = require("chuck")
local RPC    = require("distri.rpc")
local Packet = chuck.packet

local config = RPC.Config(
function (data)                            --encoder
    local wpk = Packet.wpacket(512)
    wpk:WriteTab(data)
    return wpk
end,
function (packet)                          --decoder
    return packet:ReadTab()
end)

local rpcServer = RPC.Server(config)
rpcServer:RegService("hello",function ()
    return "world"
end)

local server = Socket.stream.Listen("127.0.0.1",8010,function (s,errno)
    if s then
        s:Ok(4096,Socket.stream.decoder.rpacket(4096),function (_,msg,errno)
            if msg then
                rpcServer:ProcessRPC(s,msg)
            else
                s:Close()
                s = nil
            end
        end)
    end
end)

if server then
    Distri.Signal(chuck.signal.SIGINT,Distri.Stop)
    Distri.Run()
end

rpcclient.lua

local Task   = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Socket = require("distri.socket")
local chuck  = require("chuck")
local RPC    = require("distri.rpc")
local Packet = chuck.packet
local Sche  = require("distri.uthread.sche")

local config = RPC.Config(
function (data)                           --encoder                        
    local wpk = Packet.wpacket(512)
    wpk:WriteTab(data)
    return wpk
end,
function (packet)                         --decoder
    return packet:ReadTab()
end)

local rpcClient = RPC.Client(config)

local c = 0
if Socket.stream.Connect("127.0.0.1",8010,function (s,errno)
    if s then
        if not s:Ok(4096,Socket.stream.decoder.rpacket(4096),
                    function (_,msg,errno)
                        if msg then
                            c = c + 1
                            RPC.OnRPCResponse(config,s,msg)
                        else
                            print("close")
                            s:Close()
                            s = nil
                        end
                    end) 
        then
            return
        end
        rpcClient:Connect(s)
        for i = 1,100 do
            Task.New(function ()
                while true do
                    local err,ret = rpcClient:Call("hello")
                    if ret ~= "world" then
                        print("err")
                        break
                    end
                end
            end)
        end
    end
end) then
    Distri.Signal(chuck.signal.SIGINT,Distri.Stop)
    Distri.Run()
end

首先需要了解的是RPC.Config,它是RPC的配置对象:

local rpc_config = {}

function rpc_config:new(encoder,decoder,serializer,unserializer)
    if not encoder or not decoder then
        return nil
    end
    local o        = {}   
    o.__index      = rpc_config
    setmetatable(o, o)
    o.decoder      = decoder        --将数据从packet中取出
    o.encoder      = encoder        --使用特定封包格式将数据封装到packet中      
    o.serializer   = serializer     --将lua table序列化成传输格式,可以为空
    o.unserializer = unserializer   --将传输格式的数据反序列化成lua table,可以为空
    return o
end

其中提供了4个重要的方法,这个对象将会被传递给RPC.ClientRPC.Server供他们将调用的参数。

在上面的示例中,我只提供了decoderencoder.下面简单介绍下这4个方法的作用.

  • serializer: rpc调用的参数和返回值都通过luatable包裹.这个函数用于将这个被包裹的table序列化成一种公共传输格式,例如json.其输出对象将会被提供给encoder.

  • unserializer: 与serializer的作用正好相反,将公共传输格式反序列化成luatable.其输出对象将会被提供给decoder.如果不提供这两个函数,则直接将luatable提供给encoder/decoder.

  • encoder: 将传输数据封装成网络包,供Send发送.

  • decoder: 从网络包中提取出传输的数据.

在上面的示例中,我使用了内置的一种packet.直接将luatable写入到packet中.

示例中需要注意的另外一点是,rpcClient:Call("hello")必须在coroutine上下文中使用,否则将会返回错误.而 rpcServer:ProcessRPC(s,msg)则不是必须的,但如果在RPC请求的处理函数中需要执行一些阻塞调用,例如请求别的远程方法,或请求redis数据.则必须将rpcServer:ProcessRPC(s,msg)放在coroutine下执行.

如果在一条链路上只涉及到RPC请求,那么完全可以对rpcclient做一个简单的封装,省得每次都手动执行所有的初始化步骤,例如:

local WrapRPCClient = {}

function WrapRPCClient:new(config)
  local o = {}
  o.__index = actor      
  setmetatable(o,o)
  o.rpc    = RPC.Client(config)
  return o
end

function WrapRPCClient:Connect(host,port,on_success)
    local config = self.rpc.config
    if Socket.stream.Connect("127.0.0.1",8010,function (s,errno)
        if s then
            if not s:Ok(4096,Socket.stream.decoder.rpacket(4096),
                        function (_,msg,errno)
                            if msg then
                                RPC.OnRPCResponse(config,s,msg)
                            else
                                print("close")
                                s:Close()
                                s = nil
                            end
                        end) 
            then
                return
            end
            rpcClient:Connect(s)
            on_success()
        end
    end) then
        return true
    end 
end

function WrapRPCClient:Call(func,...)
    return self.rpc:Call(func,...)
end

有了这个封装之后,只需要调用Connect,当on_success回调之后就可以直接使用Call调用远程方法了.

https://github.com/sniperHW/chuck

    原文作者:sniperHW
    原文地址: https://segmentfault.com/a/1190000002956059
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞