我已经包含了项目的完整来源,只有大约100行.
这是一个多线程的工作系统,我正在为一个学校作业工作,经过几天的努力,主要是因为这对我来说是非常新的事实(C 11和标准线程).
我对thread,mutex和condition_variable类有基本的了解.但是,应用程序抛出调试错误:已调用R6010 abort().当我在向池中添加单个作业期间删除1纳秒的睡眠呼叫时.通过在访问和获取作业期间放置互斥锁我希望解决错误,但是徒劳无功.
#include "stdafx.h"
#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;
float CalcInvSqr(float x){
return (x);
}
class ThreadPool{
public:
ThreadPool::ThreadPool(int numThreads){
for(int i=0;i<numThreads;i++){
threads.push_back(thread(&ThreadPool::StartThread, this));
}
};
ThreadPool::~ThreadPool(){
CleanUp();
}
template<typename F>
future<F> AddJob(std::packaged_task<F()>& job) {
unique_lock<mutex> lock(poolMutex);
this->jobList.push([&job]() {
job();
});
this->cv.notify_one();
lock.unlock();
return job.get_future();
}
void StartThread(){
while(true){
unique_lock<mutex> uLock(poolMutex);
cv.wait(uLock);
if(stopped.load())
return;
std::function<void()> f;
f = jobList.front();
jobList.pop();
uLock.unlock();
f();
}
}
void CleanUp(){
stopped.store(true);
{
unique_lock<mutex> lock(poolMutex);
cv.notify_all();
lock.unlock();
for(int i=0;i<threads.size();i++){
if(threads[i].joinable())
threads[i].join();
}
cout<<jobList.size()<<endl;
}
}
private:
mutable std::queue<std::function<void()>> jobList;
atomic<bool> stopped;
std::vector<std::thread> threads;
std::mutex poolMutex;
std::condition_variable cv;
};
#define JOBS 50
int main(){
ThreadPool pool(4);
std::vector<std::future<float>> results;
for(int i=0;i<JOBS;i++){
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
std::packaged_task<float()> task([num](){
return CalcInvSqr(num);
});
results.push_back(pool.AddJob<float>(task));
}
for(int i=0;i<JOBS;i++){
cout<<"VALUE: "<<results[i].get()<<endl;
}
cout<<"Jobs Done ! "<<endl;
system("PAUSE");
return 0;
}
我在这里遇到了障碍,似乎无法解决错误,我希望有更多经验的人可以引导我走向正确的方向.
当调用1 nanosec延迟时,代码成功执行并具有正确的输出.
最佳答案 这是更正后的计划.
请注意,在通知条件变量之前,互斥锁已解锁.这个很重要.
您会注意到我在ThreadPool :: Add<>中使用了shared_ptr.这是因为代码是为c 11编写的.如果它是c 14,我们可以取消shared_ptr并简单地将packaged_task移动到首先采用未来的lambda中.
#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;
float CalcInvSqr(float x){
return (x);
}
class ThreadPool{
public:
ThreadPool(int numThreads){
for(int i=0;i<numThreads;i++){
threads.push_back(thread(&ThreadPool::StartThread, this));
}
};
~ThreadPool(){
CleanUp();
}
template<typename F>
std::future<F> AddJob(std::packaged_task<F()> job)
{
auto job_ptr = make_shared<std::packaged_task<F()>>(move(job));
unique_lock<mutex> lock(_job_mutex);
jobList.push_back([job_ptr]() {
job_ptr->operator()();
});
lock.unlock();
_jobs_available.notify_one();
return job_ptr->get_future();
}
void StartThread(){
while (true) {
unique_lock<mutex> job_lock(_job_mutex);
_jobs_available.wait(job_lock, [this]() { return !stopped && !jobList.empty(); });
if (stopped)
return;
auto job = move(jobList.front());
jobList.pop_front();
job_lock.unlock();
job();
}
}
void CleanUp(){
unique_lock<mutex> job_lock(_job_mutex);
stopped = true;
jobList.clear();
job_lock.unlock();
_jobs_available.notify_all();
for (auto& thread : threads) {
if (thread.joinable())
thread.join();
}
}
private:
std::mutex _job_mutex;
std::condition_variable _jobs_available;
std::deque<std::function<void()>> jobList;
atomic<bool> stopped;
std::vector<std::thread> threads;
};
#define JOBS 5000
int main(){
ThreadPool pool(40);
std::vector<std::future<float>> results;
for(int i=0;i<JOBS;i++){
float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
std::packaged_task<float()> task([num](){
return CalcInvSqr(num);
});
results.push_back(pool.AddJob<float>(move(task)));
}
for(int i=0;i<JOBS;i++){
cout<<"VALUE: "<<results[i].get()<<endl;
}
cout<<"Jobs Done ! "<<endl;
// system("PAUSE");
return 0;
}