chuck-lua使用的是单线程模型,依赖于底层高效率的事件回调框架.从前文介绍过的使用示例中可以看出,基本接口与node.js类似,大量依赖方法回调.
对于lua这种支持coroutine的语言,使用coroutine来将异步回调转换成同步接口是很方便的.
chuck-lua提供了两种使用coroutine解决异步问题的方式:
- 直接使用coroutine
- coroutine pool配合任务队列
首先来看一个例子,看下如何将异步的redis事件转换成同步接口:
redis.lua
local chuck = require("chuck")
local engine = require("distri.engine")
local Sche = require("distri.uthread.sche")
local LinkQue = require("distri.linkque")
local map = {}
local client = {}
function client:new(c)
local o = {}
o.__index = client
o.__gc = function() print("redis_client gc") end
setmetatable(o,o)
o.c = c
o.pending = LinkQue:New()
return o
end
function client:Close(err)
local co
while true do
co = self.pending:Pop()
if co then
co = co[1]
Sche.WakeUp(co,false,err)
else
map[self.c] = nil
self.c:Close()
self.c = nil
return
end
end
end
function client:Do(cmd)
local co = Sche.Running()
if not co then return "should call in a coroutine context " end
if not self.c then return "client close" end
if "ok" == self.c:Execute(cmd,function (_,reply)
Sche.WakeUp(co,true,reply)
end) then
local node = {co}
self.pending:Push(node)
--[[
如果succ == true,则reply是操作结果,
否则,reply是传递给Close的err值
]]--
local succ,reply = Sche.Wait()
self.pending:Remove(node)
if succ then
return nil,reply
else
return reply
end
end
return "error"
end
local redis = {}
function redis.Connect(ip,port,on_error)
local err,c = chuck.redis.Connect(engine,ip,port,function (_,err)
local c = map[_]
if c then
on_error(c,err)
end
end)
if c then
return err,client:new(c)
else
return err
end
end
return redis
redis.lua封装了异步事件接口,向用户提供了同步的调用方式,唯一的使用约束是redis:Do必须在coroutine上下文中才能被使用.
我们首先看下如何在第一种方式下使用这个接口:
local Distri = require("distri.distri")
local Redis = require("distri.redis")
local err,client = Redis.Connect("127.0.0.1",6379)
if client then
local function co_fun(i)
local cmd = string.format("hmget chaid:%d chainfo skills",i)
local err,reply = client:Do(cmd)
if reply then
for k,v in pairs(reply) do
print(k,v)
end
end
end
for i = 1,1000 do
Sche.Spawn(co_fun,i)
end
Distri.Run()
end
在这种模式下,每个redis任务都由一个单独的coroutine直接执行.
接下来再看下如何利用pool和任务队列完成同样的效果:
local Distri = require("distri.distri")
local Redis = require("distri.redis")
local Task = require("distri.uthread.task")
local err,client = Redis.Connect("127.0.0.1",6379)
if client then
for i = 1,1000 do
Task.New(function ()
local cmd = string.format("hmget chaid:%d chainfo skills",i)
local err,reply = client:Do(cmd)
if reply then
for k,v in pairs(reply) do
print(k,v)
end
end
end)
end
Distri.Run()
end
对于每个任务,使用Task.New创建一个任务,任务被创建之后会被添加到任务队列尾部,预先创建的pool中的coroutine将会被唤醒,从队列中提取任务并执行.
最后,再来看一下对于网络消息,如何利用Task处理任务.
local Task = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis = require("distri.redis")
local Socket = require("distri.socket")
local Packet = require("chuck").packet
local clone = Packet.clone
local err,client = Redis.Connect("127.0.0.1",6379)
if client then
local server = Socket.stream.listen("127.0.0.1",8010,function (s,errno)
if s then
if s:Ok(4096,Socket.stream.rawdecoder,Task.Wrap(function (_,msg,errno)
if msg then
local cmd = string.format("hmget chaid:%d chainfo skills",1)
local err,reply = client:Do(cmd)
local result = ""
if reply then
for k,v in pairs(reply) do
result = result .. v .. "\n"
end
else
result = "error\n"
end
s:Send(Packet.rawpacket(result))
else
s:Close()
s = nil
end
end),client) then
s:SetRecvTimeout(5000)
end
end
end)
if server then
Distri.Run()
end
end
这是一个简单的echo服务,当用户连接上服务器发送消息,服务器收到消息之后提交一个redis请求,并将结果返回给客户.
这里的关键点是使用Task.Wrap封装了事件回调函数.也就是说,对网络消息的处理也是在coroutine上下中执行的.因此在事件回调中的阻塞并不会导致无法响应其它到来的并发请求.