java并发编程——Condition(waitsignalnotify的等待-通知模式 )

    xiaoxiao2021-12-14  17

    文章目录

    是什么使用场景怎样用使用实例:源码剖析await()signal

    是什么

    与Object.wait()\Object.notify()功能很类似。

    以AQS非静态内部类的方式实现,因此Condition初始化的前提是先要有Lock实例,并且要先获取到锁。

    使用场景

    需要进程之间有协作的场景,典型的如等待-通知模型,生产-消费模型,比如有界队列:队列满了需要阻塞插入元素的线程,删除操作线程完成后唤醒被阻塞的插入操作线程。队列空了,需要阻塞删除操作线程,当有插入操作线程完成时唤醒被阻塞的删除操作线程;

    怎样用

    对比于Object的wait/notify的典型使用方式:

    Object wait/notify的使用:

    //Object监视器 wait/notify threadA: synchronized(theObject){//必须先要获得monitor锁(对象锁) while(!flag) theObject.wait(); doSomethings(); } threadB: synchronized(theObject){ ... flag=true; theObject.notify(); }

    Lock.Conodition await/signal的使用:

    //Conodition await/signal Condition condition=lock.newConditon(); threadA: lock.lock() try{ while(!flag) condition.await(); doSomeThings(); }catch{ }finally{ Lock.unlock() } threadB: lock.lock() try{ ... flag=true; condition.signal(); }catch{ }finally{ Lock.unlock() }

    使用实例:

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Food { private boolean isBoiled = false; private Lock lock = new ReentrantLock(); // 单个Lock产生一个Condition对象,用来管理任务间的通信 private Condition condition = lock.newCondition(); @Override public String toString() { return "Food [isBoiled=" + isBoiled + "]"; } public void cook() { // 显示锁必须紧跟try-finally,调用 await()\signalAll()都必须获得锁,最后finally中释放 lock.lock(); try { isBoiled = true; System.out.println("cook "); condition.signalAll(); } finally { lock.unlock(); } // isBoiled = true; // notifyAll(); } public void eat() { lock.lock(); try { isBoiled = false; System.out.println("eat "); condition.signalAll(); } finally { lock.unlock(); } // isBoiled = false; // notifyAll(); } public void waitForEat() throws InterruptedException { lock.lock(); try { while (isBoiled == true) { condition.await(); } } finally { lock.unlock(); } // while (isBoiled == true) { // wait(); // // } } public void waitForCook() throws InterruptedException { lock.lock(); try { while (isBoiled == false) { condition.await(); } } finally { lock.unlock(); } // while (isBoiled == false) { // wait(); // } } } class Cook implements Runnable { private Food food; public Cook(Food food) { this.food = food; } @Override public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(200); food.cook(); // System.out.println("Cook " + food); food.waitForEat(); } } catch (Exception e) { System.err.println("Cook task interrupted "); } System.out.println("Cook task off "); } } class Eater implements Runnable { private Food food; public Eater(Food food) { this.food = food; } @Override public void run() { try { while (!Thread.interrupted()) { food.waitForCook(); food.eat(); // System.out.println("Eater " + food); } } catch (Exception e) { System.err.println("Eater task interrupted "); } System.out.println("Eater task off "); } } public class CookAndEat { public static void main(String[] args) throws InterruptedException { Food food = new Food(); ExecutorService ex = Executors.newFixedThreadPool(2); ex.execute(new Eater(food)); ex.execute(new Cook(food)); ex.execute(new Eater(food)); ex.execute(new Cook(food)); TimeUnit.SECONDS.sleep(1); ex.shutdownNow(); } }

    源码剖析

    前提:lock获取到锁(lock.lcok()成功返回)

    await()

    public final void await() throws InterruptedException { if (Thread.interrupted())// 中断响应 throw new InterruptedException(); // addConditionWaiter: // 1.尾结点Cancelled状态的结点,那么遍历队Condition列中的所有结点,提出Cancelled的结点。 // 2.将当前线程构造为一个Condition结点,并加入队列尾部。 Node node = addConditionWaiter(); // 释放锁并且唤醒等待这个锁的后继结点,并返回这个结点进入await方法时锁的状态(正常情况下应该是>=1的) int savedState = fullyRelease(node); int interruptMode = 0; // 如果node在SyncQueue上,那么说明这个结点已经从Conditon队列中摘除,并放入了Sync队列。 // 也就是说对这个结点执行了Signal. while (!isOnSyncQueue(node)) { LockSupport.park(this);// 如果当前结点在conditon中不在sync队列中,阻塞,因为这个结点并没有其他线程对它执行signal if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 当收到另外一个线程的signal信号后,继续执行下列逻辑,重新开始正式竞争锁。同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 当前线程再次获取锁&&不需要抛出异常 interruptMode = REINTERRUPT;// 如果获取锁acquireQueued时返回true表示该线程被中断过 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters();// 清理下条件队列中的不是在等待条件的节点 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 应该>=1 // AQS.release模板方法:CAS获取锁-修改state并唤醒后继结点. // 注意!!!: // 这里的逻辑是在Sync队列上操作的,当前线程执行Condition方法前提是获取了锁,那么当前线程应该是Sync队列的head结点。 // 这里release释放了当前结点(Sync.head)的锁,并在需要的情况下唤醒Sync.head的后继结点 if (release(savedState)) { failed = false; return savedState; } else {// 调用condition之前没有获取锁 throw new IllegalMonitorStateException(); } } finally { if (failed)// 如果失败,则将当前结点置为Cancelled。Condition中结点的状态只有CONDITION与CANCELLED以及再唤醒逻辑中置为0 node.waitStatus = Node.CANCELLED;// 对应addConditionWaiter中的unlinkCancelledWaiters } }

    总结 1.将当前线程构造为一个Condition结点,并加入Conditon队列尾部(lastWaiter.nextWaiter)。 Condition队列由三个Node引用构成: Node firstWaiter;Node lastWaiter;Node nextWaiter; 2.释放当前线程对应结点(当前线程结点在Sync队列上)的同步状态:释放锁 3.判断是否在Sync队列中,不在则阻塞。除次调用await构造的Conditon队列结点肯定不在Sync,每次调用一个对应的notify或者notifyAll才会把Conditon队列结点移动到Sync队列尾部。 那么换句话说:这里的阻塞,其实是在等待其他线程执行对应的notify方法。 4.如果其他线程F执行了notify方法,则这个结点从Conditon队列移入Sync队列。

    然后其他线程F释放了锁(释放锁逻辑中会唤醒后继结点,当然Conditon结点在Sync上也是要排队的),当前线程从park处被唤醒,再次判断,isOnSyncQueue==true,执行AQS.acquireQueued方法(当前线程已经在Sync队列上了),在Sync同步队里尝试获取锁(不赘述,详见AQS解析)** 也就是说,一个线程调用conditon.await()并返回的过程:这个线程会从Conditon队列移动到Sync队列,并且两次获取锁。

    signal

    public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //从Conditon队列头部开始唤醒结点(notifyAll会从头部依次唤醒所有Condition结点) if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 从conditon队列中移除这个结点(Node first).即将放入Sync队列尾部 if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 加入到sync队列尾部失败 &&结点移除失败 } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 将Condition队列的头结点从Condition队列转移到同步队列上,为了保证Condition队列的结点再次同步获取锁 */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or attempt * to set waitStatus fails, wake up to resync (in which case the * waitStatus can be transiently and harmlessly wrong). * 拼接到队列中并且尝试设置前驱结点的waitStatus状态,用来表示线程在等待状态。 * 如果结点取消了或者尝试设置waitStatus失败,唤醒以再次同步 */ Node p = enq(node);// 结点插入Sync队列尾部,并返回这个结点的前驱结点(也就是之前的Tail) int ws = p.waitStatus; // 是取消状态||设置为Signal态没有成功(表示node前驱结点的状态异常:需要将node结点在sync队列中唤醒,重新进行同步操作) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

    总结: 1.从conditon队列头部移除结点(Node firstWaiter).即将放入Sync队列尾部: 2.从Conditon队列头部开始唤醒结点(firstWaiter): 3.将这个结点状态从CONDITION修改为0;放入Sync队列尾部,并将这个结点的前驱结点设为SIGNAL态。 4.执行完Conditon.signal,并且附带在finally中释放锁。释放锁这个逻辑里会唤醒后继结点,后继结点也会唤醒后继结点直到 后继是在Conditon上等待的线程,被唤醒。也就是说Condition从Condition队列移动到Sync队列,排队等待自己被唤醒

    signalAll对每个结点都做了一次唤醒、放入同步队列的操作;而signal只对**condition队列的firstwaiter(也就是等待时间最长的结点)**做了同步处理。

    转载请注明原文地址: https://ju.6miu.com/read-965303.html

    最新回复(0)