我使用Boost线程和条件实现了一个基本的线程生成器 – 使用者(线程1 =生产者,线程2 =消费者).我经常无限期地陷入等待().我真的不明白这里有什么不对.下面是一些伪代码:
// main class
class Main {
public:
void AddToQueue(...someData...)
{
boost::mutex::scoped_lock lock(m_mutex);
m_queue.push_back(new QueueItem(...someData...));
m_cond.notify_one();
}
void RemoveQueuedItem(...someCond...)
{
// i'm wondering if this could cause the trouble?
boost::mutex::scoped_lock lock(m_mutex);
// erase a item matching condition (some code not shown,
// but should be fairly self-explanatory -- IsMatch()
// simply looks at a flag of QueueItem
m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(),
boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end());
}
friend void WorkerThread(Main* m);
private:
boost::ptr_deque<QueueItem> m_queue;
boost::mutex m_mutex;
boost::condition m_cond;
};
// worker thread
void WorkerThread(Main* m)
{
typedef boost::ptr_deque<QueueItem>::auto_type RelType;
RelType queueItem;
while(!shutDown) {
{ // begin mutex scope
boost::mutex::scoped_lock lock(m->m_mutex);
while(m->m_queue.empty()) {
m->m_cond.wait(lock); // <- stuck here forever quite often!
}
queueItem = m->m_queue->pop_front(); // pop & take ptr ownership
} // end mutex scope
// ... do stuff with queueItem
// ...
// ... queueItem is deleted when it leaves scope & we loop around
}
}
一些其他信息:
>使用Boost v1.44
>问题出现在Linux和Android中;我还不确定它是否会在Windows中发生
有任何想法吗?
更新:
我相信我已经孤立了这个问题.一旦确认,我将进一步更新,希望将在明天.
更新2:
事实证明,上述代码中没有问题.我依赖于AddToQueue()的底层API – 当处理工作线程中的数据时将它交回API,它有一个循环错误,它会再次调用AddToQueue()…现在已修复;-)
最佳答案 即使我使用STL队列,我最近做了类似的事情.看看你是否可以从我的实施中挑选出来.正如
wilx所说,你需要等待条件.我的实现对队列中的元素有最大限制,我用它来等待释放互斥锁/后卫.
我最初在Windows上执行此操作,能够使用Mutex或Critical部分,因此您可以删除模板参数,并直接使用boost :: mutex,如果它简化了它.
#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>
template <typename T> class Queue : private boost::noncopyable
{
public:
// constructor binds the condition object to the Q mutex
Queue(T & mutex, size_t max_size) : m_max_size(max_size), m_mutex(mutex){}
// writes messages to end of Q
void put(const Message & msg)
{
// Lock mutex to ensure exclusive access to Q
boost::unique_lock<T> guard(m_mutex);
// while Q is full, sleep waiting until something is taken off of it
while (m_queue.size() == m_max_size)
{
cond.wait(guard);
}
// ok, room on the queue.
// Add the message to the queue
m_queue.push(msg);
// Indicate so data can be ready from Q
cond.notify_one();
}
// Read message from front of Q. Message is removed from the Q
Message get(void)
{
// Lock mutex to ensure exclusive access to Q
boost::unique_lock<T> guard(m_mutex);
// If Q is empty, sleep waiting for something to be put onto it
while (m_queue.empty())
{
cond.wait(guard);
}
// Q not empty anymore, read the value
Message msg = m_queue.front();
// Remove it from the queue
m_queue.pop();
// Signal so more data can be added to Q
cond.notify_one();
return msg;
}
size_t max_size(void) const
{
return m_max_size;
}
private:
const size_t m_max_size;
T & m_mutex;
std::queue<Message> m_queue;
boost::condition_variable_any cond;
};
这样,您可以跨生产者/消费者共享队列.用法示例
boost::mutex mutex;
Queue<boost::mutex> q(mutex, 100);
boost::thread_group threads;
threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));
threads.join_all();
生产者/消费者定义如下
template <typename T> class Producer
{
public:
// Queue passed in
explicit Producer(Queue<T> &q) : m_queue(q) {}
void operator()()
{
}
}