摘要:注意這里指的不是當次而是之后,所以如果我們使用隊列的方法返回,就知道隊列是否為空,但是不知道之后是否為空,并且,當關注的操作發生時,在插入或取出操作的返回值里告知此信息,來指導是否繼續注冊寫操作。
前言
本文寫給對ConcurrentLinkedQueue的實現和非阻塞同步算法的實現原理有一定了解,但缺少實踐經驗的朋友,文中包括了實戰中的嘗試、所走的彎路,經驗和教訓。
背景介紹上個月,我被安排獨自負責一個聊天系統的服務端,因為一些原因,我沒使用現成的開源框架,網絡那塊直接使用AIO,收數據時,因為只會從channel里過來,所以不需要考慮同步問題;但是發送數據時,因為有聊天消息的轉發,所以必需處理這個同步問題。AIO中,是處理完一個注冊的操作后,再執行我們定義的方法,此時,如果還有數據需要寫,則繼續注冊寫操作,如果沒有則不注冊;提交數據時,如果當前沒有注冊寫操作,則注冊一個,否則僅提交(此時再注冊則會報異常)。這樣,需要同步的點就是:如何知道當前還有沒有數據需要發送(因為其它線程也可以提交數據到此處),和如何知道此次提交時,有沒有注冊寫操作。總之,要完成:有數據要發送時,必需有寫操作被注冊,并且只能注冊一次;沒有數據時,不能有寫操作被注冊。
問題分析經過分析,上面的問題,可以抽象成:我需要知道當往隊列里插入一條數據之前,該隊列是否為空,如果為空則應該注冊新的寫操作。當從隊列里取出一條數據后,該隊列是否為非空,如果非空則應該繼續注冊寫操作。(本文之后以“關注的操作”來表示這種場景下的插入或取出操作)
目前的問題是,我使用的隊列是ConcurrentLinkedQueue,但是它的取出數據的方法,沒有返回值告訴我們從隊列里取出數據之后隊列是否為空,如果是使用size或peek方法來執行判斷,那就必需加鎖了,否則在拿到隊列大小時,可能隊列大小已經變化了。所以我首先想到的是,如何對該隊列進行改造,讓它提供該信息。
注意:這里指的不是當次而是之后,所以如果我們使用隊列的peek()方法返回null,就知道隊列是否為空,但是不知道之后是否為空 ,并且,當關注的操作發生時,在插入或取出操作的返回值里告知此信息,來指導是否繼續注冊寫操作。
用鎖實現如果使用鎖的話很容易處理,用鎖同步插入和取出方法,下面是鎖實現的參考:
public E poll() { synchronized (this) { E re = q.poll(); // 獲取元素后,隊列是空,表示是我關注的操作 if (q.peek() == null) { } return re; } } public void offer(E e) { synchronized (this) { // 插入元素前,隊列是空,表示是我關注的操作 if (q.peek() == null) { } q.offer(e); } }
但因為是服務端,我想用非阻塞同步算法來實現。
嘗試方案一我第一次想到的改造辦法是,將head占位節點改成固定的,頭節點移動時,只將head節點的next指向新的節點,在插入數據時,如果是在head節點上成功執行的該操作,那么該插入就是關注的的操作;在取出時,如果將head節點的next置為了null,那么該取出就是關注的操作(?因為之前的占位節點是變化的,所以沒法比較,必需用同步,現在是固定的了,所以可以直接與head節點進行比較?)。如此一來,問題好像被解決了。改造完之后,出于嚴謹,我仔細讀了一遍代碼,發現引入了新的問題,我的取出操作是這樣寫的
/** * @author trytocatch@163.com */ public E poll(){ for(;;){ Node n = head.nextRef.get();//head指向固定的head節點,為final if(n == null) return null; Node m = n.nextRef.get(); if(head.next.compareAndSet(n,m){ if(m==null) ;//此時為關注的操作(為了簡化代碼顯示,不將該信息當作返回值返回了,僅注釋) return n.itemRef.get(); } } }
這里有一個致命的問題:如果m為null,在CAS期間,插入了新節點,n的next由null變成了非null,緊接著又把head的next更新為了null,那么鏈就斷了,該方法還存在一些其它的問題,如當隊列為空的時候,尾節點指向了錯誤的位置,本應該是head的。我認為最根本的原因在于,head不能設為固定的,否則會引發一堆問題。第一次嘗試宣告失敗。
嘗試方案二這次我嘗試將head跟tail兩個引用包裝成一個對象,然后對這個對象進行CAS更新(這僅為一處理論上的嘗試,因為從性能上面來講,已經大打折扣了,創建了很多額外的對象),如果head跟tail指向了同一個節點,則認為隊列是空的,根據此信息來判斷一個操作是不是關注的操作。但該嘗試僅停留在了理論分析階段,期間發現了一些小問題,沒法解決,后來我發現,我把ConcurrentLinkedQueue原本可以分成兩步執行的插入和取出操作(更新節點的next或item引用,然后再更新head或tail引用),變成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可將它們一步完成?所以直接放棄了。
解決方案一經過兩次的失敗嘗試,我幾乎絕望了,我懷疑這是不是不能判斷出是否為關注的操作。
因為是在做項目,周末已經過去了,不能再搞這些“研究”了,所以我退而求其次,想了個不太漂亮的辦法,在隊列外部維護一個變量,記錄隊列有多大,在插入或取出后,更新該變量,使用的是AtomicInteger,如果更新時,將變量從1變成0,或從0變成了1,就認為該插入或取出為關注的操作。
private AtomicInteger size = new AtomicInteger(0); public E poll() { E re = q.poll(); if (re == null) return null; for(int old;;){ old = size.get(); if(size.compareAndSet(old,old-1)){ // 獲取元素后,隊列是空,表示是我關注的操作 if(old == 1){ } break; } } return re; } public void offer(E e) { q.offer(e); for(int old;;){ old = size.get(); if(size.compareAndSet(old,old+1)){ // 插入元素前,隊列是空,表示是我關注的操作 if(old == 0){ } break; } } }
此時,也許細心的朋友會問,因為沒有使用鎖,這個變量并不能真實反映隊列的大小,也就不能確定它是不是關注的操作。沒錯,是不能真實反映,但是,我獲取關注的操作的目的是用來指導我:該不該注冊新的寫操作,該變量的值變化就能提供正確的指導,所以,同樣是正確的,只不過途徑不同而已。理論上的分析和后來的項目正確運行都印證了該方法的正確性。
解決方案二因為上面的方法額外加了一次lock-free級別的CAS操作,我心里總不太舒服,空余時間總在琢磨,真的就沒有辦法,在不增加額外lock-free級別CAS開支的情況下,知曉一個操作是不是關注的操作?
后來經分析,如果要知曉是不是關注的操作,跟兩個數據有關,真實的頭節點跟尾節點(不同于head跟tail,因為它們是滯后的,之前將它們包裝成一個對象就是犯了該錯誤),ConcurrentLinkedQueue的實現中,這兩個節點是沒有競爭的,一個是更新item,一個是更新next,必需得讓他們競爭同一個東西,才能解決該問題,于是我想到了一個辦法,取出完成后,如果該節點的next為null,就將其用CAS置為一個特殊的值,若成功則認為是關注的操作;插入成功后,如果next被替換掉的值不是null而是這個特殊值,那么該插入也為關注的操作。這僅增加了一次wait-free級別的CAS操作(取出后的那次CAS),perfect!
因為ConcurrentLinkedQueue的很多變量、內部類都是私有的,沒法通過繼承來改造,沒辦法,只得自行實現。對于隊列里使用的Node,實現的方式有很多種,可以使用AtomicReference、AtomicReferenceFieldUpdater來實現,如果你愿意的話,甚至是像ConcurrentLinkedQueue一樣,用sun.misc.Unsafe來實現(注意:一般來說,sun包下的類是不推薦使用的),各有優缺點吧,所以我就不提供該隊列的具體實現了,下面給出在ConcurrentLinkedQueue(版本:1.7.0_10)基礎上進行的改造,供參考。注意,如果需要用到size等方法,因為特殊值的引入,影響了之前的判斷邏輯,應重新編寫。
/** * @author trytocatch@163.com */ private static final Node MARK_NODE = new Node(null); public boolean offer(E e) { checkNotNull(e); final Node newNode = new Node(e); for (Node t = tail, p = t;;) { Node q = p.next; if (q == null || q == MARK_NODE) {//修改1:加入條件:或q == MARK_NODE // p is last node if (p.casNext(q, newNode)) {//修改2:將null改為q // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (q == MARK_NODE)//修改3: ;//此時為關注的操作(為了簡化代碼顯示,僅注釋) if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } public E poll() { restartFromHead: for (;;) { for (Node h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); if (p.casNext(null,MARK_NODE))//修改1: ;//此時為關注的操作 return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }小結
設計非阻塞算法的關鍵在于,找出競爭點,如果獲取的某個信息跟兩個操作有關,那么應該讓這兩個操作競爭同一個東西,這樣才能反應出它們的關系。
by trytocache via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/64036.html
摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護版本號,則應該保證遞增。 前言 相比上一篇而言,本文不需要太多的準備知識,但技巧性更強一些。因為分析、設計的過程比較復雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內容也是后一篇實戰中需要用到的一個工具類。 需求介紹 我需要編寫一個同步工具,它需要提供這樣幾個方法:...
摘要:線程安全的線程安全的,在讀多寫少的場合性能非常好,遠遠好于高效的并發隊列,使用鏈表實現。這樣帶來的好處是在高并發的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:是無阻塞隊列的一種實現,依賴與算法實現。這樣比從往后找更有效率出隊規則定義補充一項,也表示節點已經被刪除參考方法。 ConcurrentLinkedQueue是無阻塞隊列的一種實現, 依賴與CAS算法實現。 入隊offer if(q==null)當前是尾節點 -> CAS賦值tail.next = newNode, 成功就跳出循環 elseif(p == q)尾節點被移除 -> ...
閱讀 992·2021-11-23 09:51
閱讀 3481·2021-11-22 12:04
閱讀 2725·2021-11-11 16:55
閱讀 2950·2019-08-30 15:55
閱讀 3236·2019-08-29 14:22
閱讀 3360·2019-08-28 18:06
閱讀 1249·2019-08-26 18:36
閱讀 2136·2019-08-26 12:08