http://www.xuebuyuan.com/2736871.html
//一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。 //同步队列没有任何内部容量。 //底层两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。 /**队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。 * 数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。*/ public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; abstract static class Transferer<E> { // 转移数据,put或者take操作 abstract E transfer(E e, boolean timed, long nanos); } /** The number of CPUs, for spin control */ static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; static final long spinForTimeoutThreshold = 1000L; /** Dual stack */ static final class TransferStack<E> extends Transferer<E> { // 表示消费数据的消费者 static final int REQUEST = 0; // 表示生产数据的生产者 static final int DATA = 1; // 表示匹配另一个生产者或消费者 static final int FULFILLING = 2; static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. */ static final class SNode { volatile SNode next; // 后继结点 volatile SNode match; // 相匹配的结点 volatile Thread waiter; // 等待线程 Object item; // 元素 int mode; //模式 volatile SNode head; SNode(Object item) { this.item = item; } boolean tryMatch(SNode s) { // 本结点的match域为null并且比较并替换match成功 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter;// 获取本节点的等待线程 if (w != null) { // 等待线程非null,则唤醒线程 waiter = null; LockSupport.unpark(w); } return true; } // 如果match不为null或者CAS设置失败,则比较match域是否等于s结点,若相等,则表示已经匹配过 return match == s; } } /** * Tries to cancel a wait by matching node to itself. */ void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; } /** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // 根据e确定此次转移的模式(put or take) int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head;//保存头结点 if (h == null || h.mode == mode) {// 头结点为null或者头结点的模式与此次转移的模式相同 if (timed && nanos <= 0) {// 设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作 if (h != null && h.isCancelled())// 头结点不为null并且头结点被取消 casHead(h, h.next); else return null; } // 生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点 else if (casHead(h, s = snode(s, e, h, mode))) { // 空旋或者阻塞直到s结点被FulFill操作所匹配 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } // h重新赋值为head头结点且不为null;头结点的next域为s结点,表示有结点插入到s结点之前,完成了匹配 if ((h = head) != null && h.next == s) casHead(h, s.next); return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // 没有FULFILLING标记,尝试匹配 if (h.isCancelled()) // 已经被取消 casHead(h, h.next);//弹出头结点 // 生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { SNode m = s.next; if (m == null) { // all waiters are gone casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) {// 尝试匹配且成功 casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // 头结点正在匹配 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // m与h匹配成功 casHead(h, mn); // pop both h and m else //匹配失败 h.casNext(m, mn); //h的next域指向mn,移除m结点 } } } } /** * Spins/blocks until node s is matched by a fulfill operation. * 当前线程自旋或阻塞,直到结点被匹配 * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted())// 当前线程被中断 s.tryCancel();//取消s结点 SNode m = s.match;// 获取s结点的match域 if (m != null)// m不为null,存在匹配结点m,返回m return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) {// 继续等待的时间小于等于0,等待超时 s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null)// 等待线程为null s.waiter = w; // 设置waiter为当前线程 else if (!timed)// 没有设置timed标识 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 继续等待的时间大于阈值 LockSupport.parkNanos(this, nanos); } } /** * Returns true if node s is at head or there is an active * fulfiller. */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); } } /** Dual Queue */ static final class TransferQueue<E> extends Transferer<E> { /** Node class for TransferQueue. */ static final class QNode { volatile QNode next; //后继结点 volatile Object item; volatile Thread waiter; //等待线程 final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 取消本结点,将item域设置为自身 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; } // 是否不在队列中 boolean isOffList() { return next == this; } } //队列头结点 transient volatile QNode head; //队列尾结点 transient volatile QNode tail; // 指向一个取消的结点,当一个结点是最后插入队列时,当被取消时,它可能还没有离开队列 transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); head = h; tail = h; } /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next } /** * Tries to cas nt as new tail. */ void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } /** * Tries to CAS cleanMe slot. */ boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); } /** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // 确定此次转移的类型(put or take) boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null)// 看到未初始化的头尾结点 continue; // spin // 头结点与尾结点相等或者尾结点的模式与当前结点模式相同 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail)// t不为尾结点,不一致,重试 continue; if (tn != null) { // tn不为null,有其他线程添加了tn结点 advanceTail(t, tn);// 设置新的尾结点为tn continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) // s为null s = new QNode(e, isData); if (!t.casNext(null, s)) // 设置t结点的next域s continue; advanceTail(t, s); // s设置新的尾结点 // 空旋或者阻塞直到s结点被匹配 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // x与s相等,表示已经取消 clean(t, s); return null; } if (!s.isOffList()) { // s结点还没离开队列 advanceHead(t, s); // 设置新的头结点 if (x != null) s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { //互补模式 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; Object x = m.item;// 获取m结点的元素域 if (isData == (x != null) || // m结点被匹配 x == m || // m结点被取消 !m.casItem(x, e)) { // CAS操作失败 advanceHead(h, m); // 队列头结点出队列,并重试 continue; } // 匹配成功,设置新的头结点 advanceHead(h, m); LockSupport.unpark(m.waiter);// unpark m结点对应的等待线程 return (x != null) ? (E)x : e; } } } /** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 计算空旋时间 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; if (x != e)//item变化则返回数据 return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } /** * Gets rid of cancelled node s with original predecessor pred. */ void clean(QNode pred, QNode s) { s.waiter = null; // 设置等待线程为null /* * 在任何时候,最后插入的结点不能删除,为了满足这个条件 * 如果不能删除s结点,我们将s结点的前驱设置为cleanMe结点 * 删除之前保存的版本,至少s结点或者之前保存的结点能够被删除 * 所以最后总是会结束 */ while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) {// hn不为null并且hn被取消 advanceHead(h, hn);// 设置新的头结点 continue; } QNode t = tail; // Ensure consistent read for tail if (t == h)// 尾结点为头结点,表示队列为空 return; QNode tn = t.next; if (t != tail) continue; if (tn != null) { advanceTail(t, tn); continue; } if (s != t) { // s不为尾结点,移除s QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } } } }