c – std线程作业系统在添加作业期间删除1纳秒的延迟时中止

我已经包含了项目的完整来源,只有大约100行.

这是一个多线程的工作系统,我正在为一个学校作业工作,经过几天的努力,主要是因为这对我来说是非常新的事实(C 11和标准线程).

我对thread,mutex和condition_variable类有基本的了解.但是,应用程序抛出调试错误:已调用R6010 abort().当我在向池中添加单个作业期间删除1纳秒的睡眠呼叫时.通过在访问和获取作业期间放置互斥锁我希望解决错误,但是徒劳无功.

#include "stdafx.h"
#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;

float CalcInvSqr(float x){
    return (x);
}

class ThreadPool{
public:
    ThreadPool::ThreadPool(int numThreads){
        for(int i=0;i<numThreads;i++){
            threads.push_back(thread(&ThreadPool::StartThread, this));
        }
    };

    ThreadPool::~ThreadPool(){
        CleanUp();
    }

    template<typename F>
    future<F> AddJob(std::packaged_task<F()>& job) {
        unique_lock<mutex> lock(poolMutex);
        this->jobList.push([&job]() {
            job();
        });

        this->cv.notify_one();
        lock.unlock();
        return job.get_future();
    }

    void StartThread(){
        while(true){

            unique_lock<mutex> uLock(poolMutex);
            cv.wait(uLock);

            if(stopped.load())
                return;

            std::function<void()> f;
            f = jobList.front();
            jobList.pop();

            uLock.unlock();
            f();
        }
    }

    void CleanUp(){
        stopped.store(true);
        {
            unique_lock<mutex> lock(poolMutex);
            cv.notify_all();
            lock.unlock();

            for(int i=0;i<threads.size();i++){
                if(threads[i].joinable())
                    threads[i].join();
            }

            cout<<jobList.size()<<endl;
        }
    }


private:
    mutable std::queue<std::function<void()>> jobList;
    atomic<bool> stopped;
    std::vector<std::thread> threads;
    std::mutex poolMutex;
    std::condition_variable cv;
};

#define JOBS 50

int main(){

    ThreadPool pool(4);

    std::vector<std::future<float>> results;
    for(int i=0;i<JOBS;i++){
        std::this_thread::sleep_for(std::chrono::nanoseconds(1));
        float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
        std::packaged_task<float()> task([num](){
            return CalcInvSqr(num);
        });
        results.push_back(pool.AddJob<float>(task));
    }

    for(int i=0;i<JOBS;i++){
        cout<<"VALUE: "<<results[i].get()<<endl;
    }

    cout<<"Jobs Done ! "<<endl;
    system("PAUSE");
    return 0;
}

我在这里遇到了障碍,似乎无法解决错误,我希望有更多经验的人可以引导我走向正确的方向.

当调用1 nanosec延迟时,代码成功执行并具有正确的输出.

最佳答案 这是更正后的计划.

请注意,在通知条件变量之前,互斥锁已解锁.这个很重要.

您会注意到我在ThreadPool :: Add<>中使用了shared_ptr.这是因为代码是为c 11编写的.如果它是c 14,我们可以取消shared_ptr并简单地将packaged_task移动到首先采用未来的lambda中.

#include <iostream>
#include <queue>
#include <future>
#include <thread>
#include <atomic>
using namespace std;

float CalcInvSqr(float x){
    return (x);
}

class ThreadPool{
public:
    ThreadPool(int numThreads){
        for(int i=0;i<numThreads;i++){
            threads.push_back(thread(&ThreadPool::StartThread, this));
        }
    };

    ~ThreadPool(){
        CleanUp();
    }

    template<typename F>
    std::future<F> AddJob(std::packaged_task<F()> job)
    {
        auto job_ptr = make_shared<std::packaged_task<F()>>(move(job));

        unique_lock<mutex> lock(_job_mutex);

        jobList.push_back([job_ptr]() {
            job_ptr->operator()();
        });
        lock.unlock();
        _jobs_available.notify_one();
        return job_ptr->get_future();
    }

    void StartThread(){
        while (true) {
            unique_lock<mutex> job_lock(_job_mutex);
            _jobs_available.wait(job_lock, [this]() { return !stopped && !jobList.empty(); });
            if (stopped)
                return;
            auto job = move(jobList.front());
            jobList.pop_front();
            job_lock.unlock();
            job();
        }
    }

    void CleanUp(){
        unique_lock<mutex> job_lock(_job_mutex);
        stopped = true;
        jobList.clear();
        job_lock.unlock();
        _jobs_available.notify_all();

        for (auto& thread : threads) {
            if (thread.joinable())
                thread.join();
        }
    }


private:
    std::mutex _job_mutex;
    std::condition_variable _jobs_available;

    std::deque<std::function<void()>> jobList;
    atomic<bool> stopped;
    std::vector<std::thread> threads;
};

#define JOBS 5000

int main(){

    ThreadPool pool(40);

    std::vector<std::future<float>> results;
    for(int i=0;i<JOBS;i++){
        float num = (float)(rand()%RAND_MAX)/(float)RAND_MAX;
        std::packaged_task<float()> task([num](){
            return CalcInvSqr(num);
        });
        results.push_back(pool.AddJob<float>(move(task)));
    }

    for(int i=0;i<JOBS;i++){
        cout<<"VALUE: "<<results[i].get()<<endl;
    }

    cout<<"Jobs Done ! "<<endl;
    //    system("PAUSE");
    return 0;
}
点赞