c – 在生产者 – 消费者代码中使用wait()来提升条件死锁

我使用Boost线程和条件实现了一个基本的线程生成器 – 使用者(线程1 =生产者,线程2 =消费者).我经常无限期地陷入等待().我真的不明白这里有什么不对.下面是一些伪代码:

// main class
class Main {
public:
  void AddToQueue(...someData...)
  {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push_back(new QueueItem(...someData...));
    m_cond.notify_one(); 
  }

  void RemoveQueuedItem(...someCond...)
  {
    // i'm wondering if this could cause the trouble?
    boost::mutex::scoped_lock lock(m_mutex);
    // erase a item matching condition (some code not shown,
    // but should be fairly self-explanatory -- IsMatch()
    // simply looks at a flag of QueueItem
    m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(),
      boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end());
  }

  friend void WorkerThread(Main* m);
private:      
  boost::ptr_deque<QueueItem> m_queue;
  boost::mutex m_mutex;
  boost::condition m_cond;
};

// worker thread
void WorkerThread(Main* m)
{
  typedef boost::ptr_deque<QueueItem>::auto_type RelType;
  RelType queueItem;

  while(!shutDown) {
    { // begin mutex scope
      boost::mutex::scoped_lock lock(m->m_mutex);
      while(m->m_queue.empty()) {
        m->m_cond.wait(lock); // <- stuck here forever quite often!
      }
      queueItem = m->m_queue->pop_front(); // pop & take ptr ownership
    } // end mutex scope

    // ... do stuff with queueItem
    // ...
    // ... queueItem is deleted when it leaves scope & we loop around
  }
}

一些其他信息:

>使用Boost v1.44
>问题出现在Linux和Android中;我还不确定它是否会在Windows中发生

有任何想法吗?

更新:
我相信我已经孤立了这个问题.一旦确认,我将进一步更新,希望将在明天.

更新2:
事实证明,上述代码中没有问题.我依赖于AddToQueue()的底层API – 当处理工作线程中的数据时将它交回API,它有一个循环错误,它会再次调用AddToQueue()…现在已修复;-)

最佳答案 即使我使用STL队列,我最近做了类似的事情.看看你是否可以从我的实施中挑选出来.正如
wilx所说,你需要等待条件.我的实现对队列中的元素有最大限制,我用它来等待释放互斥锁/后卫.

我最初在Windows上执行此操作,能够使用Mutex或Critical部分,因此您可以删除模板参数,并直接使用boost :: mutex,如果它简化了它.

#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>

template <typename T> class Queue :  private boost::noncopyable
{
public:
  // constructor binds the condition object to the Q mutex
  Queue(T & mutex, size_t max_size) :  m_max_size(max_size), m_mutex(mutex){}

  // writes messages to end of Q 
  void put(const Message & msg)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // while Q is full, sleep waiting until something is taken off of it
    while (m_queue.size() == m_max_size)
    {
      cond.wait(guard);
    }

    // ok, room on the queue. 
    // Add the message to the queue
    m_queue.push(msg);

    // Indicate so data can be ready from Q
    cond.notify_one();
  }

  // Read message from front of Q. Message is removed from the Q
  Message get(void)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // If Q is empty, sleep waiting for something to be put onto it
    while (m_queue.empty())
    {
      cond.wait(guard);
    }

    // Q not empty anymore, read the value
    Message msg = m_queue.front();

    // Remove it from the queue
    m_queue.pop();

    // Signal so more data can be added to Q
    cond.notify_one();

    return msg;
  }

  size_t max_size(void) const
  {
    return m_max_size;
  }


private:
  const size_t m_max_size;
  T & m_mutex;
  std::queue<Message> m_queue;
  boost::condition_variable_any cond;
};

这样,您可以跨生产者/消费者共享队列.用法示例

boost::mutex mutex;

Queue<boost::mutex> q(mutex, 100);

boost::thread_group threads;

threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));

threads.join_all();

生产者/消费者定义如下

template <typename T> class Producer
{
public:
   // Queue passed in
   explicit Producer(Queue<T> &q) :  m_queue(q) {}

   void operator()()
   {
   }
}
点赞