国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

圖解SynchronousQueue原理詳解-公平模式

jifei / 3590人閱讀

摘要:如果節(jié)點(diǎn)不為說(shuō)明已經(jīng)有其他線程進(jìn)行操作將節(jié)點(diǎn)替換為節(jié)點(diǎn)等待有消費(fèi)者消費(fèi)線程。如果頭節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是當(dāng)前節(jié)點(diǎn)以防止其他線程已經(jīng)修改了節(jié)點(diǎn)則運(yùn)算,否則直接返回。

一、介紹

SynchronousQueue是一個(gè)雙棧雙隊(duì)列算法,無(wú)空間的隊(duì)列或棧,任何一個(gè)對(duì)SynchronousQueue寫(xiě)需要等到一個(gè)對(duì)SynchronousQueue的讀操作,反之亦然。一個(gè)讀操作需要等待一個(gè)寫(xiě)操作,相當(dāng)于是交換通道,提供者和消費(fèi)者是需要組隊(duì)完成工作,缺少一個(gè)將會(huì)阻塞線程,知道等到配對(duì)為止。

SynchronousQueue是一個(gè)隊(duì)列和棧算法實(shí)現(xiàn),在SynchronousQueue中雙隊(duì)列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。

對(duì)于SynchronousQueue來(lái)說(shuō),他的put方法和take方法都被抽象成統(tǒng)一方法來(lái)進(jìn)行操作,通過(guò)抽象出內(nèi)部類(lèi)Transferer,來(lái)實(shí)現(xiàn)不同的操作。

注意事項(xiàng):本文分析主要是針對(duì)jdk1.8的版本進(jìn)行分析,下面的代碼中的線程執(zhí)行順序可能并不能完全保證順序性,執(zhí)行時(shí)間比較短,所以暫且認(rèn)定有序執(zhí)行。

約定:圖片中以Reference-開(kāi)頭的代表對(duì)象的引用地址,通過(guò)箭頭方式進(jìn)行引用對(duì)象。

Transferer.transfer方法主要介紹如下所示:

abstract static class Transferer {
    /**
     * 執(zhí)行put和take方法.
     *
     * @param e 非空時(shí),表示這個(gè)元素要傳遞給消費(fèi)者(提供者-put);
     *          為空時(shí), 則表示當(dāng)前操作要請(qǐng)求消費(fèi)一個(gè)數(shù)據(jù)(消費(fèi)者-take)。
     *          offered by producer.
     * @param timed 決定是否存在timeout時(shí)間。
     * @param nanos 超時(shí)時(shí)長(zhǎng)。
     * @return 如果返回非空, 代表數(shù)據(jù)已經(jīng)被消費(fèi)或者正常提供; 如果為空,
     *         則表示由于超時(shí)或中斷導(dǎo)致失敗。可通過(guò)Thread.interrupted來(lái)檢查是那種。
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

接下來(lái)看一下SynchronousQueue的字段信息:

/** CPU數(shù)量 */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/**
 * 自旋次數(shù),如果transfer指定了timeout時(shí)間,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32
 * 此值為經(jīng)驗(yàn)值,不隨CPU數(shù)量增加而變化,這里只是個(gè)常量。
 */
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

/**
 * 自旋次數(shù),如果沒(méi)有指定時(shí)間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16,否則為0;
 */
static final int maxUntimedSpins = maxTimedSpins * 16;

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices.
 */
static final long spinForTimeoutThreshold = 1000L;

NCPUS:代表CPU的數(shù)量

maxTimedSpins:自旋次數(shù),如果transfer指定了timeout時(shí)間,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32,此值為經(jīng)驗(yàn)值,不隨CPU數(shù)量增加而變化,這里只是個(gè)常量。

maxUntimedSpins:自旋次數(shù),如果沒(méi)有指定時(shí)間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16,否則為0;

spinForTimeoutThreshold:為了防止自定義的時(shí)間限過(guò)長(zhǎng),而設(shè)置的,如果設(shè)置的時(shí)間限長(zhǎng)于這個(gè)值則取這個(gè)spinForTimeoutThreshold 為時(shí)間限。這是為了優(yōu)化而考慮的。這個(gè)的單位為納秒。

公平模式-TransferQueue

TransferQueue內(nèi)部是如何進(jìn)行工作的,這里先大致講解下,隊(duì)列采用了互補(bǔ)模式進(jìn)行等待,QNode中有一個(gè)字段是isData,如果模式相同或空隊(duì)列時(shí)進(jìn)行等待操作,互補(bǔ)的情況下就進(jìn)行消費(fèi)操作。

入隊(duì)操作相同模式

不同模式時(shí)進(jìn)行出隊(duì)列操作:

這時(shí)候來(lái)了一個(gè)isData=false的互補(bǔ)模式,隊(duì)列就會(huì)變成如下?tīng)顟B(tài):

TransferQueue繼承自Transferer抽象類(lèi),并且實(shí)現(xiàn)了transfer方法,它主要包含以下內(nèi)容:

QNode

代表隊(duì)列中的節(jié)點(diǎn)元素,它內(nèi)部包含以下字段信息:

字段信息描述

字段 描述 類(lèi)型
next 下一個(gè)節(jié)點(diǎn) QNode
item 元素信息 Object
waiter 當(dāng)前等待的線程 Thread
isData 是否是數(shù)據(jù) boolean

方法信息描述

方法 描述
casNext 替換當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn)
casItem 替換當(dāng)前節(jié)點(diǎn)的item數(shù)據(jù)
tryCancel 取消當(dāng)前操作,將當(dāng)前item賦值為this(當(dāng)前QNode節(jié)點(diǎn))
isCancelled 如果item是this(當(dāng)前QNode節(jié)點(diǎn))的話就返回true,反之返回false
isOffList 如果已知此節(jié)點(diǎn)離隊(duì)列,判斷next節(jié)點(diǎn)是不是為this,則返回true,因?yàn)橛捎? advanceHead操作而忘記了其下一個(gè)指針。
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS"ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
      // 分為兩種狀態(tài)1.有數(shù)據(jù)=true 2.無(wú)數(shù)據(jù)=false
    boolean isData = (e != null); 
        // 循環(huán)內(nèi)容
    for (;;) {
          // 尾部節(jié)點(diǎn)。
        QNode t = tail;
          // 頭部節(jié)點(diǎn)。
        QNode h = head;
          // 判斷頭部和尾部如果有一個(gè)為null則自旋轉(zhuǎn)。
        if (t == null || h == null)         // 還未進(jìn)行初始化的值。
            continue;                       // 自旋
                // 頭結(jié)點(diǎn)和尾節(jié)點(diǎn)相同或者尾節(jié)點(diǎn)的模式和當(dāng)前節(jié)點(diǎn)模式相同。
        if (h == t || t.isData == isData) { // 空或同模式。
              // tn為尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)信息。
            QNode tn = t.next;
              // 這里我認(rèn)為是閱讀不一致,原因是當(dāng)前線程還沒(méi)有阻塞的時(shí)候其他線程已經(jīng)修改了尾節(jié)點(diǎn)tail會(huì)導(dǎo)致當(dāng)前線程的tail節(jié)點(diǎn)不一致。
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // 這里如果指定timed判斷時(shí)間小于等于0直接返回。
                return null;
              // 判斷新增節(jié)點(diǎn)是否為null,為null直接構(gòu)建新節(jié)點(diǎn)。
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // 如果next節(jié)點(diǎn)不為null說(shuō)明已經(jīng)有其他線程進(jìn)行tail操作
                continue;
                        // 將t節(jié)點(diǎn)替換為s節(jié)點(diǎn)
            advanceTail(t, s);             
              // 等待有消費(fèi)者消費(fèi)線程。
            Object x = awaitFulfill(s, e, timed, nanos);
              // 如果返回的x,指的是s.item,如果s.item指向自己的話清除操作。
            if (x == s) {
                clean(t, s);
                return null;
            }
                        // 如果沒(méi)有取消聯(lián)系
            if (!s.isOffList()) {          
                  // 將當(dāng)前節(jié)點(diǎn)替換頭結(jié)點(diǎn)
                advanceHead(t, s);          // unlink if head
                if (x != null)              // 取消item值,這里是take方法時(shí)會(huì)進(jìn)行item賦值為this
                    s.item = s;
                  // 將等待線程設(shè)置為null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
              // 獲取頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)
            QNode m = h.next;               // node to fulfill
              // 如果當(dāng)前線程尾節(jié)點(diǎn)和全局尾節(jié)點(diǎn)不一致,重新開(kāi)始
              // 頭結(jié)點(diǎn)的next節(jié)點(diǎn)為空,代表無(wú)下一個(gè)節(jié)點(diǎn),則重新開(kāi)始,
              // 當(dāng)前線程頭結(jié)點(diǎn)和全局頭結(jié)點(diǎn)不相等,則重新開(kāi)始
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
                    if (isData == (x != null) ||    // 如果x=null說(shuō)明已經(jīng)被讀取了。
                        x == m ||                   // x節(jié)點(diǎn)和m節(jié)點(diǎn)相等說(shuō)明被中斷操作,被取消操作了。
                        !m.casItem(x, e)) {         // 這里是將item值設(shè)置為null
                        advanceHead(h, m);          // 移動(dòng)頭結(jié)點(diǎn)到頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
                        continue;
                    }
          
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

我們來(lái)看一下awaitFulfill方法內(nèi)容:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
       // 如果指定了timed則為System.nanoTime() + nanos,反之為0。
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
      // 獲取當(dāng)前線程。
    Thread w = Thread.currentThread();
      // 如果頭節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是當(dāng)前s節(jié)點(diǎn)(以防止其他線程已經(jīng)修改了head節(jié)點(diǎn))
      // 則運(yùn)算(timed ? maxTimedSpins : maxUntimedSpins),否則直接返回。
      // 指定了timed則使用maxTimedSpins,反之使用maxUntimedSpins
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
      // 自旋
    for (;;) {
          // 判斷是否已經(jīng)被中斷。
        if (w.isInterrupted())
              //嘗試取消,將當(dāng)前節(jié)點(diǎn)的item修改為當(dāng)前節(jié)點(diǎn)(this)。
            s.tryCancel(e);
          // 獲取當(dāng)前節(jié)點(diǎn)內(nèi)容。
        Object x = s.item;
          // 判斷當(dāng)前值和節(jié)點(diǎn)值不相同是返回,因?yàn)閺棾鰰r(shí)會(huì)將item值賦值為null。
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

首先先判斷有沒(méi)有被中斷,如果被中斷則取消本次操作,將當(dāng)前節(jié)點(diǎn)的item內(nèi)容賦值為當(dāng)前節(jié)點(diǎn)。

判斷當(dāng)前節(jié)點(diǎn)和節(jié)點(diǎn)值不相同是返回

將當(dāng)前線程賦值給當(dāng)前節(jié)點(diǎn)

自旋,如果指定了timed則使用LockSupport.parkNanos(this, nanos);,如果沒(méi)有指定則使用LockSupport.park(this);

中斷相應(yīng)是在下次才能被執(zhí)行。

通過(guò)上面源碼分析我們這里做出簡(jiǎn)單的示例代碼演示一下put操作和take操作是如何進(jìn)行運(yùn)作的,首先看一下示例代碼,如下所示:

/**
 * SynchronousQueue進(jìn)行put和take操作。
 *
 * @author battleheart
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue queue = new SynchronousQueue<>(true);
        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
      
        thread1.start();
        Thread.sleep(2000);
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
        Thread.sleep(10000);
      
        Thread thread3 = new Thread(() -> {
            try {
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();
    }
}

首先上來(lái)之后進(jìn)行的是兩次put操作,然后再take操作,默認(rèn)隊(duì)列上來(lái)會(huì)進(jìn)行初始化,初始化的內(nèi)容如下代碼所示:

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

初始化后隊(duì)列的狀態(tài)如下圖所示:

當(dāng)線程1執(zhí)行put操作時(shí),來(lái)分析下代碼:

QNode t = tail;
QNode h = head;
if (t == null || h == null)         // saw uninitialized value
    continue;      

首先執(zhí)行局部變量t代表隊(duì)尾指針,h代表隊(duì)頭指針,判斷隊(duì)頭和隊(duì)尾不為空則進(jìn)行下面的操作,接下來(lái)是if…else語(yǔ)句這里是分水嶺,當(dāng)相同模式操作的時(shí)候執(zhí)行if語(yǔ)句,當(dāng)進(jìn)行不同模式操作時(shí)執(zhí)行的是else語(yǔ)句,程序是如何控制這樣的操作的呢?接下來(lái)我們慢慢分析一下:

if (h == t || t.isData == isData) { // 隊(duì)列為空或者模式相同時(shí)進(jìn)行if語(yǔ)句
    QNode tn = t.next;
    if (t != tail)                  // 判斷t是否是隊(duì)尾,不是則重新循環(huán)。
        continue;
    if (tn != null) {               // tn是隊(duì)尾的下個(gè)節(jié)點(diǎn),如果tn有內(nèi)容則將隊(duì)尾更換為tn,并且重新循環(huán)操作。
        advanceTail(t, tn);
        continue;
    }
    if (timed && nanos <= 0)        // 如果指定了timed并且延時(shí)時(shí)間用盡則直接返回空,這里操作主要是offer操作時(shí),因?yàn)殛?duì)列無(wú)存儲(chǔ)空間的當(dāng)offer時(shí)不允許插入。
        return null;
    if (s == null)                                    // 這里是新節(jié)點(diǎn)生成。
        s = new QNode(e, isData);
    if (!t.casNext(null, s))        // 將尾節(jié)點(diǎn)的next節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn)。
        continue;

    advanceTail(t, s);              // 隊(duì)尾移動(dòng)
    Object x = awaitFulfill(s, e, timed, nanos);    //自旋并且設(shè)置線程。
    if (x == s) {                   // wait was cancelled
        clean(t, s);
        return null;
    }

    if (!s.isOffList()) {           // not already unlinked
        advanceHead(t, s);          // unlink if head
        if (x != null)              // and forget fields
            s.item = s;
        s.waiter = null;
    }
    return (x != null) ? (E)x : e;

}

上面代碼是if語(yǔ)句中的內(nèi)容,進(jìn)入到if語(yǔ)句中的判斷是如果頭結(jié)點(diǎn)和尾節(jié)點(diǎn)相等代表隊(duì)列為空,并沒(méi)有元素所有要進(jìn)行插入隊(duì)列的操作,或者是隊(duì)尾的節(jié)點(diǎn)的isData標(biāo)志和當(dāng)前操作的節(jié)點(diǎn)的類(lèi)型一樣時(shí),會(huì)進(jìn)行入隊(duì)操作,isData標(biāo)識(shí)當(dāng)前元素是否是數(shù)據(jù),如果為true代表是數(shù)據(jù),如果為false則代表不是數(shù)據(jù),換句話說(shuō)只有模式相同的時(shí)候才會(huì)往隊(duì)列中存放,如果不是模式相同的時(shí)候則代表互補(bǔ)模式,就不走if語(yǔ)句了,而是走了else語(yǔ)句,上面代碼中做有注釋講解,下面看一下這里:

if (s == null)                                    // 這里是新節(jié)點(diǎn)生成。
    s = new QNode(e, isData);
if (!t.casNext(null, s))        // 將尾節(jié)點(diǎn)的next節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn)。
    continue;    

當(dāng)執(zhí)行上面代碼后,隊(duì)列的情況如下圖所示:(這里視為插入第一個(gè)元素圖,方便下面的引用)

接下來(lái)執(zhí)行這段代碼:

 advanceTail(t, s);              // 隊(duì)尾移動(dòng)


修改了tail節(jié)點(diǎn)后,這時(shí)候就需要進(jìn)行自旋操作,并且設(shè)置QNode的waiter等待線程,并且將線程等待,等到喚醒線程進(jìn)行喚醒操作

 Object x = awaitFulfill(s, e, timed, nanos);    //自旋并且設(shè)置線程。

方法內(nèi)部分析局部?jī)?nèi)容,上面已經(jīng)全部?jī)?nèi)容的分析:

if (spins > 0)
    --spins;
else if (s.waiter == null)
    s.waiter = w;
else if (!timed)
    LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanos);

如果自旋時(shí)間spins還有則進(jìn)行循環(huán)遞減操作,接下來(lái)判斷如果當(dāng)前節(jié)點(diǎn)的waiter是空則價(jià)格當(dāng)前線程賦值給waiter,上圖中顯然是為空的所以會(huì)把當(dāng)前線程進(jìn)行賦值給我waiter,接下來(lái)就是等待操作了。

上面線程則處于等待狀態(tài),接下來(lái)是線程二進(jìn)行操作,這里不進(jìn)行重復(fù)進(jìn)行,插入第二個(gè)元素隊(duì)列的狀況,此時(shí)線程二也處于等待狀態(tài)。

上面的主要是put了兩次操作后隊(duì)列的情況,接下來(lái)分析一下take操作時(shí)又是如何進(jìn)行操作的,當(dāng)take操作時(shí),isData為false,而隊(duì)尾的isData為true兩個(gè)不相等,所以不會(huì)進(jìn)入到if語(yǔ)句,而是進(jìn)入到了else語(yǔ)句

} else {                            // 互補(bǔ)模式
    QNode m = h.next;               // 獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),進(jìn)行互補(bǔ)操作。
    if (t != tail || m == null || h != head)
        continue;                   // 這里就是為了防止閱讀不一致的問(wèn)題

    Object x = m.item;
    if (isData == (x != null) ||    // 如果x=null說(shuō)明已經(jīng)被讀取了。
        x == m ||                   // x節(jié)點(diǎn)和m節(jié)點(diǎn)相等說(shuō)明被中斷操作,被取消操作了。
        !m.casItem(x, e)) {         // 這里是將item值設(shè)置為null
        advanceHead(h, m);          // 移動(dòng)頭結(jié)點(diǎn)到頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
        continue;
    }

    advanceHead(h, m);              // successfully fulfilled
    LockSupport.unpark(m.waiter);
    return (x != null) ? (E)x : e;
}

首先獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)用于互補(bǔ)操作,也就是take操作,接下來(lái)進(jìn)行閱讀不一致的判斷,防止其他線程進(jìn)行了閱讀操作,接下來(lái)獲取需要彈出內(nèi)容x=1,首先進(jìn)行判斷節(jié)點(diǎn)內(nèi)容是不是已經(jīng)被消費(fèi)了,節(jié)點(diǎn)內(nèi)容為null時(shí)則代表被消費(fèi)了,接下來(lái)判斷節(jié)點(diǎn)的item值是不是和本身相等如果相等話說(shuō)明節(jié)點(diǎn)被取消了或者被中斷了,然后移動(dòng)頭結(jié)點(diǎn)到下一個(gè)節(jié)點(diǎn)上,然后將refenrence-715的item值修改為null,至于為什么修改為null這里留下一個(gè)懸念,這里還是比較重要的,大家看到這里的時(shí)候需要注意下,顯然這些都不會(huì)成立,所以if語(yǔ)句中內(nèi)容不會(huì)被執(zhí)行,接下來(lái)的隊(duì)列的狀態(tài)是是這個(gè)樣子的:

OK,接下來(lái)就開(kāi)始移動(dòng)隊(duì)頭head了,將head移動(dòng)到m節(jié)點(diǎn)上,執(zhí)行代碼如下所示:

advanceHead(h, m);          

此時(shí)隊(duì)列的狀態(tài)是這個(gè)樣子的:

LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;

接下來(lái)將執(zhí)行喚醒被等待的線程,也就是thread-0,然后返回獲取item值1,take方法結(jié)束,但是這里并沒(méi)有結(jié)束,因?yàn)閱拘蚜藀ut的線程,此時(shí)會(huì)切換到put方法中,這時(shí)候線程喚醒后會(huì)執(zhí)行awaitFulfill方法,此時(shí)循環(huán)時(shí),有與item值修改為null則直接返回內(nèi)容。

Object x = s.item;
if (x != e)
    return x;

這里的代碼我們可以對(duì)照插入第一個(gè)元素圖,s節(jié)點(diǎn)也就是當(dāng)前m節(jié)點(diǎn),獲取值得時(shí)候已經(jīng)修改為null,但是當(dāng)時(shí)插入的值時(shí)1,所以兩個(gè)不想等了,則直接返回null值。

Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {                   // wait was cancelled
    clean(t, s);
    return null;
}

if (!s.isOffList()) {           // not already unlinked
    advanceHead(t, s);          // unlink if head
    if (x != null)              // and forget fields
        s.item = s;
    s.waiter = null;
}
return (x != null) ? (E)x : e;

又返回到了transfer方法的if語(yǔ)句中,此時(shí)x和s并不相等所以不用進(jìn)行clean操作,首先判斷s節(jié)點(diǎn)是否已經(jīng)離隊(duì)了,顯然并沒(méi)有進(jìn)行離隊(duì)操作,advanceHead(t, s); 操作不會(huì)被執(zhí)行因?yàn)樯厦嬉呀鼘㈩^節(jié)點(diǎn)修改了,但是第一次插入的時(shí)候頭結(jié)點(diǎn)還是reference-716,此時(shí)已經(jīng)是reference-715,而t節(jié)點(diǎn)的引用地址是reference-716,所以不會(huì)操作,接下來(lái)就是將waiter設(shè)置為null,也就是忘記掉等待的線程。

分析了正常的take和put操作,接下來(lái)分析下中斷操作,由于中斷相應(yīng)后,會(huì)被執(zhí)行if(w.isInterrupted())這段代碼,它會(huì)執(zhí)行s.tryCancel(e)方法,這個(gè)方法的作用的是將QNode節(jié)點(diǎn)的item節(jié)點(diǎn)賦值為當(dāng)前QNode,這時(shí)候x和e值就不相等了( if (x != e)),x的值是s.item,則為當(dāng)前QNode,而e的值是用戶指定的值,這時(shí)候返回x(s.item)。返回到函數(shù)調(diào)用地方transfer中,這時(shí)候要執(zhí)行下面語(yǔ)句:

if (x == s) {
    clean(t, s);
    return null;
}

進(jìn)入到clean方法執(zhí)行清理當(dāng)前節(jié)點(diǎn),下面是方法clean代碼:

/**
 * Gets rid of cancelled node s with original predecessor pred.
 */
void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    /*
     * At any given time, exactly one node on list cannot be
     * deleted -- the last inserted node. To accommodate this,
     * if we cannot delete s, we save its predecessor as
     * "cleanMe", deleting the previously saved version
     * first. At least one of node s or the node previously
     * saved can always be deleted, so this always terminates.
     */
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)
            return;
        QNode tn = t.next;
          // 判斷現(xiàn)在的t是不是末尾節(jié)點(diǎn),可能其他線程插入了內(nèi)容導(dǎo)致不是最后的節(jié)點(diǎn)。
        if (t != tail)
            continue;
          // 如果不是最后節(jié)點(diǎn)的話將其現(xiàn)在t.next節(jié)點(diǎn)作為tail尾節(jié)點(diǎn)。
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
          // 如果當(dāng)前節(jié)點(diǎn)不是尾節(jié)點(diǎn)進(jìn)入到這里面。
        if (s != t) {        // If not tail, try to unsplice
              // 獲取當(dāng)前節(jié)點(diǎn)(被取消的節(jié)點(diǎn))的下一個(gè)節(jié)點(diǎn)。
            QNode sn = s.next;
              // 修改上一個(gè)節(jié)點(diǎn)的next(下一個(gè))元素為下下個(gè)節(jié)點(diǎn)。
            if (sn == s || pred.casNext(s, sn))
                  //返回。
                return;
        }
        QNode dp = cleanMe;
        if (dp != null) {    // 嘗試清除上一個(gè)標(biāo)記為清除的節(jié)點(diǎn)。
            QNode d = dp.next;    //1.獲取要被清除的節(jié)點(diǎn)
            QNode dn;
            if (d == null ||               // 被清除節(jié)點(diǎn)不為空
                d == dp ||                 // 被清除節(jié)點(diǎn)已經(jīng)離隊(duì)
                !d.isCancelled() ||        // 被清除節(jié)點(diǎn)是標(biāo)記為Cancel狀態(tài)的。
                (d != t &&                 // 被清除節(jié)點(diǎn)不是尾節(jié)點(diǎn)
                 (dn = d.next) != null &&  // 被清除節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)不為null
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // 將被清除的節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)修改為被清除節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。
                casCleanMe(dp, null);             // 清空cleanMe節(jié)點(diǎn)。
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred)) // 這里將上一個(gè)節(jié)點(diǎn)標(biāo)記為被清除操作,但是其實(shí)要操作的是下一個(gè)節(jié)點(diǎn)。
            return;          // Postpone cleaning s
    }
}

如果節(jié)點(diǎn)中取消的頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),只需要移動(dòng)當(dāng)前head節(jié)點(diǎn)到下一個(gè)節(jié)點(diǎn)即可。

如果取消的是中間的節(jié)點(diǎn),則將當(dāng)前節(jié)點(diǎn)next節(jié)點(diǎn)修改為下下個(gè)節(jié)點(diǎn)。

如果修改為末尾的節(jié)點(diǎn),則將當(dāng)前節(jié)點(diǎn)放入到QNode的clearMe中,等待有內(nèi)容進(jìn)來(lái)之后下一次進(jìn)行清除操作。

實(shí)例一:清除頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn),下面是實(shí)例代碼進(jìn)行講解:

/**
 * 清除頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)實(shí)例代碼。
 *
 * @author battleheart
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
                Thread.sleep(200);
          
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
          Thread.sleep(2000);
        thread1.interrupt();

    }
}

上面例子說(shuō)明我們啟動(dòng)了兩個(gè)線程,分別向SynchronousQueue隊(duì)列中添加了元素1和元素2,添加成功之后的,讓主線程休眠一會(huì),然后將第一個(gè)線程進(jìn)行中斷操作,添加兩個(gè)元素后節(jié)點(diǎn)所處在的狀態(tài)為下圖所示:

當(dāng)我們調(diào)用thread1.interrupt時(shí),此時(shí)線程1等待的消費(fèi)操作將被終止,會(huì)相應(yīng)上面awaitFulfill方法,該方法會(huì)運(yùn)行下面代碼:

if (w.isInterrupted())
    //嘗試取消,將當(dāng)前節(jié)點(diǎn)的item修改為當(dāng)前節(jié)點(diǎn)(this)。
    s.tryCancel(e);
// 獲取當(dāng)前節(jié)點(diǎn)內(nèi)容。
Object x = s.item;
// 判斷當(dāng)前值和節(jié)點(diǎn)值不相同是返回,因?yàn)閺棾鰰r(shí)會(huì)將item值賦值為null。
if (x != e)
    return x;

首先上來(lái)現(xiàn)將s節(jié)點(diǎn)(上圖中的Reference-715引用對(duì)象)的item節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)引用(Reference-715引用對(duì)象),所以s節(jié)點(diǎn)和e=1不相等則直接返回,此時(shí)節(jié)點(diǎn)的狀態(tài)變化如下所示:

退出awaitFulfill并且返回的是s節(jié)點(diǎn)內(nèi)容(實(shí)際上返回的就是s節(jié)點(diǎn)),接下來(lái)返回到調(diào)用awaitFulfill的方法transfer方法中

Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {                   // 是否是被取消了
    clean(t, s);
    return null;
}

首先判斷的事x節(jié)點(diǎn)和s節(jié)點(diǎn)是否相等,上面我們也說(shuō)了明顯是相等的所以這里會(huì)進(jìn)入到clean方法中,clean(QNode pred, QNode s)clean方法一個(gè)是前節(jié)點(diǎn),一個(gè)是當(dāng)前被取消的節(jié)點(diǎn),也就是當(dāng)前s節(jié)點(diǎn)的前節(jié)點(diǎn)是head節(jié)點(diǎn),接下來(lái)我們一步一步的分析代碼:

s.waiter = null; // 刪除等待的線程。

進(jìn)入到方法體之后首先先進(jìn)行的是將當(dāng)前節(jié)點(diǎn)的等待線程刪除,如下圖所示:

接下來(lái)進(jìn)入while循環(huán),循環(huán)內(nèi)容時(shí)pred.next == s如果不是則表示已經(jīng)移除了節(jié)點(diǎn),反之還在隊(duì)列中,則進(jìn)行下面的操作:

QNode h = head;
QNode hn = h.next;   // 如果取消的是第一個(gè)節(jié)點(diǎn)則進(jìn)入下面語(yǔ)句
if (hn != null && hn.isCancelled()) {
    advanceHead(h, hn);
    continue;
}

可以看到首先h節(jié)點(diǎn)為head節(jié)點(diǎn),hn為頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),在進(jìn)行判斷頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為空并且頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是被中斷的節(jié)點(diǎn)(取消的節(jié)點(diǎn)),則進(jìn)入到if語(yǔ)句中,if語(yǔ)句其實(shí)也很簡(jiǎn)單就是將頭結(jié)點(diǎn)修改為頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(s節(jié)點(diǎn),別取消節(jié)點(diǎn),并且將前節(jié)點(diǎn)的next節(jié)點(diǎn)修改為自己,也就是移除了之前的節(jié)點(diǎn),我們看下advanceHead方法:

void advanceHead(QNode h, QNode nh) {
    if (h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}

首先上來(lái)先進(jìn)行CAS移動(dòng)頭結(jié)點(diǎn),再講原來(lái)頭結(jié)點(diǎn)h的next節(jié)點(diǎn)修改為自己(h),為什么這樣做呢?因?yàn)樯厦孢M(jìn)行advanceHead之后并沒(méi)有退出循環(huán),是進(jìn)行continue操作,也就是它并沒(méi)有跳出while循環(huán),他還會(huì)循環(huán)一次prev.next此時(shí)已經(jīng)不能等于s所以退出循環(huán),如下圖所示:

實(shí)例二:清除中間的節(jié)點(diǎn)

/**
 * SynchronousQueue實(shí)例二,清除中間的節(jié)點(diǎn)。
 *
 * @author battleheart
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
          //休眠一會(huì)。
                Thread.sleep(200);
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
          //休眠一會(huì)。
                Thread.sleep(200);
        Thread thread3 = new Thread(() -> {
            try {
                queue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();
        //休眠一會(huì)。
        Thread.sleep(10000);
        thread2.interrupt();


    }
}

看上面例子,首先先進(jìn)行put操作三次,也就是入隊(duì)3條數(shù)據(jù),分別是整型值1,整型值2,整型值3,然后將當(dāng)前線程休眠一下,對(duì)中間線程進(jìn)行中斷操作,通過(guò)讓主線程休眠一會(huì)保證線程執(zhí)行順序性(當(dāng)然上面線程不一定能保證執(zhí)行順序,因?yàn)閜ut操作一下子就執(zhí)行完了所以這點(diǎn)時(shí)間是可以的),此時(shí)隊(duì)列所處的狀態(tài)來(lái)看一下下圖:

當(dāng)休眠一會(huì)之后,進(jìn)入到threa2進(jìn)行中斷操作,目前上圖中表示Reference-723被中斷操作,此時(shí)也會(huì)進(jìn)入到awaitFulfill方法中,將Reference-723的item節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn),如下圖所示:

進(jìn)入到clear方法中此時(shí)的prev節(jié)點(diǎn)為Reference-715,s節(jié)點(diǎn)是被清除節(jié)點(diǎn),還是首先進(jìn)入clear方法中先將waiter設(shè)置為null,取消當(dāng)前線程內(nèi)容,如下圖所示:

接下來(lái)進(jìn)入到循環(huán)中,進(jìn)行下面處理

QNode h = head;
QNode hn = h.next;   // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
    advanceHead(h, hn);
    continue;
}
QNode t = tail;      // Ensure consistent read for tail
if (t == h)
    return;
QNode tn = t.next;
if (t != tail)
    continue;
if (tn != null) {
    advanceTail(t, tn);
    continue;
}
if (s != t) {        // If not tail, try to unsplice
    QNode sn = s.next;
    if (sn == s || pred.casNext(s, sn))
        return;
}

第一個(gè)if語(yǔ)句已經(jīng)分析過(guò)了所以說(shuō)這里不會(huì)進(jìn)入到里面去,接下來(lái)是進(jìn)行尾節(jié)點(diǎn)t是否是等于head節(jié)點(diǎn)如果相等則代表沒(méi)有元素,在判斷當(dāng)前方法的t尾節(jié)點(diǎn)是不是真正的尾節(jié)點(diǎn)tail如果不是則進(jìn)行修改尾節(jié)點(diǎn),先來(lái)看一下現(xiàn)在的狀態(tài):

tn != null判斷如果tn不是尾節(jié)點(diǎn),則將tn作為尾節(jié)點(diǎn)處理,如果處理之后還不是尾節(jié)點(diǎn)還會(huì)進(jìn)行處理直到tail是尾節(jié)點(diǎn)未知,我們現(xiàn)在這個(gè)是尾節(jié)點(diǎn)所以跳過(guò)這段代碼。s != t通過(guò)上圖可以看到s節(jié)點(diǎn)是被清除節(jié)點(diǎn),并不是尾節(jié)點(diǎn)所以進(jìn)入到循環(huán)中:

if (s != t) {        // If not tail, try to unsplice
    QNode sn = s.next;
    if (sn == s || pred.casNext(s, sn))
        return;
}

首先獲取的s節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),上圖中表示Reference-725節(jié)點(diǎn),判斷sn是都等于當(dāng)前節(jié)點(diǎn)顯然這一條不成立,pred節(jié)點(diǎn)為Reference-715節(jié)點(diǎn),將715節(jié)點(diǎn)的next節(jié)點(diǎn)變成Reference-725節(jié)點(diǎn),這里就將原來(lái)的節(jié)點(diǎn)清理出去了,現(xiàn)在的狀態(tài)如下所示:

實(shí)例三:刪除的節(jié)點(diǎn)是尾節(jié)點(diǎn)

/**
 * SynchronousQueue實(shí)例三,刪除的節(jié)點(diǎn)為尾節(jié)點(diǎn)
 *
 * @author battleheart
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();

        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();

        Thread.sleep(10000);
        thread2.interrupt();

        Thread.sleep(10000);

        Thread thread3 = new Thread(() -> {
            try {
                queue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();

        Thread.sleep(10000);
        thread3.interrupt();
    }
}

該例子主要說(shuō)明一個(gè)問(wèn)題就是刪除的節(jié)點(diǎn)如果是末尾節(jié)點(diǎn)的話,clear方法又是如何處理的,首先啟動(dòng)了三個(gè)線程其中主線程休眠了一會(huì),為了能讓插入的順序保持線程1,線程2,線程3這樣子,啟動(dòng)第二個(gè)線程后,又將第二個(gè)線程中斷,這是第二個(gè)線程插入的節(jié)點(diǎn)為尾節(jié)點(diǎn),然后再啟動(dòng)第三個(gè)節(jié)點(diǎn)插入值,再中斷了第三個(gè)節(jié)點(diǎn)末尾節(jié)點(diǎn),說(shuō)一下為啥這樣操作,因?yàn)楫?dāng)清除尾節(jié)點(diǎn)時(shí),并不是直接移除當(dāng)前節(jié)點(diǎn),而是將被清除的節(jié)點(diǎn)的前節(jié)點(diǎn)設(shè)置到QNode的CleanMe中,等待下次clear方法時(shí)進(jìn)行清除上次保存在CleanMe的節(jié)點(diǎn),然后再處理當(dāng)前被中斷節(jié)點(diǎn),將新的被清理的節(jié)點(diǎn)prev設(shè)置為cleanMe當(dāng)中,等待下次進(jìn)行處理,接下來(lái)一步一步分析,首先我們先來(lái)看一下第二個(gè)線程啟動(dòng)后節(jié)點(diǎn)的狀態(tài)。

此時(shí)運(yùn)行thread2.interrupt();將第二個(gè)線程中斷,這時(shí)候會(huì)進(jìn)入到clear方法中,前面的代碼都不會(huì)被返回,會(huì)執(zhí)行下面的語(yǔ)句:

QNode dp = cleanMe;
if (dp != null) {    // Try unlinking previous cancelled node
    QNode d = dp.next;
    QNode dn;
    if (d == null ||               // d is gone or
        d == dp ||                 // d is off list or
        !d.isCancelled() ||        // d not cancelled or
        (d != t &&                 // d not tail and
         (dn = d.next) != null &&  //   has successor
         dn != d &&                //   that is on list
         dp.casNext(d, dn)))       // d unspliced
        casCleanMe(dp, null);
    if (dp == pred)
        return;      // s is already saved node
} else if (casCleanMe(null, pred))
    return;   

首先獲得TransferQueue當(dāng)中cleanMe節(jié)點(diǎn),此時(shí)獲取的為null,當(dāng)判斷dp!=null時(shí)就會(huì)被跳過(guò),直接執(zhí)行

casCleanMe(null, pred)此時(shí)pred傳入的值時(shí)t節(jié)點(diǎn)指向的內(nèi)容,也就是當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),它會(huì)被標(biāo)記為清除操作節(jié)點(diǎn)(其實(shí)并不清楚它而是清除它下一個(gè)節(jié)點(diǎn),也就是說(shuō)item=this的節(jié)點(diǎn)),此時(shí)看一下節(jié)點(diǎn)狀態(tài)為下圖所示:

接下來(lái)第三個(gè)線程啟動(dòng)了這時(shí)候又往隊(duì)列中添加了元素3,此時(shí)隊(duì)列的狀況如下圖所示:

此時(shí)thread3也被中斷操作了,這時(shí)候還是運(yùn)行上面的代碼,但是這次不同的點(diǎn)在于cleanMe已經(jīng)不是空值,是有內(nèi)容的,首先獲取的是cleanMe的下一個(gè)節(jié)點(diǎn)(d),然我來(lái)把變量標(biāo)記在圖上然后看起來(lái)好分析一些,如下圖所示:

dp表示d節(jié)點(diǎn)的前一個(gè)pred節(jié)點(diǎn),dn表示d節(jié)點(diǎn)的next節(jié)點(diǎn),主要邏輯在這里:

if (d == null ||               // d is gone or
    d == dp ||                 // d is off list or
    !d.isCancelled() ||        // d not cancelled or
    (d != t &&                 // d not tail and
     (dn = d.next) != null &&  //   has successor
     dn != d &&                //   that is on list
     dp.casNext(d, dn)))       // d unspliced
    casCleanMe(dp, null);
if (dp == pred)
    return;      // s

首先判斷d節(jié)點(diǎn)是不是為null,如果d節(jié)點(diǎn)為null代表已經(jīng)清除掉了,如果cleanMe節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)和自己相等,說(shuō)明需要清除的節(jié)點(diǎn)已經(jīng)離隊(duì)了,判斷下個(gè)節(jié)點(diǎn)是不是需要被清除的節(jié)點(diǎn),目前看d節(jié)點(diǎn)是被清除的節(jié)點(diǎn),然后就將被清除的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值給dn并且判斷d節(jié)點(diǎn)是不是末尾節(jié)點(diǎn),如果不是末尾節(jié)點(diǎn)則進(jìn)行dp.casNext方法,這個(gè)地方是關(guān)鍵點(diǎn),它將被清除節(jié)點(diǎn)d的前節(jié)點(diǎn)的next節(jié)點(diǎn)修改為被清除節(jié)點(diǎn)d的后面節(jié)點(diǎn)dn,然后調(diào)用caseCleanMe將TransferQueue中的cleanMe節(jié)點(diǎn)清空,此時(shí)節(jié)點(diǎn)的內(nèi)容如下所示:

可以看出將上一次標(biāo)記為清除的節(jié)點(diǎn)清除了隊(duì)列中,清除完了就完事兒?那這次的怎么弄呢?因?yàn)楝F(xiàn)在運(yùn)行的是thread3的中斷程序,所以上面并沒(méi)有退出,而是再次進(jìn)入循環(huán),循環(huán)之后發(fā)現(xiàn)dp為null則會(huì)運(yùn)行casCleanMe(null, pred),此時(shí)當(dāng)前節(jié)點(diǎn)s的前一個(gè)節(jié)點(diǎn)已經(jīng)被清除隊(duì)列,但是并不影響后續(xù)的清除操作,因?yàn)榍肮?jié)點(diǎn)的next節(jié)點(diǎn)還在維護(hù)中,也是前節(jié)點(diǎn)的next指向還是reference-725,如下圖所示:

就此分析完畢如果有不正確的地方請(qǐng)指正。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/74510.html

相關(guān)文章

  • SynchronousQueue原理詳解-非公平模式

    摘要:開(kāi)篇說(shuō)明本文分析采用的是約定下面內(nèi)容中代表的是引用地址,引用對(duì)應(yīng)的節(jié)點(diǎn)前面已經(jīng)講解了公平模式的內(nèi)容,今天來(lái)講解下關(guān)于非公平模式下的是如何進(jìn)行工作的,在源碼分析的時(shí)候,先來(lái)簡(jiǎn)單看一下非公平模式的簡(jiǎn)單原理,它采用的棧這種先進(jìn)后出的方式進(jìn)行非公 開(kāi)篇 說(shuō)明:本文分析采用的是jdk1.8約定:下面內(nèi)容中Ref-xxx代表的是引用地址,引用對(duì)應(yīng)的節(jié)點(diǎn) 前面已經(jīng)講解了公平模式的內(nèi)容,今天來(lái)講解下...

    cloud 評(píng)論0 收藏0
  • 圖解AQS原理之ReentrantLock詳解-公平

    摘要:概述前面已經(jīng)講解了關(guān)于的非公平鎖模式,關(guān)于非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平鎖內(nèi)部是如何實(shí)現(xiàn)公平的如果沒(méi)有看過(guò)非公平鎖的先去了解下非公平鎖,因?yàn)檫@篇文章前面不會(huì)講太多內(nèi)部結(jié)構(gòu),直接會(huì)對(duì)源碼進(jìn)行分析前文 概述 前面已經(jīng)講解了關(guān)于AQS的非公平鎖模式,關(guān)于NonfairSync非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平...

    Taonce 評(píng)論0 收藏0
  • Java多線程進(jìn)階(三五)—— J.U.C之collections框架:SynchronousQue

    摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無(wú)鎖算法,其性能一般比單純的其它阻塞隊(duì)列要高。它的最大特點(diǎn)時(shí)不存儲(chǔ)實(shí)際元素,而是在內(nèi)部通過(guò)棧或隊(duì)列結(jié)構(gòu)保存阻塞線程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog... 一、Synchro...

    missonce 評(píng)論0 收藏0
  • 圖解AQS原理之ReentrantLock詳解-非公平

    摘要:內(nèi)部提供了兩種的實(shí)現(xiàn),一種公平模式,一種是非公平模式,如果沒(méi)有特別指定在構(gòu)造器中,默認(rèn)是非公平的模式,我們可以看一下無(wú)參的構(gòu)造函數(shù)。 概述 并發(fā)編程中,ReentrantLock的使用是比較多的,包括之前講的LinkedBlockingQueue和ArrayBlockQueue的內(nèi)部都是使用的ReentrantLock,談到它又不能的不說(shuō)AQS,AQS的全稱(chēng)是AbstractQueue...

    Clect 評(píng)論0 收藏0
  • BlockingQueue學(xué)習(xí)

    摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線程池的緩沖隊(duì)列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于...

    xuhong 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<