JDK并发包(concurrent) - 线程控制工具类

    xiaoxiao2021-03-25  129

    CountDownLatch

    是一个非常实用的多线程控制工具类,这个工具通常用来控制线程等待,他可以让某一个线程等待直到倒计时结束,在开始执行。

    就像火箭点火发射一样,为了确保发射的成功,在发射之前要进行一系列的检查,只有所有的检查都完成之后,才能进行发射,下面用CountDownLatch来模拟一下这个过程。

    package com.example.thread; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by mazhenhua on 2017/3/8. */ public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try{ // 模拟检查任务 Thread.sleep(new Random().nextInt(10) * 1000); System.out.println("check complete"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i ++){ exec.submit(demo); } //等待检查 end.await(); // 发射火箭 System.out.println("Fire!"); exec.shutdown(); } }

    代码中CountDownLatch 声明的计数数量为10个,也就是说只有10个线程都完成之后,等待CountDownLatch 的线程,才能继续往下运行。如果把for循环改成5,这个程序永远也停不下来,会一直等待下去。。。。。。

    CyclicBarrier

    CyclicBarrier是另外一种多线程并发控制实用工具。和CountDownLatch 非常类似,他也可以实现线程间的技术等待,但是比CountDownLatch 更加复杂且强大。 Cyclic意思是循环,也就是这个计数器可以循环使用。比如,假设我们将计数器设置为10,那么凑齐第一批10个线程之后,计数器就会归零,然后接着凑齐下一批,10个线程。

    CyclicBarrier的构造函数,还可以接受一个barrierAction,也就是说,当计数器计满一次后,所执行的动作。 例如:司令下达命令,需要召集10个士兵,然后分别执行10个任务,需要等到士兵集合完毕,才能下达具体的任务,需要10个任务都完成,才能宣布任务结束。

    package com.example.thread; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * Created by mazhenhua on 2017/3/8. */ public class CyclicBarrierDemo { public static class Soldier implements Runnable{ private String soldier; private final CyclicBarrier cyclic; Soldier(String soldier, CyclicBarrier cyclic) { this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try { // 等待所有士兵到齐 cyclic.await(); doWork(); // 等待所有士兵完成工作 cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork(){ try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ": 任务完成"); } } public static class BarrierRun implements Runnable{ boolean flag; int N; public BarrierRun(boolean flag, int n) { this.flag = flag; N = n; } @Override public void run() { if (flag){ System.out.println("司令:[士兵" + N + "个,任务完成!]"); } else { System.out.println("司令:[士兵" + N + "个,集合完毕!]"); flag = true; } } } public static void main(String[] args) throws InterruptedException { final int N = 10; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N)); // 设置屏障点,主要是为了执行这个方法 System.out.println("集合队伍"); for (int i = 0; i < N; ++i) { System.out.println("士兵" + i + "报道!"); allSoldier[i] = new Thread(new Soldier("士兵" + i, cyclicBarrier)); allSoldier[i].start(); } } }

    输出结果:

    CyclicBarrier.await()方法可能会抛出2个异常,一个是InterruptedException,也就是等待过程中可能被中断,应该说这是一个非常通用的异常。大部分迫使线程等待的方法都会抛出这个异常,是的线程在等待是时,依然可以响应外部的紧急事件。另外一个就是BrokenBarrierException,一旦遇到这个异常,则表示当前的CyclicBarrier 已经损坏了,可能系统无法等待所有线程集齐,继续等待也是徒劳。

    线程阻塞工具类:LockSupport

    LockSupport是一个非常方便实用的线程阻塞工具类,他可以在线程内任意位置让线程阻塞,和Thread.suspend()相比,它填补了resume()在线程挂起之前执行,而导致的线程无法继续执行的情况。和Object.wait()相比,他又不需要先获得对象锁,也不会抛出InterruptedException异常。

    如果看过我前面的博客的话,应该还记得在介绍线程挂起的时候,出现了死活的问题吧。现在我们用LockSupport重写这个程序,上代码:

    package com.example.thread; import java.util.concurrent.locks.LockSupport; /** * Created by mazhenhua on 2017/3/8. */ public class LockSupportDemo { public static Object object = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread{ public ChangeObjectThread(String name) { super.setName(name); } @Override public void run() { synchronized (object){ System.out.println("in " + getName()); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } }

    执行结果:

    执行这段代码,你会发现自始至终都能够正常结束,并不会应为unpark()发生在park()之前而导致永久性挂起。这是因为LockSupport类使用了类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可,如果许可不可用就会阻塞。而unpark()则是是的一个许可变为可用,与信号量不同的是,许可不能累加,一个线程,不可能拥有超过一个的许可。

    这个特点使得,及时unpark()发生在park()之前,他也可以使得下一次的park()操作立即返回。

    在使用suspend()导致线程永久性挂起时,查看线程的状态依然是RUNNABLE状态。如果是park()挂起的线程会非常明确的给出WATING状态,而且会标明是park引起的,这样对查问题非常的有帮助。

    转载请注明原文地址: https://ju.6miu.com/read-10141.html

    最新回复(0)