生产者-消费者问题也被称为有界缓冲区问题,两个进程/线程共享一个公共的固定大小的缓冲区。其中一个是生产者,将信息放入缓冲区;另一个是消费者,从缓冲区中取出信息。
问题在于当缓冲区已满,而此时生产者还想向其中放入一个新的数据项的情况。其解决方法就是让生产者休眠,待消费者从缓冲区中取出一个或多个数据项时再唤醒它。同样地,当消费者试图从缓冲区中取数据而发现缓冲区为空时,消费者就休眠,直到生产者向其中放入一些数据时再将其唤醒。
// 以下代码只是对生产者-消费者问题的大致描述 // 因为对count的访问未加限制,可能会出现竞争条件问题 #define N 100 // 缓冲区的槽数目 int count = 0; // 缓冲区中的数据项的数目 // 生产者 void producer(void) { int item; while(TRUE){ item = produce_item(); if(count == N) sleep(); insert_item(item); count = count + 1; if(count == 1) wakeup(consumer); } } // 消费者 void consumer(void) { int item; while(TRUE){ if(count == 0) sleep(); item = remove_item(); count = count - 1; if(count == N - 1) wakeup(producer); consume_item(item); } }外部线程可以向线程池中的任务队列添加任务,相当于“生产者”;一旦任务队列中有任务,就唤醒线程队列中的线程来执行这些任务,这些线程就相当于“消费者”。模型如下图。
muduo ThreadPool类图:
(1)任务队列的实现用到了STL的deque容器 deque容器为一个给定类型的元素进行线性处理,像向量一样,它能够快速地随机访问任一个元素,并且能够高效地插入和删除容器的尾部元素。但它又与vector不同,deque支持高效插入和删除容器的头部元素,因此也叫做双端队列。
deque常用函数如下:
#include <deque> void push_front(const T& x); // 双端队列头部增加一个元素x void push_back(const T& x); // 双端队列尾部增加一个元素x void pop_front(); // 删除双端队列中最前一个元素 void pop_back(); // 删除双端队列中最后一个元素 void clear(); // 清空双端队列中最后一个元素 reference at(int pos); // 返回pos位置元素的引用 reference front(); // 返回手元素的引用 reference back(); // 返回尾元素的引用 bool empty() const; // 向量是否为空,若为true,则向量中无元素(2)几个成员函数的说明
文件名:ThreadPool.cc // 启动线程池,启动的线程是固定个数的(numThreads) void ThreadPool::start(int numThreads) { assert(threads_.empty()); // 断言线程池是空的 running_ = true; // 运行状态标记置为true threads_.reserve(numThreads); // 为线程池预留指定大小的空间 // 创建线程 for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i); threads_.push_back(new muduo::Thread( boost::bind(&ThreadPool::runInThread, this), name_+id)); threads_[i].start(); } } 文件名:ThreadPool.cc // 关闭线程池 void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; // 运行状态标识置为false cond_.notifyAll(); // 通知所有线程 } // 等待所有线程关闭 // boost::bind调用类成员函数时需要传入类成员函数指针、类对象指针... for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); } 文件名:ThreadPool.cc // 执行任务 void ThreadPool::run(const Task& task) { // 如果线程池没有线程,那么直接执行任务 // 也就是说假设没有消费者,那么生产者直接消费产品,而不把任务加入任务队列 if (threads_.empty()) { task(); } // 如果线程池有线程,则将任务添加到任务队列 else { MutexLockGuard lock(mutex_); queue_.push_back(task); cond_.notify(); } } 文件名:ThreadPool.cc // 任务分配函数(获取任务) // 线程池函数或者线程池里面的函数都可以到这里取出一个任务 // 然后在自己的线程中执行任务,返回一个任务指针 ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup(虚假唤醒) // 任务队列为空且线程池处于运行状态,需要等待任务的到来 while (queue_.empty() && running_) { cond_.wait(); } Task task; if(!queue_.empty()) { // 获取任务并弹出 task = queue_.front(); queue_.pop_front(); } return task; }