一个C++线程池的简单实现

代码是网上找的,自己稍作修改,目前仅仅是编译通过,是否实用还有待在实际项目中检验。

#include <Windows.h>
#include <winbase.h>
#include <assert.h>
#include <limits.h>
#include <iostream>
#include <new>
#include <queue>

using namespace std;

// Forward declarations: the queue typedefs below and the thread context
// only need these names as pointer targets.
class CThreadPool;
class WorkItemBase;

// FIFO container of pointers to pending work items.
typedef std::queue<WorkItemBase*> WorkItemQueue;
// Pointer to a WorkItemQueue.
typedef WorkItemQueue* PWorkItemQueue;

// Start-up arguments for one pool worker thread. CThreadPool::Create
// allocates one instance per thread and passes it as the thread parameter.
struct _THREAD_CONTEXT
{
	CThreadPool*	pWorkQueue;		// pool that created the thread
	void*			pThreadData;	// per-thread user data forwarded to DoWork(); may be NULL
};

typedef struct _THREAD_CONTEXT THREAD_CONTEXT;
typedef struct _THREAD_CONTEXT* PTHREAD_CONTEXT;

// Abstract unit of work executed by CThreadPool.
//
// Derive from this class and override DoWork() and Abort(). Both are private
// in the base, so only CThreadPool (a friend) invokes them through a
// WorkItemBase pointer.
class WorkItemBase
{
public:
	// Virtual so that a derived work item can be deleted safely through a
	// WorkItemBase pointer.
	virtual ~WorkItemBase() {}

private:
	// Called on a pool worker thread to perform the work.
	// pThreadData is the per-thread pointer given to CThreadPool::Create
	// for the executing thread (NULL when none was supplied).
	virtual void DoWork(void* pThreadData) = 0;

	// Called by the pool for an item that is still queued when the pool is
	// torn down, i.e. an item whose DoWork() will not be run.
	virtual void Abort() = 0;

	friend class CThreadPool;
};

class CThreadPool
{
public:
	virtual ~CThreadPool(){};
	bool Create(const unsigned int nNumberOfThreads, void** ThreadData = NULL)
	{
		m_pWorkItemQueue = new WorkItemQueue;
		m_phSincOjbectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
		if ( NULL == m_phSincOjbectsArray[SEMAPHORE_INDEX])
		{
			//clean codes ...
			return false;
		}
		m_phSincOjbectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL, true, false, NULL);
		if ( NULL == m_phSincOjbectsArray[ABORT_EVENT_INDEX])
		{
			//clean codes ...
			return false;
		}
		InitializeCriticalSection(&m_CriticalSection);
		m_phThreads = new HANDLE[nNumberOfThreads];
		if ( NULL == m_phThreads)
		{
			//clean codes
			return false;
		}
		m_nNumberOfThreads = nNumberOfThreads;
		DWORD dwThreadID;
		PTHREAD_CONTEXT pThreadContext;
		for (unsigned int i = 0; i < nNumberOfThreads; ++i)
		{
			pThreadContext = new THREAD_CONTEXT;
			pThreadContext->pWorkQueue = this;
			pThreadContext->pThreadData = ThreadData == NULL ? NULL : ThreadData[i];
			m_phThreads[i] = CreateThread(NULL, 0, CThreadPool::ThreadFunc, pThreadContext, 0, &dwThreadID);
			if ( NULL == m_phThreads[i])
			{
				//clean codes
				return false;
			}
		}

		return true;
	}

	void Destroy(int iWaitSecond)
	{
		// why?
		while(0 != GetWorkQueueSize())
		{
			Sleep(iWaitSecond * 1000);
		}
		if (!SetEvent(m_phSincOjbectsArray[ABORT_EVENT_INDEX]))
		{
			assert(false);
			return;
		}
		WaitForMultipleObjects(m_nNumberOfThreads, m_phThreads, true, INFINITE);
		while(!m_pWorkItemQueue->empty())
		{
			m_pWorkItemQueue->front()->Abort();
			m_pWorkItemQueue->pop();
		}
		delete m_pWorkItemQueue;
		m_pWorkItemQueue = NULL;
		CloseHandle(m_phSincOjbectsArray[SEMAPHORE_INDEX]);
		CloseHandle(m_phSincOjbectsArray[ABORT_EVENT_INDEX]);
		DeleteCriticalSection(&m_CriticalSection);
		for (unsigned int i = 0; i < m_nNumberOfThreads; ++i)
		{
			CloseHandle(m_phThreads[i]);
		}
		delete[] m_phThreads;
	}
	int GetThreadTotalNum()
	{
		return m_nNumberOfThreads;
	}
private:
	static unsigned long __stdcall ThreadFunc( void* pParam)
	{
		PTHREAD_CONTEXT pThreadContext = (PTHREAD_CONTEXT)pParam;
		WorkItemBase* pWorkItem = NULL;
		CThreadPool* pThreadPool =  pThreadContext->pWorkQueue;
		void* pThreadData = pThreadContext->pThreadData;
		DWORD dwWaitResult;
		for (;;)
		{
			dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ, pThreadPool->m_phSincOjbectsArray, false, INFINITE);
			switch (dwWaitResult)
			{
			case ABORT_EVENT_INDEX:
				delete pThreadContext;
				return 0;
			case SEMAPHORE_INDEX:
				pWorkItem = pThreadPool->RemoveWorkItem();
				pWorkItem->DoWork(pThreadData);
				break;
			default:
				assert(false);
				delete pThreadContext;
				return 0;
			}
		}
	}
	bool InsertWorkItem( WorkItemBase* pWorkItem)
	{
		EnterCriticalSection(&m_CriticalSection);
		m_pWorkItemQueue->push(pWorkItem);
		LeaveCriticalSection(&m_CriticalSection);
		if (!ReleaseSemaphore(m_phSincOjbectsArray[SEMAPHORE_INDEX], 1, NULL))
		{
			assert(false);
			return false;
		}
		
		return true;
	}
	WorkItemBase* RemoveWorkItem()
	{
		EnterCriticalSection(&m_CriticalSection);
		WorkItemBase* pFront = m_pWorkItemQueue->front();
		m_pWorkItemQueue->pop();
		LeaveCriticalSection(&m_CriticalSection);
		return pFront;
	}
	size_t GetWorkQueueSize()
	{
		m_pWorkItemQueue->size();
	}

	enum{
		ABORT_EVENT_INDEX = 0,
		SEMAPHORE_INDEX,
		NUMBER_OF_SYNC_OBJ
	};

	PHANDLE				m_phThreads;
	unsigned int		m_nNumberOfThreads;
	void*				m_pThreadDataArray;

	HANDLE				m_phSincOjbectsArray[NUMBER_OF_SYNC_OBJ];
	
	CRITICAL_SECTION	m_CriticalSection;
	PWorkItemQueue		m_pWorkItemQueue;
	
};

总结了一下思路:

运行: 1. 根据需要创建N个working thread。 2. 使用一个WorkItem Queue来表示需要运行的过程队列,使用一个CriticalSection来保护此Queue。 3. 使用信号量来同步Queue中的WorkItem,每次向Queue中增加WorkItem都需要增加信号量的计数,每次WaitForSingleObject(或WaitForMultipleObjects因Semaphore被signal而返回)成功以后则从Queue中Remove掉一个WorkItem。 4. N个working threads保持循环,从被CriticalSection保护的Queue中取出WorkItem来,然后调用其DoWork()方法来运行。 停止: 1. 建立一个手动复位的event:HAbortEvent。 2. 当想结束程序的时候使用SetEvent。 3. 在working thread的CallBack函数中使用WaitForMultipleObjects一并监听HSemaphore和HAbortEvent,当是abort时退出线程。(使用返回的index来确定是哪个handle被signal) 4. 对所有资源进行CloseHandle。

仍然需要思考的问题: 当线程处理线程池中的WorkItem不及时,是否会使Semaphore的计数超过最大值?

点赞