匹配队列问题

匹配队列问题

最近在做视频聊天平台,想让他支持随机配对,做一个匹配队列。要求每个客户端到达后加入队列,匹配时随机找到
匹配目标。并且能够实现,广播所有人正在匹配的人数,在客户端离开后,也能及时清除垃圾。

最初的设想–无头苍蝇乱撞

最初的设计方案,采用每个客户端一个线程,同时每个客户端连接的时候,进入队列,客户端匹配的时候,更改状态
为正在匹配,并在队列中搜寻目标。如果搜寻到了目标,先试图获得自己的锁,如果获取失败,则寻找下一个,如果
成功,则再去获得目标的锁,获得成功的话即可配对,更改两者的状态和目标指针。这样做的目的就是为了避免你找
到的目标也找到了别人(你的目标找到别人的话,它自己的锁就被占用了),也避免别的人同时找到了你的目标(这
样的话,别的人会占用你的目标的锁),也避免自己被别人(非你的目标)找到(自己被加锁,别人也无法配对自己
)。如果在这个过程中自己的状态变成了离开,则释放拿到的锁,并离开。但是这种方式会造成很多问题:

  1. 在遍历队列时要加共享锁,这样的话,新来的客户端都进入不了队列(互斥锁)
  2. 匹配速度慢,涉及很多锁,而且多个客户端都在运行,会有很多线程运行,开销和竞争都很大
  3. 垃圾回收困难,不可能每次删除垃圾都要移动整个队列,只能定期去,而定期又存在竞争必须加锁,而且捡垃圾回
    收时,会卡住正常的匹配好久

改善方案–托管管理员

多个客户端线程如果都进行匹配,那如同一堆苍蝇在苍蝇群里面找对象,然后疯狂乱撞去配对,而且很容易多个人扎在
一起造成混乱,效率极低。但是这个时候如果有个组织者站起来,让所有想匹配的苍蝇举手,然后他去一对一对的抽取
对象,让他们配对。如果组织者太少,速度可能会很慢,这样的话,就需要当要匹配的苍蝇较多时,就多找两个组织者
。实现方法是建立一个组织者线程,去配对客户端,他分别轮询,一个一个拿到客户端的锁,拿到两个就配对。而且轮
询时可以承担广播的任务,广播的时候也可以得到已经关闭的客户端,这时候将其标记起来,进行垃圾回收。

容器采用链表,这样就可以加更小粒度的锁。当某个链节的下一节需要变更,就给此链节加上锁(比如新来的人,在链
表尾部加上锁),被上锁的链节不能操作数据,遍历数据到这个链节后不能再继续遍历(重新遍历),防止出现错误。
具体实现参照:Show

这种实现方案性能不错,目前试验还未出较大bug。

// Point struct stands for a specified client
type Point struct {
    Conn     *websocket.Conn
    Mutex    sync.Mutex // Protect websocket conn
    Status   int
    Pair     *Point
    Chan     chan bool               // When pairing, Chan is needed to avoid conflicts
    ListChan chan bool               // Protect the list when mapping When a chain was locked, its value cannot be read
    OnPair   func(self, pair *Point) //Event handler when pair is ok
}

func Manager() {
    atomic.AddInt32(&Manager_num, 1)
    log.Println("Manager Start")
    defer log.Println("Manager die")
    for atomic.LoadInt32(&Pairing_num) > 1 || atomic.LoadInt32((&Manager_num)) == 1 {

        //Manager number control
        if atomic.LoadInt32(&Pairing_num) > 50 {
            go Manager()
        }
        if atomic.LoadInt32(&Pairing_num)/atomic.LoadInt32(&Manager_num) < 20 && atomic.LoadInt32(&Manager_num) > 1 {
            atomic.AddInt32(&Manager_num, -1)
            return
        }

        //Pick broadcast message
        var msg map[string]interface{}
        select {
        case msg = <-BroadCastMessage:
        default:
        }

        Laji := make([]*list.Element, 0)
        if msg != nil {
            for l := Points.Front(); l != nil; l = l.Next() {
                p := l.Value.(*Point)
                if p.Status != P2P_POINT_CLOSE {
                    p.Push(msg)
                } else {
                    Laji = append(Laji, l)
                }
            }
            for _, l := range Laji {
                var p *Point
                if l.Prev() != nil {
                    p = l.Prev().Value.(*Point)
                } else {
                    p = l.Value.(*Point)
                }
                select {
                case p.Chan <- true:
                    log.Println("Clean Laji")
                    Points.Remove(l)
                    <-p.Chan
                default:
                }

            }
        }

        var p0 *Point
        l := Points.Front()
        for ; l != nil && (p0 == nil || p0.Status == P2P_POINT_PAIRING); l = l.Next() {
            p := l.Value.(*Point)
            select {
            case p.ListChan <- true:
                <-p.ListChan
            default:
                if p0 != nil {
                    <-p0.Chan
                }
                goto bbreak
            }
            if p.Status == P2P_POINT_PAIRING {
                if p0 == nil {
                    select {
                    case p.Chan <- true:
                        if p.Status != P2P_POINT_PAIRING {
                            <-p.Chan
                            continue
                        }
                        p0 = p
                        continue
                    default:
                        continue
                    }
                }
                select {
                case p.Chan <- true:
                    if p0.Status != P2P_POINT_PAIRING {
                        <-p0.Chan
                        <-p.Chan
                        p0 = nil
                        goto bbreak
                    }
                    if p.Status != P2P_POINT_PAIRING {
                        <-p.Chan
                        continue
                    }
                    p0.Status = P2P_POINT_OFFER
                    p.Status = P2P_POINT_ANSWER
                    p0.Pair = p
                    p.Pair = p0
                    atomic.AddInt32(&Pairing_num, -2)
                    <-p.Chan
                    <-p0.Chan
                    go p0.OnPair(p0, p)
                    go p.OnPair(p, p0)
                    goto bbreak
                default:
                    continue
                }
            }
        }
        if p0 != nil {
            <-p0.Chan
        }

    bbreak:
        //time.Sleep(1 * time.Millisecond)
    }
}
    原文作者:舞伴问题
    原文地址: https://blog.csdn.net/InsZVA/article/details/52853623
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞