摘要:關鍵字經過編譯之后,會在同步塊的前后分別形成和這兩個字節碼指令。當我們的把字節碼加載到內存的時候,會對這兩個指令進行解析。這兩個字節碼都需要一個類型的參數來指明要鎖定和解鎖的對象。最后喚醒暫停的線程。
文章簡介
前面我有文章介紹了synchronized的基本原理,這篇文章我會從jvm源碼分析synchronized的實現邏輯,希望讓大家有一個更加深度的認識
內容導航從synchronized的字節碼說起
什么是monitor
分析synchronized的源碼
從synchronized的字節碼說起由于synchronized的實現是在jvm層面,所以我們如果要看它的源碼,需要從字節碼入手。這段代碼演示了synchronized作為實例鎖的兩種用法,我們觀察一下這段代碼生成的字節碼
</>復制代碼
public class App
{
public synchronized void test1(){
}
public void test2(){
synchronized (this){
}
}
public static void main( String[] args ){
System.out.println( "Hello World!" );
}
}
</>復制代碼
進入classpath目錄下找到App.class文件, 在cmd中輸入 javap -v App.class查看字節碼
</>復制代碼
public synchronized void test1();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 10: 0
LocalVariableTable:
Start Length Slot Name Signature
0 1 0 this Lcom/gupaoedu/openclass/App;
public void test2();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: dup
2: astore_1
3: monitorenter //監視器進入,獲取鎖
4: aload_1
5: monitorexit //監視器退出,釋放鎖
6: goto 14
9: astore_2
10: aload_1
11: monitorexit
12: aload_2
13: athrow
14: return
通過字節碼我們可以發現,修飾在方法層面的同步關鍵字,會多一個 ACC_SYNCHRONIZED的flag;修飾在代碼塊層面的同步塊會多一個 monitorenter和 monitorexit關鍵字。無論采用哪一種方式,本質上都是對一個對象的監視器(monitor)進行獲取,而這個獲取的過程是排他的,也就是同一個時刻只能有一個線程獲得同步塊對象的監視器。
在 synchronized的原理分析這篇文章中,有提到對象監視器。
</>復制代碼
synchronized關鍵字經過編譯之后,會在同步塊的前后分別形成monitorenter和monitorexit這兩個字節碼指令。當我們的JVM把字節碼加載到內存的時候,會對這兩個指令進行解析。這兩個字節碼都需要一個Object類型的參數來指明要鎖定和解鎖的對象。如果Java程序中的synchronized明確指定了對象參數,那么這個對象就是加鎖和解鎖的對象;如果沒有明確指定,那就根據synchronized修飾的是實例方法還是類方法,獲取對應的對象實例或Class對象來作為鎖對象
什么是monitor
在分析源代碼之前需要了解oop, oopDesc, markOop等相關概念,在Synchronized的原理分析這篇文章中,我們講到了synchronized的同步鎖實際上是存儲在對象頭中,這個對象頭是一個Java對象在內存中的布局的一部分。Java中的每一個Object在JVM內部都會有一個native的C++對象oop/oopDesc與之對應。在hotspot源碼 oop.hpp中oopDesc的定義如下
</>復制代碼
class oopDesc {
friend class VMStructs;
private:
volatile markOop _mark;
union _metadata {
Klass* _klass;
narrowKlass _compressed_klass;
} _metadata;
其中 markOop就是我們所說的Mark Word,用于存儲鎖的標識。
hotspot源碼 markOop.hpp文件代碼片段
</>復制代碼
class markOopDesc: public oopDesc {
private:
// Conversion
uintptr_t value() const { return (uintptr_t) this; }
public:
// Constants
enum { age_bits = 4,
lock_bits = 2,
biased_lock_bits = 1,
max_hash_bits = BitsPerWord - age_bits - lock_bits - biased_lock_bits,
hash_bits = max_hash_bits > 31 ? 31 : max_hash_bits,
cms_bits = LP64_ONLY(1) NOT_LP64(0),
epoch_bits = 2
};
...
}
markOopDesc繼承自oopDesc,并且擴展了自己的monitor方法,這個方法返回一個ObjectMonitor指針對象,在hotspot虛擬機中,采用ObjectMonitor類來實現monitor
</>復制代碼
bool has_monitor() const {
return ((value() & monitor_value) != 0);
}
ObjectMonitor* monitor() const {
assert(has_monitor(), "check");
// Use xor instead of &~ to provide one extra tag-bit check.
return (ObjectMonitor*) (value() ^ monitor_value);
}
在 ObjectMonitor.hpp中,可以看到ObjectMonitor的定義
</>復制代碼
class ObjectMonitor {
...
ObjectMonitor() {
_header = NULL; //markOop對象頭
_count = 0;
_waiters = 0, //等待線程數
_recursions = 0; //重入次數
_object = NULL;
_owner = NULL; //獲得ObjectMonitor對象的線程
_WaitSet = NULL; //處于wait狀態的線程,會被加入到waitSet
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; //處于等待鎖BLOCKED狀態的線程
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0; //監視器前一個擁有線程的ID
}
...
簡單總結一下,同步塊的實現使用 monitorenter和 monitorexit指令,而同步方法是依靠方法修飾符上的flag ACC_SYNCHRONIZED來完成。其本質是對一個對象監視器(monitor)進行獲取,這個獲取過程是排他的,也就是同一個時刻只能有一個線程獲得由synchronized所保護對象的監視器。所謂的監視器,實際上可以理解為一個同步工具,它是由Java對象進行描述的。在Hotspot中,是通過ObjectMonitor來實現,每個對象中都會內置一個ObjectMonitor對象
從 monitorenter和 monitorexit這兩個指令來開始閱讀源碼,JVM將字節碼加載到內存以后,會對這兩個指令進行解釋執行, monitorenter, monitorexit的指令解析是通過 InterpreterRuntime.cpp中的兩個方法實現
</>復制代碼
InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem)
InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem)
//JavaThread 當前獲取鎖的線程
//BasicObjectLock 基礎對象鎖
我們基于monitorenter為入口,沿著偏向鎖->輕量級鎖->重量級鎖的路徑來分析synchronized的實現過程
</>復制代碼
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
...
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
...
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
UseBiasedLocking是在JVM啟動的時候,是否啟動偏向鎖的標識
如果支持偏向鎖,則執行 ObjectSynchronizer::fast_enter的邏輯
如果不支持偏向鎖,則執行 ObjectSynchronizer::slow_enter邏輯,繞過偏向鎖,直接進入輕量級鎖
ObjectSynchronizer::fast_enter的實現在 synchronizer.cpp文件中,代碼如下
</>復制代碼
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) { //判斷是否開啟了偏向鎖
if (!SafepointSynchronize::is_at_safepoint()) { //如果不處于全局安全點
//通過`revoke_and_rebias`這個函數嘗試獲取偏向鎖
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {//如果是撤銷與重偏向直接返回
return;
}
} else {//如果在安全點,撤銷偏向鎖
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
fast_enter方法的主要流程做一個簡單的解釋
再次檢查偏向鎖是否開啟
當處于不安全點時,通過 revoke_and_rebias嘗試獲取偏向鎖,如果成功則直接返回,如果失敗則進入輕量級鎖獲取過程
revoke_and_rebias這個偏向鎖的獲取邏輯在 biasedLocking.cpp中
如果偏向鎖未開啟,則進入 slow_enter獲取輕量級鎖的流程
偏向鎖的獲取邏輯BiasedLocking::revoke_and_rebias 是用來獲取當前偏向鎖的狀態(可能是偏向鎖撤銷后重新偏向)。這個方法的邏輯在 biasedLocking.cpp中
</>復制代碼
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark(); //獲取鎖對象的對象頭
//判斷mark是否為可偏向狀態,即mark的偏向鎖標志位為1,鎖標志位為 01,線程id為null
if (mark->is_biased_anonymously() && !attempt_rebias) {
//這個分支是進行對象的hashCode計算時會進入,在一個非全局安全點進行偏向鎖撤銷
markOop biased_value = mark;
//創建一個非偏向的markword
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
//Atomic:cmpxchg_ptr是CAS操作,通過cas重新設置偏向鎖狀態
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {//如果CAS成功,返回偏向鎖撤銷狀態
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {//如果鎖對象為可偏向狀態(biased_lock:1, lock:01,不管線程id是否為空),嘗試重新偏向
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
//如果已經有線程對鎖對象進行了全局鎖定,則取消偏向鎖操作
if (!prototype_header->has_bias_pattern()) {
markOop biased_value = mark;
//CAS 更新對象頭markword為非偏向鎖
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED; //返回偏向鎖撤銷狀態
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
//如果偏向鎖過期,則進入當前分支
if (attempt_rebias) {//如果允許嘗試獲取偏向鎖
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
//通過CAS 操作, 將本線程的 ThreadID 、時間錯、分代年齡嘗試寫入對象頭中
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) { //CAS成功,則返回撤銷和重新偏向狀態
return BIAS_REVOKED_AND_REBIASED;
}
} else {//不嘗試獲取偏向鎖,則取消偏向鎖
//通過CAS操作更新分代年齡
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) { //如果CAS操作成功,返回偏向鎖撤銷狀態
return BIAS_REVOKED;
}
}
}
}
...//省略
}
偏向鎖的撤銷
當到達一個全局安全點時,這時會根據偏向鎖的狀態來判斷是否需要撤銷偏向鎖,調用 revoke_at_safepoint方法,這個方法也是在 biasedLocking.cpp中定義的
</>復制代碼
void BiasedLocking::revoke_at_safepoint(Handle h_obj) {
assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
oop obj = h_obj();
//更新撤銷偏向鎖計數,并返回偏向鎖撤銷次數和偏向次數
HeuristicsResult heuristics = update_heuristics(obj, false);
if (heuristics == HR_SINGLE_REVOKE) {//可偏向且未達到批量處理的閾值(下面會多帶帶解釋)
revoke_bias(obj, false, false, NULL); //撤銷偏向鎖
} else if ((heuristics == HR_BULK_REBIAS) ||
(heuristics == HR_BULK_REVOKE)) {//如果是多次撤銷或者多次偏向
//批量撤銷
bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
}
clean_up_cached_monitor_info();
}
偏向鎖的釋放,需要等待全局安全點(在這個時間點上沒有正在執行的字節碼),首先暫停擁有偏向鎖的線程,然后檢查持有偏向鎖的線程是否還活著,如果線程不處于活動狀態,則將對象頭設置成無鎖狀態。如果線程仍然活著,則會升級為輕量級鎖,遍歷偏向對象的所記錄。棧幀中的鎖記錄和對象頭的Mark Word要么重新偏向其他線程,要么恢復到無鎖,或者標記對象不適合作為偏向鎖。最后喚醒暫停的線程。
</>復制代碼
JVM內部為每個類維護了一個偏向鎖revoke計數器,對偏向鎖撤銷進行計數,當這個值達到指定閾值時,JVM會認為這個類的偏向鎖有問題,需要重新偏向(rebias),對所有屬于這個類的對象進行重偏向的操作成為 批量重偏向(bulk rebias)。在做bulk rebias時,會對這個類的epoch的值做遞增,這個epoch會存儲在對象頭中的epoch字段。在判斷這個對象是否獲得偏向鎖的條件是:markword的 biased_lock:1、lock:01、threadid和當前線程id相等、epoch字段和所屬類的epoch值相同,如果epoch的值不一樣,要么就是撤銷偏向鎖、要么就是rebias; 如果這個類的revoke計數器的值繼續增加到一個閾值,那么jvm會認為這個類不適合偏向鎖,就需要進行bulk revoke操作
輕量級鎖的獲取邏輯
輕量級鎖的獲取,是調用 ::slow_enter方法,該方法同樣位于 synchronizer.cpp文件中
</>復制代碼
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
if (mark->is_neutral()) { //如果當前是無鎖狀態, markword的biase_lock:0,lock:01
//直接把mark保存到BasicLock對象的_displaced_header字段
lock->set_displaced_header(mark);
//通過CAS將mark word更新為指向BasicLock對象的指針,更新成功表示獲得了輕量級鎖
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
}
//如果markword處于加鎖狀態、且markword中的ptr指針指向當前線程的棧幀,表示為重入操作,不需要爭搶鎖
else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don"t relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
#if 0
// The following optimization isn"t particularly useful.
if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {
lock->set_displaced_header (NULL) ;
return ;
}
#endif
//代碼執行到這里,說明有多個線程競爭輕量級鎖,輕量級鎖通過`inflate`進行膨脹升級為重量級鎖
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
輕量級鎖的獲取邏輯簡單再整理一下
mark->is_neutral()方法, is_neutral這個方法是在 markOop.hpp中定義,如果 biased_lock:0且lock:01表示無鎖狀態
如果mark處于無鎖狀態,則進入步驟(3),否則執行步驟(5)
把mark保存到BasicLock對象的displacedheader字段
通過CAS嘗試將markword更新為指向BasicLock對象的指針,如果更新成功,表示競爭到鎖,則執行同步代碼,否則執行步驟(5)
如果當前mark處于加鎖狀態,且mark中的ptr指針指向當前線程的棧幀,則執行同步代碼,否則說明有多個線程競爭輕量級鎖,輕量級鎖需要膨脹升級為重量級鎖
輕量級鎖的釋放邏輯輕量級鎖的釋放是通過 monitorexit調用
</>復制代碼
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
這段代碼中主要是通過 ObjectSynchronizer::slow_exit來執行
</>復制代碼
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
ObjectSynchronizer::fast_exit的代碼如下
</>復制代碼
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
markOop dhw = lock->displaced_header(); //獲取鎖對象中的對象頭
markOop mark ;
if (dhw == NULL) {
// Recursive stack-lock.
// Diagnostics -- Could be: stack-locked, inflating, inflated.
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}
mark = object->mark() ; //獲取線程棧幀中鎖記錄(LockRecord)中的markword
// If the object is stack-locked by the current thread, try to
// swing the displaced header from the box back to the mark.
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
//通過CAS嘗試將Displaced Mark Word替換回對象頭,如果成功,表示鎖釋放成功。
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//鎖膨脹,調用重量級鎖的釋放鎖方法
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
輕量級鎖的釋放也比較簡單,就是將當前線程棧幀中鎖記錄空間中的Mark Word替換到鎖對象的對象頭中,如果成功表示鎖釋放成功。否則,鎖膨脹成重量級鎖,實現重量級鎖的釋放鎖邏輯
鎖膨脹的過程分析重量級鎖是通過對象內部的監視器(monitor)來實現,而monitor的本質是依賴操作系統底層的MutexLock實現的。我們先來看鎖的膨脹過程,從前面的分析中已經知道了所膨脹的過程是通過 ObjectSynchronizer::inflate方法實現的,代碼如下
</>復制代碼
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// Inflate mutates the heap ...
// Relaxing assertion for bug 6320749.
assert (Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant") ;
for (;;) { //通過無意義的循環實現自旋操作
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
if (mark->has_monitor()) {//has_monitor是markOop.hpp中的方法,如果為true表示當前鎖已經是重量級鎖了
ObjectMonitor * inf = mark->monitor() ;//獲得重量級鎖的對象監視器直接返回
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
if (mark == markOopDesc::INFLATING()) {//膨脹等待,表示存在線程正在膨脹,通過continue進行下一輪的膨脹
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
if (mark->has_locker()) {//表示當前鎖為輕量級鎖,以下是輕量級鎖的膨脹邏輯
ObjectMonitor * m = omAlloc (Self) ;//獲取一個可用的ObjectMonitor
// Optimistically prepare the objectmonitor - anticipate successful CAS
// We do this before the CAS in order to minimize the length of time
// in which INFLATING appears in the mark.
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
/**將object->mark_addr()和mark比較,如果這兩個值相等,則將object->mark_addr()
改成markOopDesc::INFLATING(),相等返回是mark,不相等返回的是object->mark_addr()**/
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {//CAS失敗
omRelease (Self, m, true) ;//釋放監視器
continue ; // 重試
}
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
//CAS成功以后,設置ObjectMonitor相關屬性
m->set_header(dmw) ;
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ; //返回ObjectMonitor
}
//如果是無鎖狀態
assert (mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc (Self) ; ////獲取一個可用的ObjectMonitor
//設置ObjectMonitor相關屬性
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
/**將object->mark_addr()和mark比較,如果這兩個值相等,則將object->mark_addr()
改成markOopDesc::encode(m),相等返回是mark,不相等返回的是object->mark_addr()**/
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
//CAS失敗,說明出現了鎖競爭,則釋放監視器重行競爭鎖
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there"s no chance of
// live-lock -- "Inflated" is an absorbing state.
}
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ; //返回ObjectMonitor對象
}
}
鎖膨脹的過程稍微有點復雜,整個鎖膨脹的過程是通過自旋來完成的,具體的實現邏輯簡答總結以下幾點
mark->has_monitor() 判斷如果當前鎖對象為重量級鎖,也就是lock:10,則執行(2),否則執行(3)
通過 mark->monitor獲得重量級鎖的對象監視器ObjectMonitor并返回,鎖膨脹過程結束
如果當前鎖處于 INFLATING,說明有其他線程在執行鎖膨脹,那么當前線程通過自旋等待其他線程鎖膨脹完成
如果當前是輕量級鎖狀態 mark->has_locker(),則進行鎖膨脹。首先,通過omAlloc方法獲得一個可用的ObjectMonitor,并設置初始數據;然后通過CAS將對象頭設置為`markOopDesc:INFLATING,表示當前鎖正在膨脹,如果CAS失敗,繼續自旋
如果是無鎖狀態,邏輯類似第4步驟
重量級鎖的競爭邏輯</>復制代碼
鎖膨脹的過程實際上是獲得一個ObjectMonitor對象監視器,而真正搶占鎖的邏輯,在 ObjectMonitor::enter方法里面
重量級鎖的競爭,在 ObjectMonitor::enter方法中,代碼文件在 objectMonitor.cpp重量級鎖的代碼就不一一分析了,簡單說一下下面這段代碼主要做的幾件事
通過CAS將monitor的 _owner字段設置為當前線程,如果設置成功,則直接返回
如果之前的 _owner指向的是當前的線程,說明是重入,執行 _recursions++增加重入次數
如果當前線程獲取監視器鎖成功,將 _recursions設置為1, _owner設置為當前線程
如果獲取鎖失敗,則等待鎖釋放
</>復制代碼
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {//CAS成功
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
// We"ve encountered genuine contention.
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional ...
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
assert (_owner != Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (Self->is_Java_thread() , "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
assert (jt->thread_state() != _thread_blocked , "invariant") ;
assert (this->object() != NULL , "invariant") ;
assert (_count >= 0, "invariant") ;
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there"s contention.
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;
{ // Change java thread status to indicate blocked on monitor enter.
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
Self->set_current_pending_monitor(this);
// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don"t want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
...//此處省略無數行代碼
如果獲取鎖失敗,則需要通過自旋的方式等待鎖釋放,自旋執行的方法是 ObjectMonitor::EnterI,部分代碼如下
將當前線程封裝成ObjectWaiter對象node,狀態設置成TS_CXQ
通過自旋操作將node節點push到_cxq隊列
node節點添加到_cxq隊列之后,繼續通過自旋嘗試獲取鎖,如果在指定的閾值范圍內沒有獲得鎖,則通過park將當前線程掛起,等待被喚醒
</>復制代碼
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
...//省略很多代碼
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) { //自旋,講node添加到_cxq隊列
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
...//省略很多代碼
//node節點添加到_cxq隊列之后,繼續通過自旋嘗試獲取鎖,如果在指定的閾值范圍內沒有獲得鎖,則通過park將當前線程掛起,等待被喚醒
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// park self //通過park掛起當前線程
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;//當前線程掛起
}
if (TryLock(Self) > 0) break ; //當線程被喚醒時,會從這里繼續執行
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
...//省略很多代碼
}
TryLock(self)的代碼是在 ObjectMonitor::TryLock定義的,代碼的實現如下
</>復制代碼
代碼的實現原理很簡單,通過自旋,CAS設置monitor的_owner字段為當前線程,如果成功,表示獲取到了鎖,如果失敗,則繼續被掛起
</>復制代碼
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert that OwnerIsThread == 1
return 1 ;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn"t make as much sense because the lock was just acquired.
if (true) return -1 ;
}
}
重量級鎖的釋放
重量級鎖的釋放是通過 ObjectMonitor::exit來實現的,釋放以后會通知被阻塞的線程去競爭鎖
判斷當前鎖對象中的owner沒有指向當前線程,如果owner指向的BasicLock在當前線程棧上,那么將_owner指向當前線程
如果當前鎖對象中的_owner指向當前線程,則判斷當前線程重入鎖的次數,如果不為0,繼續執行ObjectMonitor::exit(),直到重入鎖次數為0為止
釋放當前鎖,并根據QMode的模式判斷,是否將_cxq中掛起的線程喚醒。還是其他操作
</>復制代碼
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) {//如果當前鎖對象中的_owner沒有指向當前線程
//如果_owner指向的BasicLock在當前線程棧上,那么將_owner指向當前線程
if (THREAD->is_lock_owned((address) _owner)) {
// Transmute _owner from a BasicLock pointer to a Thread address.
// We don"t need to hold _mutex for this transition.
// Non-null to Non-null is safe as long as all readers can
// tolerate either flavor.
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
//如果當前,線程重入鎖的次數,不為0,那么就重新走ObjectMonitor::exit,直到重入鎖次數為0為止
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
...//此處省略很多代碼
for (;;) {
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store(&_owner, (void*)NULL); //釋放鎖
OrderAccess::storeload(); // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT(Inflated exit - simple egress);
return;
}
TEVENT(Inflated exit - complex egress);
//省略部分代碼...
}
//省略部分代碼...
ObjectWaiter * w = NULL;
int QMode = Knob_QMode;
//根據QMode的模式判斷,
//如果QMode == 2則直接從_cxq掛起的線程中喚醒
if (QMode == 2 && _cxq != NULL) {
w = _cxq;
ExitEpilog(Self, w);
return;
}
//省略部分代碼... 省略的代碼為根據QMode的不同,不同的喚醒機制
}
}
根據不同的策略(由QMode指定),從cxq或EntryList中獲取頭節點,通過ObjectMonitor::ExitEpilog方法喚醒該節點封裝的線程,喚醒操作最終由unpark完成
</>復制代碼
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
{
assert (_owner == Self, "invariant") ;
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
// Hygiene -- once we"ve set _owner = NULL we can"t safely dereference Wakee again.
// The thread associated with Wakee may have grabbed the lock and "Wakee" may be
// out-of-scope (non-extant).
Wakee = NULL ;
// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark() ; //unpark喚醒線程
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
</>復制代碼
分析源碼,需要很大的耐心,希望大家能有耐心看下去,有疑問歡迎微信留言
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72573.html
摘要:與一樣,該類繼承抽象類,并且通過外部的屬性文件定義邏輯視圖名稱與真正的視圖對象的關系,屬性文件默認是下的,可以通過或屬性來指定,該屬性指的是文件的基名稱,也就是說以屬性值開頭的屬性文件。 概述 本章再學習另外兩個ViewResolver,分別是XmlViewResolver和ResourceBundleViewResolver,從功能上說,這兩個視圖解析器都是從外部資源文件中查找視圖V...
摘要:原文地址游客前言金三銀四,很多同學心里大概都準備著年后找工作或者跳槽。最近有很多同學都在交流群里求大廠面試題。 最近整理了一波面試題,包括安卓JAVA方面的,目前大廠還是以安卓源碼,算法,以及數據結構為主,有一些中小型公司也會問到混合開發的知識,至于我為什么傾向于混合開發,我的一句話就是走上編程之路,將來你要學不僅僅是這些,豐富自己方能與世接軌,做好全棧的裝備。 原文地址:游客kutd...
閱讀 2871·2021-09-22 15:43
閱讀 4798·2021-09-06 15:02
閱讀 859·2019-08-29 13:55
閱讀 1692·2019-08-29 12:58
閱讀 3083·2019-08-29 12:38
閱讀 1261·2019-08-26 12:20
閱讀 2278·2019-08-26 12:12
閱讀 3324·2019-08-23 18:35