C++ 并发消息队列
在网上找到了一份POSIX线程显示的并发消息队列示例代码:
http://codereview.stackexchange.com/questions/41604/thread-safe-concurrent-fifo-queue-in-c
上面的示例代码其实是有问题的,他只能对并发Push或者并发Pop进行上锁,二并不能保证同时Push和Pop是线程安全的,所以在锁队列时只能使用一个锁。同时该代码并不支持Windows,所以按照这篇文档的思路想使用标准模板库(STL)实现一份平台无关的代码,具体实现如下所示。
1 #include <queue> 2 #include <mutex> 3 #include <thread> 4 #include <chrono> 5 #include <memory> 6 #include <condition_variable> 7 8 typedef struct task_tag 9 { 10 int data; 11 task_tag( int i ) : data(i) { } 12 } Task, *PTask; 13 14 class MessageQueue 15 { 16 public: 17 MessageQueue(){} 18 ~MessageQueue() 19 { 20 if ( !m_queue.empty() ) 21 { 22 PTask pRtn = m_queue.front(); 23 delete pRtn; 24 } 25 26 } 27 28 void PushTask( PTask pTask ) 29 { 30 std::unique_lock<std::mutex> lock( m_queueMutex ); 31 m_queue.push( pTask ); 32 m_cond.notify_one(); 33 } 34 35 PTask PopTask() 36 { 37 PTask pRtn = NULL; 38 std::unique_lock<std::mutex> lock( m_queueMutex ); 39 while ( m_queue.empty() ) 40 { 41 m_cond.wait_for( lock, std::chrono::seconds(1) ); 42 } 43 44 if ( !m_queue.empty() ) 45 { 46 pRtn = m_queue.front(); 47 if ( pRtn->data != 0 ) 48 m_queue.pop(); 49 } 50 51 return pRtn; 52 } 53 54 private: 55 std::mutex m_queueMutex; 56 std::condition_variable m_cond; 57 std::queue<PTask> m_queue; 58 }; 59 60 void thread_fun( MessageQueue *arguments ) 61 { 62 while ( true ) 63 { 64 PTask data = arguments->PopTask(); 65 66 if (data != NULL) 67 { 68 printf( "Thread is: %d\n", std::this_thread::get_id() ); 69 printf(" %d\n", data->data ); 70 if ( 0 == data->data ) //Thread end. 71 break; 72 else 73 delete data; 74 } 75 } 76 77 return; 78 } 79 80 int main( int argc, char *argv[] ) 81 { 82 MessageQueue cq; 83 84 #define THREAD_NUM 3 85 std::thread threads[THREAD_NUM]; 86 87 for ( int i=0; i<THREAD_NUM; ++i ) 88 threads[i] = std::thread( thread_fun, &cq ); 89 90 int i = 100000; 91 while( i > 0 ) 92 { 93 Task *pTask = new Task( --i ); 94 cq.PushTask( pTask ); 95 } 96 97 for ( int i=0; i<THREAD_NUM; ++i) 98 threads[i].join(); 99 100 //system( "pause" ); 101 return 0; 102 }
在示例代码中,我们使主线程向公共队列cq中Push任务,而其他的线程则负责取出任务并打印任务,由于std::cout并不支持并发线程安全,所以在打印任务时使用printf。主线程new出的任务,在其他线程中使用并销毁,当主线程发送data为0的任务时,则规定任务发送完毕,而其他的线程获取到data为0的任务后退出线程,data为0的任务则有消息队列负责销毁。整个消息队列使用标准模板库实现,现实跨平台。
在最初设计std::queue<PTask>的时候,想使用std::queue<std::shared_ptr<Task>>来管理主线程new出来的任务,这样智能指针则负责处理任务的销毁工作,但是在多线程并发的时候程序莫名的崩溃,仔细调试了半天,还是没有找到问题,最终我怀疑智能指针在多线程中是不是有问题呢?所以不得不放弃最初的设计。