淺析從同步原語看非阻塞同步以及Java中的應用

一、從硬件原語上理解同步(非特指Java)

同步機制是多處理機系統的重要組成部分,其實現方式除瞭關系到計算的正確性之外還有效率的問題。同步機制的實現通常是在硬件提供的同步指令的基礎上,在通過用戶級別軟件例程實現的。上面說到的樂觀策略實際上就是建立在硬件指令集的基礎上的(我們需要實際操作和沖突檢測是原子性的),一般有下面的常用指令:測試並設置(test_and_set)、獲取並增加(fetch_and_increment)、原子交換(Atomic_Exchange)、比較並交換(CAS)、加載連接條件存儲(LL/SC),下面我們會講到這些以及通過這些硬件同步原語實現的旋轉鎖和柵欄同步。

1.1、基本硬件原語

在多處理機中實現同步,所需的主要功能是一組能以原子操作讀出並修改存儲單元的硬件原語。如果沒有這種操作,建立基本的同步原語的代價會非常大。基本硬件原語有幾種形式提供選擇,他們都能以原子操作的方式讀改存儲單元,並指出進行的操作是否能以原子形式進行,這些原語作為基本構建提供構造各種各樣的用戶及同步操作。

一個典型的例子就是原子交換(Atomic Exchange),他的功能是將一個存儲單元中的值和一個寄存器的值進行交換。我們看看這個原語怎樣構造一個我們通常意義上說的簡單的鎖。

假設現在我們構造這樣一個簡單的鎖:其值為0表示鎖是開的(鎖可用),為1表示上鎖(不可用)。當處理器要給該鎖上鎖的時候,將對應於該鎖的存儲單元的值與存放在某個寄存器中的1進行交換。如果別的處理器已經上瞭鎖,那麼交換指令返回的值為1否則為0。返回0的時候,因為是原子交換,鎖的值就會從0變為1表示上鎖成功;返回1,原子交換鎖的值還是1,但是返回1表示已經被上瞭鎖。我們考慮使用這個鎖:假設兩個處理器同時進行交換操作(原子交換),競爭的結果就是,隻有一個處理器會先執行成功而得到返回值0,而另一個得到的返回值為1表示已經被上鎖。從這些我們可以看出,采用原子交換指令是實現同步的關鍵:這個原子交換操作的不可再分的,兩個交換操作將由寫順序機制確定先後順序,這也保證瞭兩個線程不能同時獲取同步變量鎖。

除此之外,還有別的原語可以實現同步(關鍵都在於能以原子的方式讀-改-寫存儲單元的值)。例如:測試並置定(test_and_set)(先測試一個存儲單元的值,如果符合條件就修改其值),另一個同步原語是讀取並加1(fetch_and_increment))(返回存儲單元的值並自動增加該值)。

那麼,上面的基本原語操作又是怎樣實現的呢,這在一條指令中完成上述操作顯然是困難的(在一條不可中斷的指令中完成一次存儲器的讀改寫,而且要求不允許其他的訪存操作還要避免死鎖)。現在的計算機上采用一對指令來實現上述的同步原語。該指令對由兩條特殊的指令組成,一條是特殊的load指令(LL指令),另一條是特殊的store指令(SC)。指令的執行順序是:如果LL指令指明的存儲單元的值在SC對其進行寫之前被其他的指令改寫過,則第二條指令執行失敗,如果在兩條指令之間進行切換也會導致執行SC失敗,而SC指令將通過返回一個值來指出該指令操作是否成功(如果返回的1表示執行成功,返回0表示失敗)。為什麼說這對指令相當於原子操作呢,這指的是是所有其他處理器進行的操作或者在這對指令之前執行或者在其後執行,不存在兩條指令之間進行,所以在這一對指令之間不存在任何其他處理器改變相應存儲單元的值。

下面是一段實現對R1指出的存儲單元進行的原子交換操作

try:OR    R3,R4,R0 //R4中為交換值,將該值送入R3

    LL    R2,0(R1) //將0(R1)中的值取到R2

    SC    R3,0(R1) //若0(R1)中的值與R3中的值相同,則置R3的值為1,否則為0

    BEQZ R3,try //R3的值為0表示存失敗,轉移重新嘗試

    MOV R4,R2 //成功,將取出的值送往R4     

最終R4和由R1指向的存儲單元值進行瞭原子交換,在LL和SC之間如果有別的處理器插入並且修改瞭存儲單元的值則SC都會返回0並存入R3中從而重新執行交換操作。下面是實現各個講到的讀取並加1(fetch_and_increment)原語的實現

try:LL    R2,0(R1) //將0(R1)中的值送入R2

    DADDIU    R2,R2,#1 //加1操作(R2+1->R2)

    SC    R2,0(R1) //如果0(R1)中的值和R2中的值相同就置R2的值為1,否則為0

    BEQZ    R2,try //R2的值為0表示存失敗,轉移到開始出重新執行

上面的指令的執行需要跟蹤地址,通常LL指令指定一個寄存器,該寄存器中存放著目的存儲單元的地址,這個寄存器稱為連接寄存器,如果發生中斷切換或者與連接寄存器中的地址匹配的cache塊被作廢(被別的SC指令訪問),則將連接寄存器清零,SC指令則檢查它的存儲地址和連接寄存器匯中的內容是夠匹配,如果匹配則SC指令繼續執行,否則執行失敗。

1.2、用一致性實現鎖

我們現在用上面的原子交換的同步原語實現自旋鎖(spin lock)(處理器不停請求獲得鎖的試用權,圍繞該鎖反復執行循環程序,直到獲得鎖)。自旋鎖適用於這樣的場景:鎖被占用時間少,在獲得鎖之後加鎖的過程延遲小。

下面我們考慮使用一種簡單的方法實現:將鎖變量保存在存儲器中,處理器可以不斷通過原子交換操作來請求其使用權,比如使用原子交換操作獲得其返回值從而直達鎖變量的使用情況。釋放鎖的時候,處理器隻需要將說置為0。如下面的程序:使用原子交換操作堆自旋鎖進行加鎖,其中R1中存放的是自旋鎖變量的地址

        DADDIU R2,R0,#1

lockit: EXCH R2,0(R1) //原子交換,獲得自旋鎖的值並在下面比較自旋鎖的值為1還是0,為1表示已經上鎖

        BNEZ R2,lockit //若R2的內容不為0,則表示已經有其他程序獲得瞭鎖變量,就繼續旋轉等待

下面我們對這個簡單的自旋鎖實現進行一些改進(下面說到的可類比JMM內存模型理解)如果計算機支持Cache一致性,就可以將鎖調入Cache中(類比本地內存),並通過一致性保證使得鎖的值保持和存儲器中的值一致(類比內存可見性和本地內存主內存的值一致同步)。這樣做有下面的好處:①使得環繞自旋鎖的線程(自旋請求鎖變量)隻對本地Cache中的鎖(主存中的副本)進行操作,而不用再每次請求占用鎖時候進行一次全局的訪存操作(訪問主內存存儲器中存放的鎖的值) ②利用訪問鎖的程序局部性原理(處理器最近使用的鎖可能不久後還會使用),這種情況就可以使得鎖駐留在對應的Cache中,大大減少瞭獲得鎖所需要的時間(處於性能考慮,需要減少全局訪存操作)。

在改進之前,我們應該知道,在上面的簡單實現的基礎上(上面的每次循環交換均需要一次寫操作,因為有多個處理器會同時請求加鎖,這就會導致一個處理器請求成功後,其他處理器都會寫不命中),需要對這個程序進行改進,使得它隻對本地副本中的鎖變量進行讀取和檢測,直到發現鎖已經被釋放。發現釋放之後,立刻去進行交換操作跟別的處理器競爭鎖變量。所有這些進程還是以原子交換的方式獲得鎖,也隻有一個進程可以獲得成功(獲得鎖變量成功的進程交換後看到的鎖變量值為0,交換之後的鎖變量值為1表示上鎖成功;而獲得失敗的進程雖然也交換瞭鎖變量的值,但是因為交換後自己看到的鎖變量的值已經是1,就表示自己進程失敗瞭),其他的需要繼續旋轉等待。當獲得鎖的進程使用完之後,將鎖變量置為0表示釋放鎖由其他需要獲取的進程去競爭它(其他進程會在自己的Cache中發現鎖變量的值發生變化,這是上面所說的Cache一致性)。下面是修改後的旋轉鎖程序

lockit: LD    R2,0(R1) //取得鎖的值

        BNEZ R2,lockit //如果鎖還沒有釋放(R2的值還是1)

        DADDIU    R2,R0,#1 //將R2值置為1(這裡面可以這樣想:上面BNEZ執行失敗表示R2值為0,那麼這個時候就+1)

        EXCH R2,0(R1) //將R2中的值和0(R1)中的鎖變量進行原子交換

        BNEZ R2,lockit //上面第一次判斷是當前進程首先發現主存中的鎖變量值發生變化;

                       //進行原子交換結果判斷和上面一樣,如果狡猾後返回值為0表示成功,為1表示失敗就繼續旋轉等待獲取

1.3、使用上面的旋轉鎖實現我們一個同步原語——柵欄同步

首先解釋一下什麼叫柵欄同步(barrier)。假設有一個類似於柵欄的東西,它會強制所有到達柵欄的進程進行等待,直到全部的進程都到達之後釋放所有到達的進程繼續往下執行,從而形成同步。下面我們就通過上面說的旋轉鎖來簡單模擬實現這樣的一個同步原語

使用兩個旋轉鎖,一個表示計數器,記錄已經到達該柵欄的進程數;另一個用來封鎖進程知道最後一個進程到達該柵欄。為瞭實現柵欄,我們需要一個變量,到達並阻塞住的進程需要在這個變量上自旋等待知道滿足它需要的條件(都到達柵欄然後才能往下執行)。我們使用spin表示這個條件condition。如下的程序所示,其中lock和unlock提供基本的旋轉鎖,變量count記錄已經到達柵欄的進程數,total表示已經到達柵欄的進程總數,對counterlock加鎖保證瞭增量操作的原子性,release用來封鎖最後一個到達柵欄的進程。spin(release==1)表示需要全部進程都到達柵欄。

lock(counterlock); //確保更新的原子性
if(count == 0) release = 0; //第一個進程到達,這時候重置release為0表示在其值變為1之前後續到達的進程都需要等待
count = count + 1; //記錄到達的進程數
unlock(counterlock); //釋放鎖
if(count == total) { //進程全部到達
    count = 0; //重置計數器count
    release = 1; //將release置為1表示釋放所欲到達的進程
} else { //進程還沒有全部到達
    spin(release == 1); //已經到達的進程旋轉等待知道所有的進程到達(言外之意就是release=1)
}

但是上面的這種簡單實現還是存在問題的,我們考慮下面這種可能發生的情況:當柵欄的使用在循環當中時候,這時候所有釋放的進程在運行一段時間之後還會到達柵欄,假設其中一個進程在上次釋放的時候還沒有來得及離開柵欄,而是依舊停留在旋轉操作上(可能操作系統重新進行進程調度導致那個進程沒有來得及離開柵欄)。如果第二次柵欄使用的時候,一個執行較快的進程到達柵欄(這個快的意思是,當他到達柵欄之後上次那個還沒有離開柵欄的進程還在旋轉操作上),這個快的進程會發現count=0,那麼他就會將release置為0,這時候就會導致那個還在旋轉等待的進程發現release值為0,然後那就更不會再退出這個旋轉操作瞭,就相當於被捆綁在柵欄上出不去(這個問題會導致後續的count計數少瞭一個進程到達,而總是小於total),那這樣的話,由於count總是小於total那不是所有到達柵欄的進程都在spin上一直自旋瞭嗎。那怎麼解決這個問題呢,一種方法就是在進程離開柵欄的時候也進行計數,在上次使用柵欄的進程全部離開柵欄之前不允許執行快的進程再次使用並初始化柵欄的一些變量值。還有一種方法是使用sense_reversing柵欄,即每個進程隻用一個本地私有變量local_sense並初始化為1,用它和release判斷進程是否需要自旋等待。

二、Java中的原子性操作概述

所謂原子操作,就是指執行一系列操作的時候,要麼全部執行要麼全部不執行,不存在隻執行一部分的情況。在設置計數器的時候一般是讀取當前的值,然後+1在更新(讀-改-寫的過程),如果不能保證這這幾個操作的過程的原子性就可能出現線程安全問題,比如下面的代碼示例,++value在沒有任何額外保證的前提下不是原子操作。

public class ThreadUnSafe{
    private Long value;

    public Long getValue() {return value;}

    public void increment() {++value;}
}

使用Javap -c XX.class查看匯編代碼如下

這是個復合操作,是不具備原子性的。而保證這個操作原子性的方法最簡單的就是加上synchronized關鍵字,使用synchronized可以實現線程安全性,但是這是個獨占鎖,沒有獲取內部鎖的線程會被阻塞住(即便是這裡的getValue操作,多線程訪問也會阻塞住),這對於並發性能的提高是不好的(而這裡也不能簡單的去掉getValue上的synchronized,因為讀操作需要保證value的讀一致性,即需要獲得主內存中的值而不是線程工作內存中的可能是舊的副本值)。那麼除瞭加鎖之外其他安全的方法?後面講到的原子類(使用CAS實現)就可以作為一個選擇。

三、Java中的CAS操作概述

Java中提供非阻塞的volatile關鍵字解決保證共享變量的可見性問題,但是不能解決部分符合操作不具備原子性的問題(比如自增運算)。CAS即CompareAndSwap是JDK提供的非阻塞原子操作,通過硬件保證比較更新的原子性。我們通過compareAndSwapLong來簡單介紹CAS:

compareAndSwapLong(Object obj, long valueOffset, long expect, long update),該方法中compareAndSwap表示比較並交換,方法中有四個操作數,其中obj表示對象內存的位置,valueOffset表示對象中存儲變量的偏移量,expect表示變量的預期值,update表示更新值。操作含義就是,若果對象obj中內存偏移量為valueOffset的變量值為expect則使用心得update值替換舊的值expect,這是處理器提供的一個原子指令。這些方法有sun.misc.Unsafe類提供。後面我們會說到Unsafe類

在此之前我們先說一下CAS操作的一個經典的ABA問題:假如線程1 使用CAS修改初始值為A的變量X,那麼線程1會首先回去當前變量X的值(A),然後使用CAS操作嘗試修改X的值為B,如果使用CAS修改成功瞭,那麼程序一定執行正確瞭嗎?在往下的假設看,如果線程I在獲取變量X的值A後,在執行CAS之前線程II使用CAS修改變量X的值為B然後由修改回瞭A。這時候雖然線程I執行CAS時候X的值依舊是A但是這個A已經不是線程I獲取時候的A瞭,這就是ABA問題。ABA產生的原因是變量的狀態值產生瞭環形轉換,即變量值從A->B,然後又從B->A。jdk中提供瞭帶有標記的原子類AtomicStampedReference(時間戳原子引用)通過控制變量的版本保證CAS的正確性。如下所做的測試ABA問題以及使用AtomicStampedReference來解決這個問題

3.1、模擬ABA問題

下面的程序輸出結果會是這樣的

package test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TestAtomicStampedReference {

    static AtomicReference<Integer> atomicReference = new AtomicReference<>(1);

    public static void main(String[] args) {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicReference.compareAndSet(1,2);
                atomicReference.compareAndSet(2,1);
                System.out.println(Thread.currentThread() + "線程修改後的變量值" + atomicReference.get());
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                //sleep 1秒,保證線程t1完成1->2->1的模擬ABA操作
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicReference.compareAndSet(1,3);
                System.out.println(Thread.currentThread() + "線程修改後的變量值" + atomicReference.get());
            }
        });

        t1.start();
        t2.start();
    }
}

3.2、使用AtomicStampedReference重新實現

下面是運行結果

package test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

public class TestAtomicStampedReference {

    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(10,1); //定義初始值和初始版本號
    public static void main(String[] args) {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                //線程1獲得初始版本號並sleep1秒
                int version = atomicStampedReference.getStamp();
                System.out.println(Thread.currentThread() + "當前線程獲得的版本號" + version);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "修改變量結果true/false?:" +
                        atomicStampedReference.compareAndSet(10,11,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1)
                        + "修改後的結果:" + atomicStampedReference.getReference());
                System.out.println(Thread.currentThread() + "修改變量結果true/false?:" +
                        atomicStampedReference.compareAndSet(11,10,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1)
                        + "修改後的結果:" + atomicStampedReference.getReference());
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                //首先獲得初始版本號,sleep2秒讓線程1完成10->11->10的模擬ABA操作
                int version = atomicStampedReference.getStamp();
                System.out.println(Thread.currentThread() + "當前線程獲得的版本號" + version);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "修改變量結果true/false?:" +
                        atomicStampedReference.compareAndSet(10,20,version,atomicStampedReference.getStamp()+1)
                        + "修改後的結果:" + atomicStampedReference.getReference());
            }
        });

        t1.start();
        t2.start();
    }
}

四、Java中的Unsafe類

JDK中的rt.jar包中的Unsafe類提供瞭硬件級別的原子性操作

Unsafe類中許多方法都是native方法,他們使用JNI的方式訪問本地C++中的實現庫。下面我們瞭解一下Unsafe類提供的幾個主要的方法以及如何使用unsafe類進行一些編程操作。

4.1、Unsafe類中的重要方法介紹

(1)public native long objectFieldOffset(Field var1):返回指定的變量在所屬類中的內存偏移地址,該偏移地址僅僅在該Unsafe函數中訪問指定字段時候使用。如下使用Unsafe類獲取變量value在Atomic對象中的內存偏移量

 

(2)public native int arrayBaseOffset(Class<?> var1):獲取數組中第一個元素的地址

(3)public native int arrayIndexScale(Class<?> var1):獲取數組中一個元素占用的字節

(4)public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5):比較對象var1中的偏移量為var2的變量的值是否與var4相同,相同則使用var6的值更新,並返回true,否則返回false。

(5)public native long getLongVolatile(Object var1, long var2):獲取對象var1中偏移量為offset的變量對應volatile語義的值。

(6)public native void putLongVolatile(Object var1, long var2, long var4):設置var1對象中offset偏移類型為long的值為var4,支持volatile語義

(7)public native void putOrderedLong(Object var1, long var2, long var4):設置對象obj中offset偏移地址對應的long型的field的值為value。這是一個有延遲的putLongVolatile方法,並且不保證對應的值類型的修改對其他線程可見,隻有變量在隻用volatile修飾並且預計會被意外修改的時候才會使用該方法、

(8)public native void park(boolean var1, long var2):阻塞當前線程,其中參數var1等於false且var2等於0表示一直阻塞,var2大於0表示等待指定的時間後阻塞線程會被喚醒。這個var的值是相對的,為一個增量值,也就是相當當前時間累加事假後當前線程就會被喚醒。如果var1位true,並且var2大於0,則表示阻塞的線程到指定的時間點後就會被喚醒,這裡的時間var2是個絕對時間,是某個時間點換算為ms後的值。

(9)public native void unpark(Object var1):喚醒調用park方法之後的線程。

下面是jdk8之後新增加的,我們列出Long類型的方法

(10)getAndSetLong()方法:獲取當前對象var1中偏移量為var2的變量volatile語義的當前值,並設置變量volatile語義的值為var4。

首先使用getLongVolatile獲取當前變量的值,然後使用CAS原子操作設置新的值。這裡使用while是當CAS失敗時候進行重試。

 

(11)getAndAddLong()方法:獲取對象var1中偏移量為var2變量的volatile語義的值,設置變量值為原始值+var4

 

4.2、Unsafe類的使用

考慮編寫出下面的程序,並在自己的IDE中運行下面的程序,觀察結果。

package test;

import sun.misc.Unsafe;

public class TestUnsafe {

    //獲取Unsafe的實例
    static Unsafe unsafe = Unsafe.getUnsafe();

    //記錄變量value在TestUnsafe中的偏移量
    static long valueState;

    //變量
    private volatile long value;

    static {
        try {
            //獲取value變量在TestUnsafe類中的偏移量
            valueState = unsafe.objectFieldOffset(TestUnsafe.class.getDeclaredField("value"));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    public static void main(String[] args) {
        TestUnsafe testUnsafe = new TestUnsafe();
        System.out.println(unsafe.compareAndSwapInt(testUnsafe,valueState,0,1));
    }
}

上面的程序中首先獲取Unsafe的一個實例,然後使用unsafe的objectFieldOffset方法獲取TestUnsafe類中value變量,計算在TestUnsafe類中value變量的內存偏移地址並保存到valueState中。main中調用unsafe的compareAndSwapInt方法設置testUnsafe對象的value變量的值為1(如果是0的話)。value初始默認是0,我們希望代碼能輸出true(即compareAndSwapInt能夠執行成功),但是最終運行時下面的結果

我們看到上面的異常報錯在getUnsafe方法位置,下來我們看一看getUnsafe方法

public static Unsafe getUnsafe() {
    //(1)獲取調用getUnsafe類的這個Class類,按照上面的程序中的TestUnsafa類
    Class var0 = Reflection.getCallerClass();
    //(2)看下面的那個方法
    if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}
/**
 * (3)判斷是不是啟動類加載器加載的類,即看看是不是由BootStrapClassLoader加載的TestUnsafe.class,
 *    由於我們這是一個簡單測試類,是由應用程序類加載器AppClassLoader加載的,所以直接報出SecurityException異常
 */
public static boolean isSystemDomainLoader(ClassLoader var0) {
    return var0 == null;
}

由於Unsafe類rt.jar包提供的,該包下面的類都是通過Bootstrap類加載器加載的,而我們使用的main方法所在的類是由AppClassLoader加載的,所以在main方法中加載Unsafe類的時候根據雙親委派機制會委托給Bootstrap加載。那麼如果想要使用Unsafe類應該怎樣使用呢,《深入理解java虛擬機》中這一塊告訴我們可以使用反射來使用,下面我們來試一下

package test;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestUnsafe2 {

    static Unsafe unsafe;

    static long valueOffset;

    private volatile long value = 0;

    static {
        try {
            //使用反射獲取Unsafe的成員變量theUnsafe
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //設置為課存取
            field.setAccessible(true);
            //設置該變量的值
            unsafe = (Unsafe) field.get(null);
            //獲取value偏移量
            valueOffset = unsafe.objectFieldOffset(TestUnsafe2.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        TestUnsafe2 test = new TestUnsafe2();
        System.out.println("修改變量結果true/false?:" +
                unsafe.compareAndSwapInt(test,valueOffset,0,1)
                + "修改後的結果:" + test.value);
    }
}

得到下面的結果:

五、JUC中原子操作類AtomicLong的原理探究

5.1、原操作類概述

JUC包中提供瞭很多原子操作類,這些類都是通過上面說到的非阻塞CAS算法來實現的,相比較使用鎖來實現原子性操作CAS在性能上有很大提高。由於原子操作類的原理都大致相同,所以下面分析AtomicLong類的實現原理來進一步瞭解原子操作類。

5.2、AtomicLong的源碼

下面是AtomicLong原子類的部分源碼,其中主要包含其成員變量以及一些靜態代碼塊和構造方法

public class AtomicLong extends Number implements java.io.Serializable {

    //(1)獲取Unsafe實例
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    //(2)保存value值的偏移量
    private static final long valueOffset;

    //(3)判斷當前JVM是否支持Long類型的無鎖CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();

    static {
        try {
            //(4)獲取value值在AtomicLong中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    //(5)實際存的變量值value
    private volatile long value;

    //構造方法
    public AtomicLong(long initialValue) {
        value = initialValue;
    }
}

在上面的部分代碼中,代碼(1)通過Unsafe.getUnsafe()方法獲取到Unsafe類的實例(AtomicLong類也是rt.jar包下面的,所以AtomicLong也是通過啟動類加載器進行類加載的)。(2)(4)兩處是計算並保存AtomicLong類中存儲的變量value的偏移量。(5)中的value被聲明為volatile的,這是為瞭在多線程下保證內存的可見性,而value就是具體存放計數的變量。下面我們看看AtomicLong中的主要幾個函數

(1)遞增和遞減的源碼

//使用unsafe的方法,原子性的設置value值為原始值+1,返回值為遞增之後的值
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
//使用unsafe的方法,原子性的設置value值為原始值-1,返回值為遞減之後的值
public final long getAndDecrement() {
    return unsafe.getAndAddLong(this, valueOffset, -1L);
}
public final long decrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

在上面的代碼中都是通過調用Unsafe類的getAndAddLong方法來實現操作的,我們來看看這個方法,這個方法是個原子性操作:其中的第一個參數是AtomicLong實例的引用,第二個參數是value變量在AtomicLong中的偏移量,第三個參數是要設置為第二個變量的值。下面就是getAndAddLong方法的實現,以及一些分析

public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        //public native long getLongVolatile(Object var1, long var2);
        //該方法就是獲取var1引用指向的內存地址中偏移量為var2位置的值,然後賦給var6
        var6 = this.getLongVolatile(var1, var2);
    /**public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
     * var1:AtomicXXX類型的一個引用,指向堆內存中的一塊地址
     * var2:AtomicXXX源碼中的valueOffset,表示AtomicXXX源碼中實際存儲的值value在原子類型內存中的地址偏移量
     * var4:要比較的目標值expectValue,如果從內存指定地址處(var1和var2決定的那塊地址)的值和該值相等,則CAS成功
     * var6:CAS成功後向該內存中寫進的新值
     */
    //該方法就是使用CAS的方式,比較指定內存地址處(var1指向的內存地址塊中偏移量為var2處)的值和上面同一塊地址處取出的var6是否相等,
    //相等就將var6+var4(這裡可以看成var6+1)和指定內存地址處(var2引用指向的地址塊中偏移量為var2處)的值交換,並返回true,然後就會結束循環
    //CAS失敗返回false,然後繼續執行循環體內部的代碼,直到成功(也就是自增運算成功就會跳出循環並返回自增後的值)
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}

(2)CompareAndSet方法

下面是compaerAndSet方法的實現,主要還是調用unsafe類的compareAndSwapLong方法,其原理和上面分析的差不多,都是通過CAS的方式進行比較交換值。

public final boolean compareAndSet(long expect, long update) {
    //public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

(3)擴展,下面是compareAndSwapInt的底層實現,實際上是通過硬件同步原語來實現的CAS,下面的cmpxchg就是基於硬件原語實現的

UNSAFE_ENTRY(jboolean,Usafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UsafeWrapper("Usafe_CompareAndSwapInt");
oop p = JNIHasdles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x,addr,e)) == e;
UNSAFE_END

(4)下面是一個例子,使用AtomicLong來進行技術運算

package test;

import java.util.concurrent.atomic.AtomicLong;

public class TestAtomic1 {

    //創建AtomicLong類型的計數器
    private static AtomicLong atomicLong = new AtomicLong();
//    private static Long atomicLong = 0L;
    //創建兩個數組,計算數組中的0的個數
    private static Integer[] arr1 = {0,1,2,3,0,5,6,0,56,0};
    private static Integer[] arr2 = {10,1,2,3,0,5,6,0,56,0};

    public static void main(String[] args) throws InterruptedException {

        //線程1統計arr1中0的個數
        Thread t1  = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arr1.length;
                for (int i = 0; i < size; i++) {
                    if(arr1[i].intValue() == 0) {
//                        atomicLong.getAndIncrement();
                        atomicLong++;
                    }
                }
            }
        });

        Thread t2  = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arr2.length;
                for (int i = 0; i < size; i++) {
                    if(arr2[i].intValue() == 0) {
//                        atomicLong.getAndIncrement();
                        atomicLong++;
                    }
                }
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("兩個數組中0出現的次數為: " + atomicLong);//兩個數組中0出現的次數為: 7
    }
}

如果沒有使用原子類型進行計數運算,那麼可能就是下面的結果

  

以上就是淺析從同步原語看非阻塞同步以及Java中的應用的詳細內容,更多關於同步 非阻塞同步的資料請關註WalkonNet其它相關文章!