Java並發編程多線程間的同步控制和通信詳解

正文

使用多線程並發處理,目的是為瞭讓程序更充分地利用CPU ,好能加快程序的處理速度和用戶體驗。如果每個線程各自處理的部分互不相幹,那真是極好的,我們在程序主線程要做的同步控制最多也就是等待幾個工作線程的執行完畢,如果不 Care 結果的話,連同步等待都能省去,主線程撒開手讓這些線程幹就行瞭。

不過,現實還是很殘酷的,大部分情況下,多個線程是會有競爭操作同一個對象的情況的,這個時候就會導致並發常見的一個問題–數據競爭(Data Racing)。

這篇文章我們就來討論一下這個並發導致的問題,以及多線程間進行同步控制和通信的知識,本文大綱如下:

並發導致的Data Racing問題

怎麼理解這個問題呢,拿一個多個線程同時對累加器對象進行累加的例子來解釋吧。

package com.learnthread;
public class DataRacingTest {
    public static void main(String[] args) throws InterruptedException {
        final DataRacingTest test = new DataRacingTest();
        // 創建兩個線程,執行 add100000() 操作
        // 創建Thread 實例時的 Runnable 接口實現,這裡直接使用瞭 Lambda
        Thread th1 = new Thread(()-> test.add100000());
        Thread th2 = new Thread(()-> test.add100000());
        // 啟動兩個線程
        th1.start();
        th2.start();
        // 等待兩個線程執行結束
        th1.join();
        th2.join();
        System.out.println(test.count);
    }
    private long count = 0;
    // 想復現 Data Racing,去掉這裡的 synchronized
    private  void add100000() {
        int idx = 0;
        while(idx++ < 100000) {
            count += 1;
        }
    }
}

上面這個例程,如果我們不啟動 th2 線程,隻用 th1 一個線程進行累加操作的話結果是 100000。按照這個思維,如果我們啟動兩個線程那麼最後累加的結果就應該是 200000。 但實際上並不是,我們運行一下上面的例程,得到的結果是:

168404
Process finished with exit code 0

當然這個在每個人的機器上的結果是不一樣的,而且也是有可能恰好等於 200000,需要多運行幾次,或者是多開幾個線程執行累加,出現 Data Racing 的幾率才高。

程序出現 Data Racing 的現象,就意味著最終拿到的數據是不正確的。那麼為瞭避免這個問題就需要通過加鎖來解決瞭,讓同一時間隻有持有鎖的線程才能對數據對象進行操作。當然針對簡單的運算、賦值等操作我們也能直接使用原子操作實現無鎖解決 Data Racing, 我們為瞭示例足夠簡單易懂才舉瞭一個累加的例子,實際上如果是一段業務邏輯操作的話,就隻能使用加鎖來保證不會出現 Data Racing瞭。

加鎖,隻是線程並發同步控制的一種,還有釋放鎖、喚醒線程、同步等待線程執行完畢等操作,下面我們會逐一進行學習。

同步控制–synchronized

開頭的那個例程,如果想避免 Data Racing,那麼就需要加上同步鎖,讓同一個時間隻能有一個線程操作數據對象。 針對我們的例程,我們隻需要在 add100000 方法的聲明中加上 synchronized 即可。

    // 想復現 Data Racing,去掉這裡的 synchronized
    private synchronized void add100000() {
        int idx = 0;
        while(idx++ < 100000) {
            count += 1;
        }
    }

是不是很簡單,當然 synchronized 的用法遠不止這個,它可以加在實例方法、靜態方法、代碼塊上,如果使用的不對,就不能正確地給需要同步鎖保護的對象加上鎖。

synchronized 是 Java 中的關鍵字,是利用鎖的機制來實現互斥同步的。 synchronized 可以保證在同一個時刻,隻有一個線程可以執行某個方法或者某個代碼塊。 如果不需要 Lock 、讀寫鎖ReadWriteLock 所提供的高級同步特性,應該優先考慮使用synchronized 這種方式加鎖,主要原因如下:

  • Java 自 1.6 版本以後,對 synchronized 做瞭大量的優化,其性能已經與 JUC 包中的 LockReadWriteLock 基本上持平。從趨勢來看,Java 未來仍將繼續優化 synchronized ,而不是 ReentrantLock 。
  • ReentrantLock 是 Oracle JDK 的 API,在其他版本的 JDK 中不一定支持;而 synchronized 是 JVM 的內置特性,所有 JDK 版本都提供支持。

synchronized 可以應用在實例方法、靜態方法和代碼塊上:

  • synchronized 關鍵字修飾實例方法,即為同步實例方法,鎖是當前的實例對象。
  • synchronized 關鍵字修飾類的靜態方法,即為同步靜態方法,鎖是當前的類的 Class 對象。
  • 如果把 synchronized 應用在代碼塊上,鎖是 synchronized 括號裡配置的對象,synchronized(this) {..} 鎖就是代碼塊所在實例的對象,synchronized(類名.class) {...} ,鎖就是類的 Class 對象。

同步實例方法和代碼塊

上面我們已經看過怎麼給實例方法加 synchronized 讓它變成同步方法瞭。下面我們看一下,synchronized 給實例方法加鎖時,不能保證資源被同步鎖保護的例子。

class Account {
  private int balance;
  // 轉賬
  synchronized void transfer(Account target, int amt){
    if (this.balance > amt) {
      this.balance -= amt;
      target.balance += amt;
    }
  }
}

在這段代碼中,臨界區內有兩個資源,分別是轉出賬戶的餘額 this.balance 和轉入賬戶的餘額 target.balance,並且用的是一把實例對象的鎖。問題就出在 this 這把鎖上,this 這把鎖可以保護自己的餘額 this.balance,卻保護不瞭別人的餘額 target.balance,就像你不能用自傢的鎖來保護別人傢的資產一個道理。

應該保證使用的鎖能保護所有應受保護資源。我們可以使用Account.class 作為加鎖的對象Account.class 是所有 Account 類的對象共享的,而且是 Java 虛擬機在加載 Account 類的時候創建的,保證瞭它的全局唯一性。

class Account {
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    synchronized(Account.class) {
      if (this.balance > amt) {
        this.balance -= amt;
        target.balance += amt;
      }
    }
  }
}

用 synchronized 給 Account.class 加鎖,這樣就保證出賬、入賬兩個 Account 對象在同步代碼塊裡都能收到保護。

當然我們也可以使用這筆轉賬的交易對象作為加鎖的對象,保證隻有這比交易的兩個 Account 對象受保護,這樣就不會影響到其他轉賬交易裡的出賬、入賬 Account 對象瞭。

class Account {
  private Trans trans;
  private int balance;
  private Account();
  // 創建 Account 時傳入同一個 交易對象作為 lock 對象
  public Account(Trans trans) {
    this.trans = trans;
  }
  // 轉賬
  void transfer(Account target, int amt){
    // 此處檢查所有對象共享的鎖
    synchronized(trans) {
      if (this.balance > amt) {
        this.balance -= amt;
        target.balance += amt;
      }
    }
  }
}

通過解決上面這個問題我們順道就把 synchronized 修飾同步代碼塊的知識點學瞭, 現在我們來看 synchronized 的最後一個用法–修飾同步靜態方法。

同步靜態方法

靜態方法的同步是指,用 synchronized 修飾的靜態方法,與使用所在類的 Class 對象實現的同步代碼塊,效果類似。因為在 JVM 中一個類隻能對應一個類的 Class 對象,所以同時隻允許一個線程執行同一個類中的靜態同步方法。

對於同一個類中的多個靜態同步方法,持有鎖的線程可以執行每個類中的靜態同步方法而無需等待。不管類中的哪個靜態同步方法被調用,一個類隻能由一個線程同時執行。

package com.learnthread;
public class SynchronizedStatic implements Runnable {
    private static final int MAX = 100000;
    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        SynchronizedStatic instance = new SynchronizedStatic();
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        // 等待工作線程執行結束
        t1.join();
        t2.join();
        System.out.println(count);
    }
    @Override
    public void run() {
        for (int i = 0; i < MAX; i++) {
            increase();
        }
    }
    /**
     * synchronized 修飾靜態方法
     */
    public synchronized static void increase() {
        count++;
    }
}

線程掛起和喚醒

上面我們看瞭使用 synchronized 給對象加同步鎖,讓同一時間隻有一個線程能操作臨界區的控制。接下來,我們看一下線程的掛起和喚醒,這兩個操作使用被線程成功加鎖的對象的 waitnotify 方法來完成,喚醒除瞭notify 外還有 notifyAll方法用來喚醒所有線程。下面我們先看一下這幾個方法的解釋。

  • waitwait 會自動釋放當前線程占有的對象鎖,並請求操作系統掛起當前線程,讓線程從 Running 狀態轉入 Waiting 狀態,等待被 notify / notifyAll 來喚醒。如果沒有釋放鎖,那麼其它線程就無法進入對象的同步方法或者同步代碼塊中,那麼就無法執行 notify 或者 notifyAll 來喚醒掛起的線程,會造成死鎖。
  • notify – 喚醒一個正在 Waiting 狀態的線程,並讓它拿到對象鎖,具體喚醒哪一個線程由 JVM 控制 。
  • notifyAll – 喚醒所有正在 Waiting 狀態的線程,接下來它們需要競爭對象鎖。

這裡有兩點需要各位註意的地方, 第一個是 waitnotifynotifyAll 都是 Object 類中的方法,而不是 Thread 類的。

因為 Object 是始祖類,是不是意味著所有類的對象都能調用這幾個方法呢?是,也不是… 因為 wait、notify、notifyAll 隻能用在 synchronized 方法或者 synchronized 代碼塊中使用,否則會在運行時拋出 IllegalMonitorStateException。換句話說,隻有被 synchronized 加上鎖的對象,才能調用這三個方法。

為什麼 waitnotifynotifyAll 不定義在 Thread 類中?為什麼 waitnotifynotifyAll 要配合 synchronized 使用? 理解為什麼這麼設計,需要瞭解幾個基本知識點:

  • 每一個 Java 對象都有一個與之對應的監視器(monitor)
  • 每一個監視器裡面都有一個 對象鎖 、一個 等待隊列、一個 同步隊列

瞭解瞭以上概念,我們回過頭來理解前面兩個問題。

為什麼這幾個方法不定義在 Thread 中?

  • 由於每個對象都擁有對象鎖,讓當前線程等待某個對象鎖,自然應該基於這個對象(Object)來操作,而非使用當前線程(Thread)來操作。因為當前線程可能會等待多個線程釋放鎖,如果基於線程(Thread)來操作,就非常復雜瞭。

為什麼 wait、notify、notifyAll 要配合 synchronized 使用?

  • 如果調用某個對象的 wait 方法,當前線程必須擁有這個對象的對象鎖,因此調用 wait 方法必須在 synchronized 方法和 synchronized 代碼塊中。

下面看一個 wait、notify、notifyAll 的一個經典使用案例,實現一個生產者、消費者模式:

package com.learnthread;
import java.util.PriorityQueue;
public class ThreadWaitNotifyDemo {
    private static final int QUEUE_SIZE = 10;
    private static final PriorityQueue&lt;Integer&gt; queue = new PriorityQueue&lt;&gt;(QUEUE_SIZE);
    public static void main(String[] args) {
        new Producer("生產者A").start();
        new Producer("生產者B").start();
        new Consumer("消費者A").start();
        new Consumer("消費者B").start();
    }
    static class Consumer extends Thread {
        Consumer(String name) {
            super(name);
        }
        @Override
        public void run() {
            while (true) {
                synchronized (queue) {
                    while (queue.size() == 0) {
                        try {
                            System.out.println("隊列空,等待數據");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notifyAll();
                        }
                    }
                    queue.poll(); // 每次移走隊首元素
                    queue.notifyAll();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " 從隊列取走一個元素,隊列當前有:" + queue.size() + "個元素");
                }
            }
        }
    }
    static class Producer extends Thread {
        Producer(String name) {
            super(name);
        }
        @Override
        public void run() {
            while (true) {
                synchronized (queue) {
                    while (queue.size() == QUEUE_SIZE) {
                        try {
                            System.out.println("隊列滿,等待有空餘空間");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notifyAll();
                        }
                    }
                    queue.offer(1); // 每次插入一個元素
                    queue.notifyAll();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " 向隊列取中插入一個元素,隊列當前有:" + queue.size() + "個元素");
                }
            }
        }
    }
}

上面的例程有兩個生產者和兩個消費者。生產者向隊列中放數據,每次向隊列中放入數據後使用 notifyAll 喚醒消費者線程,當隊列滿後生產者會 wait 讓出線程,等待消費者取走數據後再被喚醒 (消費者取數據後也會調用 notifyAll )。同理消費者在隊列空後也會使用 wait 讓出線程,等待生產者向隊列中放入數據後被喚醒。

線程等待–join

waitnotify 方法一樣,join 是另一種線程間同步機制。當我們調用線程對象 join 方法時,調用線程會進入等待狀態,它會一直處於等待狀態,直到被引用的線程執行結束。在上面的幾個例子中,我們已經使用過瞭 join 方法

   ...
	public static void main(String[] args) throws InterruptedException {
        SynchronizedStatic instance = new SynchronizedStatic();
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        // 等待工作線程執行結束
        t1.join();
        t2.join();
        System.out.println(count);
    }

這個例子裡,主線程調用 t1 和 t2 的 join 方法後,就會一直等待,直到他們兩個執行結束。如果 t1 或者 t2 線程處理時間過長,調用它們 join 方法的主線程將一直等待,程序阻塞住。為瞭避免這些情況,可以使用能指定超時時間的重載版本的 join 方法。

    t2.join(1000); // 最長等待1s

如果引用的線程被中斷,join方法也會返回。在這種情況下,還會觸發 InterruptedException。所以上面的main方法為瞭演示方便,直接選擇拋出瞭 InterruptedException

總結

同步控制的一大思路就是加鎖,除瞭本問學習到的 sychronized 同步控制,Java 裡還有 JUC 的可重入鎖、讀寫鎖這種加鎖方式,這個我們後續介紹 JUC 的時候會給大傢講解。

另外一種思路是不加鎖,讓線程和線程之間盡量不要使用共享數據,ThreadLocal 就是這種思路,下篇我們介紹 Java 的線程本地存儲 — ThreadLocal,更多關於Java 並發多線程同步控制通信的資料請關註WalkonNet其它相關文章!

推薦閱讀: