chuck-lua内置了基于coroutine的RPC支持,所有的远程方法调用都跟调用本地方法一样简单.下面先来看一个简单的示例。
rpcserver.lua
local Task = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis = require("distri.redis")
local Socket = require("distri.socket")
local chuck = require("chuck")
local RPC = require("distri.rpc")
local Packet = chuck.packet
local config = RPC.Config(
function (data) --encoder
local wpk = Packet.wpacket(512)
wpk:WriteTab(data)
return wpk
end,
function (packet) --decoder
return packet:ReadTab()
end)
local rpcServer = RPC.Server(config)
rpcServer:RegService("hello",function ()
return "world"
end)
local server = Socket.stream.Listen("127.0.0.1",8010,function (s,errno)
if s then
s:Ok(4096,Socket.stream.decoder.rpacket(4096),function (_,msg,errno)
if msg then
rpcServer:ProcessRPC(s,msg)
else
s:Close()
s = nil
end
end)
end
end)
if server then
Distri.Signal(chuck.signal.SIGINT,Distri.Stop)
Distri.Run()
end
rpcclient.lua
local Task = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis = require("distri.redis")
local Socket = require("distri.socket")
local chuck = require("chuck")
local RPC = require("distri.rpc")
local Packet = chuck.packet
local Sche = require("distri.uthread.sche")
local config = RPC.Config(
function (data) --encoder
local wpk = Packet.wpacket(512)
wpk:WriteTab(data)
return wpk
end,
function (packet) --decoder
return packet:ReadTab()
end)
local rpcClient = RPC.Client(config)
local c = 0
if Socket.stream.Connect("127.0.0.1",8010,function (s,errno)
if s then
if not s:Ok(4096,Socket.stream.decoder.rpacket(4096),
function (_,msg,errno)
if msg then
c = c + 1
RPC.OnRPCResponse(config,s,msg)
else
print("close")
s:Close()
s = nil
end
end)
then
return
end
rpcClient:Connect(s)
for i = 1,100 do
Task.New(function ()
while true do
local err,ret = rpcClient:Call("hello")
if ret ~= "world" then
print("err")
break
end
end
end)
end
end
end) then
Distri.Signal(chuck.signal.SIGINT,Distri.Stop)
Distri.Run()
end
首先需要了解的是RPC.Config
,它是RPC的配置对象:
local rpc_config = {}
function rpc_config:new(encoder,decoder,serializer,unserializer)
if not encoder or not decoder then
return nil
end
local o = {}
o.__index = rpc_config
setmetatable(o, o)
o.decoder = decoder --将数据从packet中取出
o.encoder = encoder --使用特定封包格式将数据封装到packet中
o.serializer = serializer --将lua table序列化成传输格式,可以为空
o.unserializer = unserializer --将传输格式的数据反序列化成lua table,可以为空
return o
end
其中提供了4个重要的方法,这个对象将会被传递给RPC.Client
和RPC.Server
供他们将调用的参数。
在上面的示例中,我只提供了decoder
和encoder
.下面简单介绍下这4个方法的作用.
serializer: rpc调用的参数和返回值都通过luatable包裹.这个函数用于将这个被包裹的table序列化成一种公共传输格式,例如json.其输出对象将会被提供给encoder.
unserializer: 与serializer的作用正好相反,将公共传输格式反序列化成luatable.其输出对象将会被提供给decoder.如果不提供这两个函数,则直接将luatable提供给encoder/decoder.
encoder: 将传输数据封装成网络包,供Send发送.
decoder: 从网络包中提取出传输的数据.
在上面的示例中,我使用了内置的一种packet.直接将luatable写入到packet中.
示例中需要注意的另外一点是,rpcClient:Call("hello")
必须在coroutine上下文中使用,否则将会返回错误.而 rpcServer:ProcessRPC(s,msg)
则不是必须的,但如果在RPC请求的处理函数中需要执行一些阻塞调用,例如请求别的远程方法,或请求redis数据.则必须将rpcServer:ProcessRPC(s,msg)
放在coroutine下执行.
如果在一条链路上只涉及到RPC请求,那么完全可以对rpcclient做一个简单的封装,省得每次都手动执行所有的初始化步骤,例如:
local WrapRPCClient = {}
function WrapRPCClient:new(config)
local o = {}
o.__index = actor
setmetatable(o,o)
o.rpc = RPC.Client(config)
return o
end
function WrapRPCClient:Connect(host,port,on_success)
local config = self.rpc.config
if Socket.stream.Connect("127.0.0.1",8010,function (s,errno)
if s then
if not s:Ok(4096,Socket.stream.decoder.rpacket(4096),
function (_,msg,errno)
if msg then
RPC.OnRPCResponse(config,s,msg)
else
print("close")
s:Close()
s = nil
end
end)
then
return
end
rpcClient:Connect(s)
on_success()
end
end) then
return true
end
end
function WrapRPCClient:Call(func,...)
return self.rpc:Call(func,...)
end
有了这个封装之后,只需要调用Connect
,当on_success回调之后就可以直接使用Call调用远程方法了.