操作系统:生产者消费者模型的两种实现(C++)

文章目录

生产者消费者模型

什么是生产者消费者模型

生产者消费者模型是针对在任务处理中既要产生数据,又要处理数据这一情景而设计出来的一种解决方案。如果生产者生产资源很快,消费者处理资源的速度很慢,则生产者就必须等待消费者处理完数据才能继续生产,反之同理,这样的话生产者与消费者之间的耦合度较高(依赖关系),导致总体效率较低。
于是通过引入一个交易场所(缓冲区),来解决生产者与消费者之间的强耦合关系生产者与消费者之间不是直接通讯,而是通过这个交易场所来间接通讯,将两者的直接关系转变成间接关系,生产者生产完数据直接交给仓库,而消费者要使用则直接从缓冲区取出数据,这样效率就大大的提高。
《操作系统:生产者消费者模型的两种实现(C++)》

生产者消费者模型的321原则

  1. 一个场所
    交易场所:缓冲区

  2. 两个角色
    生产者:生产资源的角色,往交易场所放数据
    消费者:消费资源的角色,从交易场所取数据

  3. 三种关系
    生产者与消费者之间的同步与互斥关系
    生产者与生产者之间的互斥关系
    消费者与消费者之间的互斥关系

生产者消费者模型的优点

  • 解耦合:
    生产者与消费者之间不直接通讯,而是借助交易场所通讯,降低了耦合度
  • 支持忙闲不均:
    可以解决某一方过忙而某一方过闲的问题
  • 支持并发:
    多线程轮询处理

生产者消费者模型的实现方法

基于循环队列,信号量实现

可以借助循环队列来作为交易场所,用信号量实现同步与互斥关系

#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
using namespace std;

const size_t MAX_SIZE = 5;

class CircularQueue
{ 
    public:
        CircularQueue(int capacity = MAX_SIZE) 
            : _queue(capacity) 
            , _capacity(capacity)
            , _front(0)
            , _rear(0)
        { 
            sem_init(&_mutex, 0, 1);
            sem_init(&_full, 0, 0);
            sem_init(&_empty, 0, _capacity);
        }
        
        ~CircularQueue()
        { 
            sem_destroy(&_mutex);
            sem_destroy(&_full);
            sem_destroy(&_empty);
        }
        
        //队尾入队
        void Push(int data)
        { 
            //入队前先减少一个空闲资源,防止其他进程争抢资源,如果资源不够则阻塞
            sem_wait(&_empty);
            sem_wait(&_mutex);
            
            _queue[_rear++] = data;
            _rear %= _capacity;
           
            sem_post(&_mutex);
            sem_post(&_full);
            //入队完成,使用资源+1,同时唤醒消费者使用资源
        }

        //队头出队
        void Pop(int& data)
        { 
            //出队前减少当前使用的资源数
            sem_wait(&_full);
            sem_wait(&_mutex);
            
            data = _queue[_front++];
            _front %= _capacity;
           
            sem_post(&_mutex);
            sem_post(&_empty);
            //出队完成,唤醒生产者生产资源
        }

    private:
        vector<int> _queue;
        size_t _capacity;
        size_t _front;
        size_t _rear;

        //需要三个信号量,一个互斥信号量,一个信号量表示空闲资源,一个信号量正在使用资源
        sem_t _mutex;
        sem_t _full;
        sem_t _empty;
        
};

void* productor(void* arg)
{ 
    //因为入口函数的参数为void*,所以需要强转回对应类型
    CircularQueue* queue = (CircularQueue*)arg;
    int data = 0;

    while(1)
    { 
        queue->Push(data);
        cout << "生产者存入数据:" << data++ << endl;
    }

    return NULL;
}

void* customer(void* arg)
{ 
    CircularQueue* queue = (CircularQueue*)arg;
    int data;

    while(1)
    { 
        queue->Pop(data);
        cout << "消费者取出数据:" << data << endl;
    }

    return NULL;
}

int main()
{ 
    CircularQueue queue;
    pthread_t pro_pid[5], cus_pid[5];
    
    //创建线程
    for(int i = 0; i < 5; i++)
    { 
        int ret = pthread_create(&pro_pid[i], NULL, productor, (void*)&queue);
        if(ret)
        { 
            cout << "生产者进程创建失败" << endl;
            return -1;
        }
        
        ret = pthread_create(&cus_pid[i], NULL, customer, (void*)&queue);

        if(ret)
        { 
            cout << "消费者进程创建失败" << endl;
            return -1;
        }
    }

    //线程等待
    for(int i = 0; i < 5; i++)
    { 
        pthread_join(pro_pid[i], NULL);
        pthread_join(cus_pid[i], NULL);
    }

    return 0;
}

这种方法是教科书上常用的,实现起来也较为简单,但是有一个需要注意的问题,
出队和入队时,互斥信号量一定要放在条件信号量后面,否则会引起死锁问题
这里以生产者举例子,假设如果先执行wait(mutex),此时如果没有空闲的空间,生产者就会阻塞,此时到消费者进程,消费者却因为mutex已经加锁,无法进行后面的post操作使互斥信号量mutex进行解锁,此时消费者也会被阻塞,这时两者都期待对方解锁,但是双方都阻塞,无法解锁,导致程序进入了永眠状态,也就是死锁。

运行效果
《操作系统:生产者消费者模型的两种实现(C++)》

基于阻塞队列,互斥锁、条件变量实现

借助阻塞队列作为交易场所,互斥锁实现互斥关系,条件变量实现同步关系

#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<queue>

using namespace std;

const size_t MAX_SIZE = 5;

class BlockQueue
{ 
    public:
        BlockQueue(size_t capacity = MAX_SIZE) : _capacity(capacity)
        { 
            pthread_mutex_init(&_mutex, NULL);
            pthread_cond_init(&_pro_cond, NULL);
            pthread_cond_init(&_cus_cond, NULL);
        }

        ~BlockQueue()
        { 
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_cus_cond);
        }

        void Push(int data)
        { 
            //加锁保证线程安全
            pthread_mutex_lock(&_mutex);

            //队列已满,阻塞生产者生产
            //循环等待,防止有其他进程进入
            while(_queue.size() == _capacity)
            { 
                pthread_cond_wait(&_pro_cond, &_mutex);
            }

            //数据入队
            _queue.push(data);

            pthread_mutex_unlock(&_mutex);
            //生产完毕,唤醒消费者消费
            pthread_cond_signal(&_cus_cond);

        }

        void Pop(int& data)
        { 
            pthread_mutex_lock(&_mutex);

            while(_queue.empty())
            { 
                //队列空,阻塞消费者消费
                pthread_cond_wait(&_cus_cond, &_mutex);
            }

            //数据出队
            data = _queue.front();
            _queue.pop();
            
            //消费完毕,唤醒生产者生产
            pthread_cond_signal(&_pro_cond);
            pthread_mutex_unlock(&_mutex);
        }

    private:
        queue<int> _queue;
        size_t _capacity;
		
		//实现互斥关系的互斥锁
        pthread_mutex_t _mutex;
        //实现同步关系的条件变量
        pthread_cond_t _pro_cond;
        pthread_cond_t _cus_cond;
};

//因为入口函数的参数必须为void* ,所以要强转成BlockQueue类型
void *producter(void *arg)
{ 
    BlockQueue *queue = (BlockQueue*)arg;
    //生产者不断生产数据
    
    int data = 0;
    while(1)
    { 
        //生产数据
        queue->Push(data);

        cout << "生产者放入数据:" << data++ << endl;
    }

    return NULL;
}

void *customer(void *arg)
{ 
    BlockQueue *queue = (BlockQueue*)arg;
    //消费者不断取出数据
    while(1)
    { 
        //取出数据
        int data;
        queue->Pop(data);

        cout << "消费者取出数据:" << data << endl;
    }

    return NULL;
}

int main()
{ 
    BlockQueue queue;
    pthread_t pro_tid[5], cus_tid[5];
	
    //创建线程
    for(size_t i = 0; i < 5; i++)
    { 
        int ret = pthread_create(&pro_tid[i], NULL, producter, (void*)&queue);

        if(ret)
        { 
            cout << "生产者线程创建失败" << endl;
            return -1;
        }

        ret = pthread_create(&cus_tid[i], NULL, customer, (void*)&queue);
        if(ret)
        { 
            cout << "消费者线程创建失败" << endl;
            return -1;
        }
    }   
    
    //等待线程退出
    for(size_t i = 0; i < 4; i++)
    { 
        pthread_join(pro_tid[i], NULL);
        pthread_join(cus_tid[i], NULL);
    }

    return 0;
}

在这里我们又会看到,这里是先对互斥锁加锁,再对条件变量进行wait操作,这里为什么不会引起死锁呢?
因为条件变量的pthread_cond_wait操作其实集合了三个操作,他会首先对互斥锁解锁,然后判断条件变量是否达成,没有则阻塞,之后再对互斥锁进行加锁,所以这样就不会因为没人解锁而陷入死锁状态。

实现效果
《操作系统:生产者消费者模型的两种实现(C++)》

    原文作者:凌桓丶
    原文地址: https://blog.csdn.net/qq_35423154/article/details/106149185
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞