chuck-lua的并发处理

chuck-lua使用的是单线程模型,依赖于底层高效率的事件回调框架.从前文介绍过的使用示例中可以看出,基本接口与node.js类似,大量依赖方法回调.

对于lua这种支持coroutine的语言,使用coroutine来将异步回调转换成同步接口是很方便的.

chuck-lua提供了两种使用coroutine解决异步问题的方式:

  • 直接使用coroutine
  • coroutine pool配合任务队列

首先来看一个例子,看下如何将异步的redis事件转换成同步接口:

redis.lua

local chuck   = require("chuck")
local engine  = require("distri.engine")
local Sche    = require("distri.uthread.sche")
local LinkQue = require("distri.linkque")

local map = {}

local client = {}

function client:new(c)
  local o = {}
  o.__index = client
  o.__gc    = function() print("redis_client gc") end      
  setmetatable(o,o)
  o.c = c
  o.pending = LinkQue:New()
  return o
end

function client:Close(err)
    local co
    while true do
        co = self.pending:Pop()
        if co then
            co = co[1]
            Sche.WakeUp(co,false,err)
        else
            map[self.c] = nil
            self.c:Close()
            self.c = nil
            return
        end
    end
end

function client:Do(cmd)
    local co = Sche.Running()

    if not co then return "should call in a coroutine context " end
    if not self.c then return "client close" end

    if "ok" == self.c:Execute(cmd,function (_,reply)
        Sche.WakeUp(co,true,reply)
    end) then
        local node = {co}
        self.pending:Push(node)
        --[[
            如果succ == true,则reply是操作结果,
            否则,reply是传递给Close的err值
        ]]--
        local succ,reply = Sche.Wait()
        self.pending:Remove(node)
        if succ then
            return nil,reply
        else
            return reply
        end 
    end
    return "error"
end

local redis = {}

function redis.Connect(ip,port,on_error)
    local err,c = chuck.redis.Connect(engine,ip,port,function (_,err)
        local c = map[_]
        if c then
            on_error(c,err)
        end
    end)
    if c then
        return err,client:new(c)
    else
        return err
    end
end

return redis

redis.lua封装了异步事件接口,向用户提供了同步的调用方式,唯一的使用约束是redis:Do必须在coroutine上下文中才能被使用.

我们首先看下如何在第一种方式下使用这个接口:

local Distri = require("distri.distri")
local Redis  = require("distri.redis")

local err,client = Redis.Connect("127.0.0.1",6379)

if client then

    local function co_fun(i)
        local cmd = string.format("hmget chaid:%d chainfo skills",i)    
        local err,reply = client:Do(cmd)
        if reply then
            for k,v in pairs(reply) do
                print(k,v)
            end
        end 
    end

    for i = 1,1000 do
        Sche.Spawn(co_fun,i)
    end
    Distri.Run()
end 

在这种模式下,每个redis任务都由一个单独的coroutine直接执行.

接下来再看下如何利用pool和任务队列完成同样的效果:

local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Task   = require("distri.uthread.task")

local err,client = Redis.Connect("127.0.0.1",6379)

if client then
    for i = 1,1000 do
        Task.New(function ()
            local cmd = string.format("hmget chaid:%d chainfo skills",i)    
            local err,reply = client:Do(cmd)
            if reply then
                for k,v in pairs(reply) do
                    print(k,v)
                end
            end
        end)
    end
    Distri.Run()
end 

对于每个任务,使用Task.New创建一个任务,任务被创建之后会被添加到任务队列尾部,预先创建的pool中的coroutine将会被唤醒,从队列中提取任务并执行.

最后,再来看一下对于网络消息,如何利用Task处理任务.

local Task   = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Socket = require("distri.socket")
local Packet = require("chuck").packet
local clone     = Packet.clone

local err,client = Redis.Connect("127.0.0.1",6379)

if client then
    local server = Socket.stream.listen("127.0.0.1",8010,function (s,errno)
        if s then
            if s:Ok(4096,Socket.stream.rawdecoder,Task.Wrap(function (_,msg,errno)
                    if msg then                     
                        local cmd = string.format("hmget chaid:%d chainfo skills",1)    
                        local err,reply = client:Do(cmd)
                        local result = ""
                        if reply then
                            for k,v in pairs(reply) do
                                result = result .. v .. "\n"
                            end
                        else
                            result = "error\n"
                        end                 
                        s:Send(Packet.rawpacket(result))
                    else
                        s:Close()
                        s = nil
                    end
            end),client) then
                s:SetRecvTimeout(5000)
            end
        end     
    end)

    if server then
        Distri.Run()
    end 
end

这是一个简单的echo服务,当用户连接上服务器发送消息,服务器收到消息之后提交一个redis请求,并将结果返回给客户.

这里的关键点是使用Task.Wrap封装了事件回调函数.也就是说,对网络消息的处理也是在coroutine上下中执行的.因此在事件回调中的阻塞并不会导致无法响应其它到来的并发请求.

https://github.com/sniperHW/chuck.git

    原文作者:sniperHW
    原文地址: https://segmentfault.com/a/1190000002949569
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞