《Java高并发程序设计》总结--3. JDK并发包

    xiaoxiao2021-03-25  158

    1.多线程的团队协作:同步控制 1)synchronized的功能扩展:重入锁 重入锁可以完全替代synchronized关键字。在JDK5.0的早期版本,重入锁的性能远远好于synchronized,但从JDK6.0开始,JDK在synchronized上做了大量的优化,使得两者的性能差距并不大。 重入锁使用java.util.concurrent.locks.ReentrantLock类来实现,如下代码: import java.util.concurrent.locks.ReentrantLock; public class ReenterLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 1000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl = new ReenterLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } 与synchronized相比,重入锁有着显示的操作过程。开发人员必须手动指定何时加锁,何时释放锁。重入锁对逻辑控制的灵活性要远远好于synchronized。但值得注意的是,在退出临界区时,必须记得释放锁,否则,其他线程就没有机会访问临界区了。这种锁是可以反复进入的。 中断响应 重入锁可以提供中断处理的能力。对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,那么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。也就是在等待的过程中,程序可以根据需要取消对锁的请求。下面的代码产生了一个死锁,但得益于锁中断,可以轻易地解决这个死锁。 import java.util.concurrent.locks.ReentrantLock; public class IntLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public IntLock(int lock) { this.lock = lock; } @Override public void run() { try { if(lock == 1) { lock1.lockInterruptibly(); try { Thread.sleep(500); } catch (InterruptedException e) { lock2.lockInterruptibly(); } } else { lock2.lockInterruptibly(); try { Thread.sleep(500); } catch (InterruptedException e) { lock1.lockInterruptibly(); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { if(lock1.isHeldByCurrentThread()) lock1.unlock(); if(lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getId()+":线程退出"); } } public static void main(String[] args) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); Thread.sleep(1000); t2.interrupt(); } } 线程t1和t2启动后,t1先占用lock1,再请求lock2;t2先占用lock2,再请求lock1。因此,很容易形成t1和t2之间的相互等待。这里,对锁的请求,统一用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断。Thread.sleep(1000)表示主线程main处于休眠,此时,两个线程处于死锁状态;t2.interrupt()表示t2线程被中断,故t2会放弃对lock1的申请,同时释放lock2。这个操作导致t1线程可以顺利得到lock2而继续执行下去。 锁申请等待限时。 除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。 public class TimeLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if(lock.tryLock(5,TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("get lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if(lock.isHeldByCurrentThread()) lock.unlock(); } } public static void main(String[] args) { TimeLock tl = new TimeLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start(); t2.start(); } } 在这里,tryLock()方法接收两个参数,一个表示等待时长,另外一个表示计时单位。这里的单位设置为秒,时长为5,表示线程在这个锁请求中,最多等待5秒。如果超过5秒还没有得到锁,就会返回false。如果成功获得锁,则返回true。 在本例中,由于占用锁的线程会持有锁长达6秒,故另一个线程无法在5秒的等待时间内获得锁,因此,请求会失败。 ReentrantLock.tryLock()方法也可以不带参数直接运行。在这种情况下,当前线程会尝试获得锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回false。这种模式不会引起线程等待,因此也不会产生死锁。下面演示了这种使用方式: public class TryLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public TryLock(int lock) { this.lock = lock; } @Override public void run() { if(lock == 1) { while(true) { if(lock1.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if(lock2.tryLock()) { try { System.out.println(Thread.currentThread().getId() + ":My Job done"); return; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } } else { while(true) { if(lock2.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if(lock1.tryLock()) { try { System.out.println(Thread.currentThread().getId() + ":My Job done"); return; } finally { lock1.unlock(); } } } finally { lock2.unlock(); } } } } } public static void main(String[] args) { TryLock r1 = new TryLock(1); TryLock r2 = new TryLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } } 上述代码中,采用了非常容易死锁的加锁顺序。但是使用tryLock()后,这种情况大大改善了。由于线程不会傻傻地等待,而是不停地尝试,因此,只要执行足够长的时间,线程总是会得到所有需要的资源,从而正常执行。 公平锁 如果使用synchronized关键字进行锁控制,那么产生的锁就是非公平的。而重用锁允许我们对其公平性进行设置。它有一个如下的构造函数: public Reentrantlock(bool fair) 当参数fair为true时,表示锁是公平的。 下面的代码可以很好地突出公平锁的特点: public class FairLock implements Runnable { public static ReentrantLock fairLock = new ReentrantLock(true); @Override public void run() { while(true) { try { fairLock.lock(); System.out.println(Thread.currentThread().getName() + "获得锁"); } finally { fairLock.unlock(); } } } public static void main(String[] args) { FairLock r1 = new FairLock(); Thread t1 = new Thread(r1,"Thread_t1"); Thread t2 = new Thread(r1,"Thread_t2"); t1.start(); t2.start(); } }对于ReentrantLock的几个重要方法整理如下: lock():获得锁,如果锁已经被占用,则等待。 lockInterruptibly():获得锁,但优先响应中断。 tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回。 tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁。 unlock():释放锁。 在重入锁的实现中,主要包含三个要素: 第一,是原子状态。原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被别的线程持有。 第二,是等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。 第三,是阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。 2)重入锁的好搭档:Condition条件 它和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condition是与重入锁相关联的。通过Lock()接口的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。 Condition接口提供的基本方法如下: void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); 以上方法的含义如下: await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll ()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。 awaitUninterruptibly()方法与await()方法基本相同,但是它并不会在等待过程中响应中断。 signal()方法用于唤醒一个在等待中的线程。相对于signalAll()方法会唤醒所有在等待中的线程。这和Object.notify()方法很类似。 下面代码简单地演示了Condition的功能: public class ReenterLockCondition implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (InterruptedException e) { // TODO: handle exception } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReenterLockCondition tl = new ReenterLockCondition(); Thread t1 = new Thread(tl); t1.start(); Thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } } 当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。同理,在Condition.siginal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行。 3)允许多个线程同时访问:信号量(Semaphore) 信号量为多线程协作提供了更为强大的控制方法。信号量可以指定多个线程,同时访问某一资源。信号量主要提供了以下构造函数: public Semaphore(int permits) public Semaphore(int permits, boolean fair) 在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于制定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有: public void acquire() public void acquireUninterruptibly() tryAcquire() public boolean tryAcquire(long timeout, TimeUnit unit) public void release() acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可后者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不响应中断。tryAcquire()尝试获得一个许可,如果成功返回true,失败则返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。 4)ReadWriteLock 读写锁 ReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。 读写锁的访问约束如下表所示。  读写读非阻塞阻塞写阻塞阻塞 代码示例如下: public class ReadWriteLockDemo { private static Lock lock = new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock(); private int value; public Object handleRead(Lock lock) throws InterruptedException { try { //模拟读操作 lock.lock(); Thread.sleep(1000); return value; } finally { lock.unlock(); } } public void handleWrite(Lock lock,int index) throws InterruptedException { try { //模拟写操作 lock.lock(); Thread.sleep(1000); value = index; } finally { lock.unlock(); } } public static void main(String[] args) { final ReadWriteLockDemo demo = new ReadWriteLockDemo(); Runnable readRunnable = new Runnable() { @Override public void run() { try { demo.handleRead(readLock); demo.handleRead(lock); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable writeRunnable = new Runnable() { public void run() { try { demo.handleWrite(writeLock, new Random().nextInt()); } catch (InterruptedException e) { e.printStackTrace(); } } }; for(int i=0; i<18; i++) { new Thread(readRunnable).start(); } for(int i=18; i<20; i++) { new Thread(writeRunnable).start();; } } } 5)倒计时器:CountDownLatch 这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。下面的例子演示了CountDownLatch的使用。 public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try { Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check complete"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { exec.submit(demo); } end.await(); System.out.println("Fire!"); exec.shutdown(); } } 6)循环栅栏:CyclicBarrier  CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。   CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。   CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。 在所有参与者都已经在此 barrier 上调用 await方法之前,将一直等待。如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态: 最后一个线程到达;或者 其他某个线程中断当前线程;或者 其他某个线程中断另一个等待线程;或者 其他某个线程在等待 barrier 时超时;或者 其他某个线程在此 barrier 上调用 reset()。 下面的示例使用CyclicBarrier演示了司令命令士兵完成任务的场景: public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } @Override public void run() { try { cyclic.await(); doWork(); cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt()000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任务完成"); } } public static class BarrierRun implements Runnable { boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } @Override public void run() { if(flag) { System.out.println("司令:[士兵" + N + "个,任务完成!]"); } else { System.out.println("司令:[士兵" + N + "个,集合完毕!]"); flag = true; } } } public static void main(String[] args) { final int N = 10; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); System.out.println("集合队伍!"); for (int i = 0; i < N; i++) { System.out.println("士兵" + i + "报道!"); allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i)); allSoldier[i].start(); } } } 如果当前线程: 在进入此方法时已经设置了该线程的中断状态;或者 在等待时被中断 则抛出 InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出BrokenBarrierException 异常。 如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。 7)线程阻塞工具类:LockSupport LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。 LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。他们实现了一个限时的等待。 LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。 示例代码如下: public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } } 除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法中断标记。 public class LockSupportIntDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); LockSupport.park(); if(Thread.interrupted()) { System.out.println(getName() + "被中断了"); } } System.out.println(getName()+"执行结束"); } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); t1.interrupt(); LockSupport.unpark(t2); } } 2. 线程复用:线程池 1)什么是线程池 为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。线程池中,总有那么几个活跃线程。当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将整个线程退回到池子,方便其他人使用。 2)JDK对线程池的支持 为了能后更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图所示: 以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演这线程池工厂的角色。通过Executors可以取得一个拥有特定功能的线程池。ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。 Executor框架提供了各种类型的线程池,主要有以下工厂方法。 public static ExecutorService newFixedThreadPool(int nThreads)public static ExecutorService newSingleThreadExecutor()public static ExecutorService newCachedThreadPool()public static ScheduledExecutorService newSingleThreadScheduledExecutor()public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 以上方法返回了具有不同工作特性的线程池,具体说明如下:  newFixedThreadPool,返回一个固定数量的线程池。当一个新任务提交时,如果有空闲线程,则执行。否则新任务暂存在一个任务队列中,待有空闲时,便处理在任务队列中的任务。  newSingleThreadExecutor,返回一个线程的线程池。当多余一个新任务提交时,会暂存在一个任务队列中,待有空闲时,按先入先出的顺序处理在任务队列中的任务。  newCachedThreadPool,返回一个可根据实际情况调整线程数量的线程池,线程数量不确定,若有空闲,则会有限复用线程。否则创建新线程处理任务。所有线程在当前任务执行完后,将返回线程池待复用。  newSingleThreadScheduledExecutor,返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService在Executor接口之上扩展了在给定时间执行某任务的功能。如果在某个固定的延时之后执行,或周期性执行某个任务。可以用这个工厂。  newScheduledThreadPool,返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。 1. 固定大小的线程池 以newFixedThreadPool()为例。 public class ThredPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { es.submit(task); } } } 2. 计划任务 另一个值得注意的方法是newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的主要方法如下: //在给定的实际,对任务进行一次调度public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);//对任务进行周期性调度,任务调度的频率一定的,它是以上一个任务开始执行时间为起点,之后的period时间后调度下一次任务。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//对任务进行周期性调度,在上一个任务结束后,再经过delay长的时间进行任务调度。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); 这里给出了三个方法。方法schedule()会在给定时间,对任务进行一次调度。方法scheduleAtFixedRate()和方法scheduleWithFixedDelay()会对任务进行周期性的调度,但是两者有一定的区别。对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务调度。 下面的例子使用scheduleAtFixedRate()方法调度一个任务。这个任务执行一秒钟时间,调度周期是2秒。也就是每2秒钟,任务就会被执行一次。 public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 0, 2, TimeUnit.SECONDS); } } 如果任务的执行时间大于调度时间,那么任务就会在上一个任务结束后,立即被调用。如果采用scheduleWithFixedDelay(),执行时间为8秒,调度周期为2秒,那么任务的实际间隔是10秒。 3)核心线程池的内部实现 对于核心的几个线程池,无论是newFixedThreadPool()方法、newSingleThreadExecutor()还是newCachedThreadPool()方法,其内部实现均使用了ThreadPoolExecutor实现。下面给出三个线程池的实现方式: public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) {      eturn new ThreadPoolExecutor(nThreads, nThreads,        0L, TimeUnit.MILLISECONDS,                      new LinkedBlockingQueue<Runnable>());      }   public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 由以上线程池实现代码可以看到,它们都只是ThreadPoolExecutor类的封装。ThreadPoolExecutor最重要的构造函数如下: public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 函数的参数含义如下: corePoolSize:指定了线程池中的线程数量。 maximumPoolSize:指定了线程池中的最大线程数量。 keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。 unit:keepAliveTime的单位。 workQueue:任务队列,被提交但尚未被执行的任务。 threadFactory:线程工厂,用于创建线程,一般用默认的即可。 handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。 参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用一下几种BlockingQueue。 1. 直接提交的队列:该功能由synchronousQueue对象提供,synchronousQueue对象是一个特殊的BlockingQueue。synchronousQueue没有容量,每一个插入操作都要等待一个响应的删除操作,反之每一个删除操作都要等待对应的插入操作。如果使用synchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建线程,如果线程数量已经达到了最大值,则执行拒绝策略,因此,使用synchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。  2. 有界的任务队列:有界任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue构造函数必须带有一个容量参数,表示队列的最大容量。public ArrayBlockingQueue(int capacity)。当使用有界任务队列时,若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程。若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见有界队列仅当在任务队列装满后,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。  3. 无界的任务队列:无界队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,无界队列的任务队列不存在任务入队失败的情况。若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程执行。但当系统的线程数量达到corePoolSize后就不再创建了,这里和有界任务队列是有明显区别的。若后续还有新任务加入,而又没有空闲线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,知道耗尽系统内存。  4. 优先任务队列:带有优先级别的队列,它通过PriorityBlokingQueue实现,可以控制任务执行的优先顺序。它是一个特殊的无界队列。无论是ArrayBlockingQueue还是LinkedBlockingQueue实现的队列,都是按照先进先出的算法处理任务,而PriorityBlokingQueue根据任务自身优先级顺序先后执行,在确保系统性能同时,也能很好的质量保证(总是确保高优先级的任务优先执行)。 newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize一样的,并使用了LinkedBlockingQueue任务队列(无界队列)的线程池。当任务提交非常频繁时,该队列可能迅速膨胀,从而系统资源耗尽。  newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1。 newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它总是破事线程池增加新的线程来执行任务。当任务执行完后由于corePoolSize为0,因此空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),如果有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样做法可能会很快耗尽系统的资源,因为它会增加无穷大数量的线程。 ThreadPoolExecutor的核心代码如下: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf(c)获取当前线程池线程总数 * 当前线程数小于 corePoolSize核心线程数时,会将任务通过addWorker(command, true)方法直接调度执行。 * 否则进入下个if,将任务加入等待队列 **/ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /** * workQueue.offer(command) 将任务加入等待队列。 * 如果加入失败(比如有界队列达到上限或者使用了synchronousQueue)则会执行else。 * **/ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /** * addWorker(command, false)直接交给线程池, * 如果当前线程已达到maximumPoolSize,则提交失败执行reject()拒绝策略。 **/ else if (!addWorker(command, false)) reject(command); } 4)拒绝策略 线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。  JDK内置的拒绝策略如下:  1. AbortPolicy : 直接抛出异常,阻止系统正常运行。  2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。  3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。  4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。  以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下: public class RejectThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is discard"); } }); for(int i=0; i<Integer.MAX_VALUE; i++) { es.submit(task); Thread.sleep(10); } } } 5)自定义线程创建:ThreadFactory ThreadFactory是一个接口,它只有一个方法,用来创建线程: Thread newThread(Runnable r); 下面的案例使用自定义的ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程,这样,当主线程退出后,将会强制销毁线程池。 public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); System.out.println("create " + t); return t; } }); for(int i=0; i<5; i++) { es.submit(task); } Thread.sleep(2000); } 6)扩展线程池 ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()、terminated()三个接口对线程池进行控制。这三个方法分别用于记录一个任务的开始、结束和整个线程池的退出。下面演示了对线程池的扩展,在这个拓展中,我们将记录每一个任务的执行日志。 public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("准备执行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("执行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("线程池退出"); } }; for(int i=0; i<5; i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); } } 7)优化线程池线程数量 线程池的大小对系统性能有一定的影响,过大或过小的线程数量都无法发挥最优的系统性能,因此要避免极大和极小两种情况。  在《java Concurrency in Practice》中给出了一个估算线程池大小的经验公式:  Ncpu = CPU数量  Ucpu = 目标CPU的使用率(0 ≤ Ucpu ≤ 1 )  W/C = 等待时间与计算时间的比率  最优的池大小等于  Nthreads = Ncpu * Ucpu * (1+W/C)  在java中可以通过Runtime.getRuntime().availableProcessors()取得可用CPU数量。 8)在线程池中寻找堆栈 向线程池讨回异常堆栈的方法。一种最简单的方法,就是放弃submit(),改用execute()。将任务提交的代码改成: pools.execute(new DivTask(100,i)); 或者使用下面的方法改造submit(): Future re = pools.submit(new DivTask(100,i)); re.get(); 上述方法,从异常堆栈中只能知道异常在哪里抛出的。还希望知道这个任务在哪里提交的,需要我们扩展ThreadPoolExecutor线程池,让它在调度之前,先保存一下提交任务线程的堆栈信息。如下所示: public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) { super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue); } @Override public void execute(Runnable task) { // TODO Auto-generated method stub super.execute(wrap(task, clientTrace(), Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { // TODO Auto-generated method stub return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack, String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); } catch (Exception e) { clientStack.printStackTrace(); try { throw e; } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } }; } public static class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } @Override public void run() { double re = a/b; System.out.println(re); } } public static void main(String[] args) { ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for(int i=0; i<5; i++) pools.execute(new DivTask(100, i)); } } Fork/Join框架   fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。   fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。   fork/join框架的核心是 ForkJoinPool类,该类继承了AbstractExecutorService类。ForkJoinPool实现了工作窃取算法并且能够执行  ForkJoinTask任务。 在实际使用中,如果毫无顾忌地使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出了一个ForkJoinPool线程池,对于fork()方法并不急于开启线程,而是提交给ForkJoinPool()线程池进行处理,以节省系统资源。可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask。它们分别代表没有返回值的任务和可以携带返回值的任务。 在使用fork/join框架之前,我们需要先对任务进行分割,任务分割代码应该跟下面的伪代码类似: if (任务足够小){ 直接执行该任务; }else{ 将任务一分为二; 执行这两个任务并等待结果; } 下面简单地展示Fork/Join框架的使用,这里用来计算数列求和。 public class CountTask extends RecursiveTask<Long> { private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start,long end) { this.start = start; this.end = end; } public Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; if(canCompute) { for(long i=start; i<=end; i++) { sum += i; } } else { long step = (start + end)/100; ArrayList<CountTask> subTasks = new ArrayList<CountTask>(); long pos = start; for(int i=0; i<100; i++) { long lastOne = pos + step; if(lastOne > end) lastOne = end; CountTask subTask = new CountTask(pos, lastOne); pos += step+1; subTasks.add(subTask); subTask.fork(); } for(CountTask t:subTasks) { sum += t.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000L); ForkJoinTask<Long> result = forkJoinPool.submit(task); try { long res = result.get(); System.out.println("sum="+res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } 在使用ForkJoin时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:第一,系统内的线程数量越积越多,导致性能严重下降。第二,函数的调用层次变得很深,最终导致栈溢出。 此外,ForkJoin线程池使用一个无锁是栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。 2. JDK的并发容器 1)并发集合简介 JDK提供的并发容器大部分在java.util.concurrent包中。如下所示: ConcurrentHashMap : 一个高效的线程安全的HashMap。 CopyOnWriteArrayList : 在读多写少的场景中,性能非常好,远远高于vector。 ConcurrentLinkedQueue : 高效并发队列,使用链表实现,可以看成线程安全的LinkedList。 BlockingQueue : 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用作数据共享通道。 ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找。 2)线程安全的HashMap 如果需要一个线程安全的HashMap,一种可行的方法是使用Collections.synchronizedMap()方法包装HashMap。如下代码,产生的HashMap就是线程安全的: public static Map m = Collections.synchronizedMap(new HashMap()); Collections.synchronizedMap()会生成一个名为SynchronizedMap的Map。它使用委托,将自己所有Map相关的功能交给传入的HashMap实现,而自己则主要负责保证线程安全。 具体参考下面的实现,首先SynchronizedMap内包装 private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize ...... public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } ......} 其他所有相关的Map操作都会使用这个mutex进行同步。从而实现线程安全。 这个包装的Map可以满足线程安全的要求。但是,它在多线程环境中的性能表现并不算太好。无论是对Map的读取或者写入,都需要获得mutex的锁,这会导致所有对Map的操作全部进入等待状态,直到mutex锁可用。 一个更加专业的并发HashMap是ConcurrentHashMap。它位于java.util.concurrent包内。它专门为并发进行了性能优化,因此,更加适合多线程的场合。 3)有关List的线程安全 在Java中,ArrayList和Vector都是使用数组作为其内部实现。两者最大的不同在于Vector是线程安全的,而ArrayList不是。此外,LinkedList使用链表的数据结构实现了List。但LinkedList并不是线程安全的,不过参考前面对HashMap的包装,在这里我们也可以使用Collections.synchronizedList()方法来包装任意List。如下所示: public static List<String> l = Collections.synchronizedList(new LinkedList<String>()); 此时生成的List对象就是线程安全的。 4)高效读写的队列:深度剖析ConcurrentLinkedQueue 在JDK中提供了一个ConcurrentLinkedQueue类用来实现高并发的队列。 作为一个链表,需要定义有关链表内的节点,在ConcurrentLinkedQueue中,定义的节点Node核心如下: private static class Node<E> { volatile E item; volatile Node<E> next; } 其中item是用来表示目标元素的,字段next表示当前Node的下一个元素。 对Node进行操作时,使用了CAS操作。 boolean casItem(E cmp,E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val) } boolean casNext(Node<E> cmp,Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } 方法casItem()表示设置当前Node的item值。它需要两个参数,第一个参数为期望值,第二个参数为设置目标值。当当前值等于cmp期望值时,就会将目标设置为val。同样,casNext()方法也是类似的,但它是用来设置next字段。 如下图所示,显示了插入时,tail的更新情况,可以看到tail的更新会产生滞后,并且每次更新会跳跃两个元素。 第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。 第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。 第三步添加元素3,设置tail节点的next节点为元素3节点。 第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。 通过debug入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。 下面为ConcurrentLinkedQueue中向队列中添加元素的offer()方法。 01 public boolean offer(E e) { 02 checkNotNull(e); 02 final Node<E> newNode = new Node<E>(e); 04 05 for (Node<E> t = tail, p = t;;) { 06 Node<E> q = p.next; 07 if (q == null) { 08 //p是最后一个节点 09 if (p.casNext(null, newNode)) { 10 //每2次,更新下一个tail 11 if (p != t) 12 casTail(t, newNode); 13 return true; 14 } 15 //CAS竞争失败,再次尝试 16 } 17 else if (p == q) 18 //遇到哨兵节点,从head开始遍历 19 //但如果tail被修改,则使用tail(因为可能被修改正确) 20 p = (t != (t = tail)) ? t : head; 21 else 22 //取下一个节点或者最后一个节点 23 p = (p != t && t != (t = tail)) ? t : q; 24 } 25 } 首先值得注意的是,这个方法没有任何锁操作。线程安全完全由CAS操作和队列的算法来保证。整个方法的核心是for循环,这个循环没有出口,直到尝试成功,这也符合CAS操作的流程。当第一次加入元素时,由于队列为空,因此p.next为null。流程进入第8行。并将p的next节点赋值为newNode,也就是将新的元素加入到队列中。此时,p==t成立,因此不会执行第12行的代码更新tail末尾。如果casNext()成功,程序直接返回,如果失败,则再进行一次循环尝试,直到成功。因此增加一个元素后,tail并不会被更新。 当程序试图增加第2个元素时,由于t还在head的位置上,因此p.next指向实际的第一个元素,因此第6行的q!=null,这表示q不是最后的节点。由于往队列中增加元素需要最后一个节点的位置,因此,循环开始查找最后一个节点。于是,程序会进入第23行,获得最后一个节点。此时,p实际上是指向链表中的第一个元素,而它的next为null,故在第2个循环时,进入第8行。p更新自己的next,让它指向新加入的节点。如果成功,由于此时p!=t成功,则会更新t所在位置,将t移动到链表最后。 在第17行,处理了p==q的情况。这种情况是由于遇到了哨兵(sentinel)节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点,或者空节点。当遇到哨兵节点时,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步查找到链表末尾。但一旦发生在执行过程中,tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾(这样就避免了重新查找tail的开销)。 下面我们来看一下哨兵节点是如何产生的。弹出队列内的元素。其执行过程如下: 01 public E poll() { 02 restartFromHead: 03 for (;;) { 04 for (Node<E> h = head, p = h, q;;) { 05 E item = p.item; 06 if (item != null && p.casItem(item, null)) { 07 if (p != h) // hop two nodes at a time 08 updateHead(h, ((q = p.next) != null) ? q : p); 09 return item; 10 } 11 else if ((q = p.next) == null) { 12 updateHead(h, p); 13 return null; 14 } 15 else if (p == q) 16 continue restartFromHead; 17 else 18 p = q; 19 } 20 } 21 } 由于队列中只有一个元素,根据前文的描述,此时tail并没有更新,而是指向和head相同的位置。而此时,head本身的item域为null,其next为列表第一个元素。故在第一个循环中,代码直接进入第18行,将p赋值为q,而q就是p.next,也是当前列表中的第一个元素。接着,在第2轮循环中,p.item显然不为null,因此,代码应该可以进入第7行(如果CAS操作成功)。进入第7行,也意味着p的item域被设置为null(因为这是弹出元素,需要删除)。同时,此时p和h是不相等的(因为p已经指向原有的第一个元素了)。故执行了第8行的updateHead()操作,其实现如下: final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } 可以看到,在updateHead中,就将p作为新的链表头部(通过casHead()实现),而原有的head就被设置为哨兵(通过lazySetNext()实现)。 这样一个哨兵节点就产生了,而由于此时原有的head头部和tail实际上是同一个元素。因此,再次offer()插入元素时,就会遇到这个tail,也就是哨兵。这就是offer()代码中,第17行判断的意义。 5)不变模式下的CopyOnWriteArrayList 为了将读取的性能发挥到极致,JDK中提供了CopyOnWriteArrayList类。对它来说,读取是完全不用加锁的,并且写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。当这个List需要修改时,并不修改原有的内容,而是对原有的数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据。这样就可以保证写操作不会影响读了。 下面的代码展示了有关读取的实现: private volatile transient Object[] array; public E get(int index) { return get(getArray(), index); } final Object[] getArray() { return array; } private E get(Object[] a, int index) { return (E) a[index]; } 读取代码没有任何同步控制和锁操作,因为内部数组array不会发生修改,只会被另外一个array替代,因此可以保证数据安全。 写操作如下: 01 public boolean add(E e) { 02 final ReentrantLock lock = this.lock; 03 lock.lock(); 04 try { 05 Object[] elements = getArray(); 06 int len = elements.length; 07 Object[] newElements = Arrays.copyOf(elements, len + 1); 08 newElements[len] = e; 09 setArray(newElements); 10 return true; 11 } finally { 12 lock.unlock(); 13 } 14 } 首先,写操作使用锁,这个锁仅限于控制写-写的情况。其重点在于第7行代码,进行了内部元素的完整复制。因此,会生成一个新的数组newElements。然后,将新的元素加入newElements。接着,在第9行,使用新的数组替换老的数组,修改就完成了。整个过程不会影响读取,并且修改完后,读取线程可以立即“察觉”到这个修改(因为array变量是volatile类型)。 6)数据共享通道:BlockingQueue 我们使用BlockingQueue进行多个线程间的数据共享。主要介绍ArrayBlockingQueue和LinkedBlockingQueue。ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue基于链表。因此,ArrayBlockingQueue更适合做有界对垒,因为队列中可容纳的最大元素需要在队列创建时指定。而LinkedBlockingQueue适合做无界队列,或者那些边界值非常大的队列。 BlockingQueue让服务线程在队列为空时,进行等待,当有新的消息进入队列后,自动将线程唤醒。以ArrayBlockingQueue为例。 ArrayBlockingQueue的内部元素都放置在一个对象数组中: final Object[] items; 向队列中压入元素可以使用offer()方法和put()方法。对于offer()方法,如果当前队列已经满了,它就会立即返回false。如果没有满,则执行正常的入队操作。我们需要关注的是put()方法。put()方法也是将元素压入队列末尾。如果队列满了,它会一直等待,直到队列中有空闲的位置。 从队列中弹出元素可以使用poll()方法和take()方法。它们都是从队列的头部获得一个元素。不同之处在于:如果队列为空poll()方法直接返回null,而take()方法会等待,直到队列内有可用元素。 因此,put()方法和take()方法才是体现Blocking的关键。为了做好等待和通知两件事,在ArrayBlockingQueue内部定义了一下一些字段: final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; 当执行take()操作时,如果队列为空,则让当前线程等待在notEmpty上。新元素入队时,则进行一次notEmpty上的通知。 下面的代码显示了take()的过程: 01 public E take() throws InterruptedException { 02 final ReentrantLock lock = this.lock; 03 lock.lockInterruptibly(); 04 try { 05 while (count == 0) 06 notEmpty.await(); 07 return extract(); 08 } finally { 09 lock.unlock(); 10 } 11 } 第6行代码,就要求当前线程进行等待。当队列中有新元素时,线程会得到一个通知。下面是元素入队时的一段代码: 01 private void insert(E x) { 02 items[putIndex] = x; 03 putIndex = inc(putIndex); 04 ++count; 05 notEmpty.signal(); 06 } 注意第5行代码,当新元素进入队列后,需要通知等待在notEmpty上的线程,让它们继续工作。 同理,对于put()操作也是一样,当队列满时,需要让压入线程等待,如下面第7行。 01 public void put(E e) throws InterruptedException { 02 checkNotNull(e); 03 final ReentrantLock lock = this.lock; 04 lock.lockInterruptibly(); 05 try { 06 while (count == items.length) 07 notFull.await(); 08 insert(e); 09 } finally { 10 lock.unlock(); 11 } 12 } 当有元素从队列中被挪走,队列出现空位时,也需要通知等待入队的线程: 01 private E extract() { 02 final Object[] items = this.items; 03 E x = this.<E>cast(items[takeIndex]); 04 items[takeIndex] = null; 05 takeIndex = inc(takeIndex); 06 --count; 07 notFull.signal(); 08 return x; 09 } 上述代码表示从队列中拿走一个元素。当有空闲位置时,在第7行,通知等待入队的线程。 7)随机数据结构:跳表(SkipList) 跳表是一种可以用来快速查找的数据结构,有点类似平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。 跳表的另外一个特点是随机算法。跳表的本质是同时维护了多个链表,并且链表是分层的。如图所示。 最底层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集,一个元素插入哪些层是完全随机的。 跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始查找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续查找。也就是说在查找过程中,搜索是跳跃式的。 因此,跳表是一种使用空间换时间的算法。 使用跳表实现Map和使用哈希算法实现Map的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,会得到一个有序的结果。 实现这一数据结构的类是ConcurrentSkipListMap。下面展示了跳表的简单实用: Map<Integer, Integer> map = new ConcurrentSkipListMap<Integer, Integer>(); for(int i=0; i<30; i++) { map.put(i,i); } for(Map.Entry<Integer, Integer> entry : map.entrySet()) { System.out.println(entry.getKey()); } 和HashMap不同,对跳表的遍历输出是有序的。 跳表的内部实现有几个关键的数据结构组成。首先是Node,一个Node就是表示一个节点,里面含有两个重要的元素key和value(就是Map的key和value)。每个Node还会指向下一个Node,因此还有一个元素next。 static final class Node<K,V> { final K key; volatile Object value; volatile Node<K,V> next; 对Node的所有操作,使用的CAS方法: boolean casValue(Object cmp, Object val) { return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val); } boolean casNext(Node<K,V> cmp, Node<K,V> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } 方法casValue()用来设置value的值,相对的casNext()用来设置next的字段。 另外一个重要的数据结构是Index,表示索引。它内部包装了Node,同时增加了向下的引用和向右的引用。 static class Index<K,V> { final Node<K,V> node; final Index<K,V> down; volatile Index<K,V> right; 整个跳表是根据Index进行全网的组织的。 此外,对于每一层的表头,还需要记录当前处于哪一层。为此,还需要一个称为HeadIndex的数据结构,表示链表头部的第一个Index。它继承自Index。 static final class HeadIndex<K,V> extends Index<K,V> { final int level; HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } } 注:本篇博客内容摘自《Java高并发程序设计》
    转载请注明原文地址: https://ju.6miu.com/read-7642.html

    最新回复(0)