生产者消费者模型

    xiaoxiao2021-03-25  146

    生产者消费者模型

    如果没有生产者消费者模型? 我们先来看如果没有生产着消费者模型会怎么样, 假如生产者生产单位数据需要耗时10t 的时间;消费者消费数据需要1t 的时间, 那么如果我们是生产者消费者一对一的话, 消费者消费的节奏严重依赖了生产者的节奏. 消费者将有9t 的时间处于等待状态 . 这个时候就很僵硬了 , 明明消费者在剩余的9t 时间里可以去处理别的生产者产出的数据 , 那我们要怎么来设计来达到这个效果呢?

    生产者消费者模型大体结构 其实我们可以把生产者跟消费者解耦开来, 让生产者去依赖一个队列 , 生产者产出的消息放到队列里面,消费者再去队列里面取 . 这样我们可以对生产者和消费者的数量比进行拓展, 想要多少个消费者对应多少个生产者都可以; 比如生产者产出相对消费者消费速度慢,我们可以相对减少消费者的数量; 这样大大平衡了生产者群体和消费者群体的处理能力 .

    举个恶心点的例子 生产者线程的任务:拉屎; 消费者线程的任务:吃屎 生产者拉单位屎的时间是1000ms;消费者吃掉单位屎的时间是100ms; 如果消费者直接依赖生产者, 那么消费者只能等生产者把屎拉出来再吃掉,这个时候相当于消费者吃掉单位屎的时间是1100ms ; 如果消费者间接依赖生产者, 比如消费者依赖公共厕所, 生产者也依赖公共厕所 . 生产者把屎拉在厕所里, 消费者去厕所里面取. 这个时候我们可以增加生产者的个数. 比如 我们设置 5个生产者同时在拉屎, 然后消费者去厕所里面吃掉这些屎. 这样就能让一个消费者去处理五个生产者产出的数据, 就业是上面所说的平衡处理能力. 也达到了耦合的作用.

    生产者消费者模型 队列的设计 数据结构 生产者跟消费者的设计编码 MyBlockingQueueMyConsumerMyProducer

    队列的设计

    数据结构

    public class MyBlockingQueue<E> { MyBlockingQueue(int ca) { this.ca = ca; Entry<E> e = new Entry<>(); head = e; tail = e; } }

    1.采用单向链表来实现队列,则至少具有出队入队两个public的函数; 出队(take)针对消费者,入队(put)针对生产者;

    private static class Entry<E> { E datas; Entry<E> next; } private Entry<E> head; private Entry<E> tail; public void put(E e) throws InterruptedException; public E take() throws InterruptedException; private Entry<E> head; private Entry<E> tail

    2.需要有个固定的容量来规定队列的长度,这里采用AtomicInteger 当队列长度等于这个容量 让生产者停止生产; 同理,当队列长度等于0,让消费者停止消费; private AtomicInteger size = new AtomicInteger(0); 3.消费锁 takeLock 串行化多个消费者线程在该队列体现的消费动作 private final Lock takeLock = new ReentrantLock(); 4.产出锁 putLock 串行化多个消费者线程在该队列体现的产出动作 private final Lock putLock = new ReentrantLock(); 5.消费状态 notEmpty 提醒消费者进行消费 这个状态在消费者发现队列已经空了的时候,会执行await方法进入对象的等待池 在生产者生产完一个数数据是, 会执行signal方法,通知进入对象等待池的消费者线程进入对象的锁池进行抢锁消费 private final Condition notEmpty = takeLock.newCondition(); 6.产出状态 notFull 提醒生产者进行产出 这个状态在生产者发现队列已经满了的时候,会执行await方法进入对象的等待池 在消费者消费完一个数据时,会执行signal方法,通知进入对象等待池的生产者线程进入对象的锁池进行抢锁产出

    private final Condition notFull = putLock.newCondition();

    生产者跟消费者的设计

    生产者消费者本身是个线程, 所以继承Thread或者实现Runable接口就好了, 然后run方法只需对队列(构造函数传入)进行不断的put/take 操作就好了

    编码

    MyBlockingQueue

    import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author Jiangjiaze * @version Id: MyBlockingQueue.java, v 0.1 2017/3/8 0:21 FancyKong Exp $$ */ public class MyBlockingQueue<E> { private final Lock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final Lock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); private Entry<E> head; private Entry<E> tail; private AtomicInteger size = new AtomicInteger(0); private int ca; MyBlockingQueue(int ca) { this.ca = ca; Entry<E> e = new Entry<>(); head = e; tail = e; } private static class Entry<E> { E datas; Entry<E> next; } public void put(E e) throws InterruptedException { putLock.lock(); try { while (size.intValue() == ca) { System.out.println("厕所满了"); notFull.await(); }//之所以要用while是因为可能我醒过来之后,厕所有可能又满了. tail.datas = e; tail.next = new Entry<>(); tail = tail.next; if(size.getAndIncrement() +1 <= ca){ notFull.signal(); } } finally { putLock.unlock(); } //拉完要叫人来吃 signalConsumers(); } private void signalConsumers() { takeLock.lock(); try { notEmpty.signal(); }finally { takeLock.unlock(); } } public E take() throws InterruptedException { takeLock.lock(); E toReturn; try{ while(size.intValue()==0){ System.out.println("都吃完了"); notEmpty.await();//没有元素,不能吃 } toReturn = head.datas; head = head.next; if(size.getAndDecrement() -1 > 0){ //我吃完都还有剩下 notEmpty.signal(); } }finally { takeLock.unlock(); } //吃完之后呢,我得顺便通知一下生产者继续拉 if(toReturn != null) signalProducer(); return toReturn; } private void signalProducer() { this.putLock.lock(); try { notFull.signal(); // 不空啦,快来拉 }finally { this.putLock.unlock(); } } }

    MyConsumer

    import com.fancy.prodcons.Message; /** * @author Jiangjiaze * @version Id: MyConsumer.java, v 0.1 2017/3/8 0:57 FancyKong Exp $$ */ public class MyConsumer extends Thread{ public static void main(String[] args) { //测试的客户端 MyBlockingQueue<Message> queue = new MyBlockingQueue<>(10); new MyConsumer("消费者1",queue).start(); new MyProducer("生产者1",queue).start(); new MyProducer("生产者2",queue).start(); new MyProducer("生产者3",queue).start(); /*new MyConsumer("消费者2",queue).start(); new MyConsumer("消费者3",queue).start(); new MyConsumer("消费者4",queue).start(); new MyConsumer("消费者5",queue).start(); new MyConsumer("消费者6",queue).start(); new MyConsumer("消费者7",queue).start();*/ } private MyBlockingQueue<Message> queue; MyConsumer(String name,MyBlockingQueue<Message> queue){ setName(name); this.queue = queue; } @Override public void run() { while (true) try { sleep(100); //消费需要时间100ms Message m = queue.take(); System.out.println(getName()+"吃掉了"+m); } catch (InterruptedException e) { e.printStackTrace(); } } }

    MyProducer

    import com.fancy.prodcons.Message; /** * @author Jiangjiaze * @version Id: Producer.java, v 0.1 2017/3/1 17:32 FancyKong Exp $$ */ public class MyProducer extends Thread{ MyBlockingQueue<Message> blockingQueue; public MyProducer(String name,MyBlockingQueue<Message> blockingQueue) { this.blockingQueue = blockingQueue; setName(name); } @Override public void run() { while (true){ try { sleep(1000); //生产需要时间1000s Message e = new Message((int)(Math.random()*100),"屎"); blockingQueue.put(e); System.out.println(getName()+"拉了"+e); } catch (InterruptedException e) { e.printStackTrace(); } } } }
    转载请注明原文地址: https://ju.6miu.com/read-3749.html

    最新回复(0)