c# – 是否有任何并发​​无锁阻塞队列的实现?

我知道阻塞队列和无锁队列,这是
Scott et al.提供的那些实现的一个很好的例子,但是有没有无锁阻塞队列的实现?

在无锁阻塞队列中,出列将不需要锁定,但如果队列中没有项目,它将阻止使用者.这种野兽有没有实现?我更喜欢它们是C#实现,但任何实现在技术上都可行.

更新:

我想我最终在D14.1线上遇到了比赛条件:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

最佳答案 编辑:

简单了:
我建议你的队列不需要头尾.只是有一个头.如果head = NULL,则列表为空.添加项目到头部.从头部删除项目.更简单,更少的CAS操作.

帮手:
我在评论中建议你需要考虑一个辅助方案来处理这场比赛.在我的“锁定免费”版本的版本中,如果它们不会导致问题,那么可以获得罕见的竞争条件.我喜欢额外的性能与空闲线程睡眠时间太长.

帮助者的想法.当消费者抓住工作时,它可以检查是否存在昏迷中的线程.当生产者添加工作时,它可以寻找昏迷中的线程.

跟踪睡眠者.使用链接的睡眠者列表.当一个线程决定没有工作时,它会将自己标记为!清醒并且CAS本身就是睡眠者列表的头部.当接收到信号唤醒时,线程将自我标记为唤醒.然后新唤醒的线程清理睡眠者列表.要清理并发的单个链表,您必须要小心.你只能CAS到头.因此,当睡眠者名单的头部被标记为清醒时,您可以将头部关闭.如果头部没有醒来,继续扫描列表并“懒惰取消链接”(我将该术语提升)剩余的清醒物品.懒惰的取消链接很简单…只需在清醒项目上设置下一个上一个项目的ptr.并发扫描仍然会使它到达列表的末尾,即使它到达了唤醒的项目.后续扫描会查看较短的列表.最后,无论何时添加工作或完成工作,都要扫描睡眠清单中的!清醒物品.如果消费者在抓取一些工作后仍然注意到工作(.next work!= NULL),消费者可以扫描睡眠者列表并发出第一个醒来的线程!在生产者添加工作之后,生产者可以扫描睡眠者列表并执行相同的操作.

如果您有一个广播场景并且无法发出单个线程的信号,那么只需保持一个睡眠线程的计数.虽然这个数字仍然是> 0,消费者注意到剩余工作和消费者添加工作将广播信号以唤醒.

在我们的环境中,我们每个SMT有1个线程,因此睡眠者列表永远不会那么大(除非我得到其中一个新的128个并发线程机器!)我们在事务的早期生成工作项.在第一秒,我们可能会生成10,000个工作项,这种生产会迅速逐渐减少.线程在这些工作项上工作几秒钟.所以,我们很少在空闲池上有一个线程.

你仍然可以使用锁
如果你只有1个线程并很少生成工作……这对你不起作用.在这种情况下,互斥体的性能无关紧要,你应该只使用它们.在此方案中使用睡眠队列上的锁定.将无锁视为“没有锁定的地方”.

以前的帖子:
你是说:
有一个工作队列.
有许多消费者线程.
如果有任何工作,消费者需要拉动工作并做到这一点
消费者线程需要睡眠直到有工作.

如果是,我们这样做只使用原子操作:

工作队列是一个链表.还有一个睡眠线程的链表.

添加工作:CAS列表的头部到新工作.添加工作时,我们会检查睡眠者列表中是否有任何线程.如果有的话,在添加工作之前,我们将CAS从睡眠者列表中清除,设置其工作=新工作,然后通知睡眠者醒来.我们将工作添加到工作队列中.

消耗工作:CAS列表的头部 – 接下来.如果工作列表的头部为NULL,我们将CAS线程转换为睡眠者列表.

一旦线程有一个工作项,线程必须CAS工作项的状态为WORK_INPROGRESS或其他一些.如果失败,则意味着工作正在由另一个人执行,因此消费者线程会返回搜索工作.如果一个线程醒来并且有一个工作项,它仍然需要CAS状态.

因此,如果增加工作,一个熟睡的消费者总是被唤醒并交付工作. pthread_kill()总是在sigwait()中唤醒一个线程,因为即使线程在信号之后进入sigwait,也会收到信号.这解决了线程将自己放在睡眠者列表上但在睡觉前发出信号的问题.所有发生的事情是线程试图拥有它的 – >如果有的话.没有工作或没有工作将线程发送回消费开始.如果一个线程无法通过CAS进入睡眠者列表,则意味着另一个线程击败它,或者生产者关闭了一个睡眠者.为了安全起见,我们让线程表现得好像只是被唤醒了.

我们没有竞争条件这样做,并有多个生产者和消费者.我们也能够扩展它以允许线程在个别工作项上休眠.

点赞