java wait()/notify() 實現生產者消費者模式詳解
java wait()/notify() 實現生產者消費者模式
java中的多線程會涉及到線程間通信,常見的線程通信方式,例如共享變量、管道流等,這裡我們要實現生產者消費者模式,也需要涉及到線程通信,不過這裡我們用到瞭java中的wait()、notify()方法:
wait()
:進入臨界區的線程在運行到一部分後,發現進行後面的任務所需的資源還沒有準備充分,所以調用wait()方法,讓線程阻塞,等待資源,同時釋放臨界區的鎖,此時線程的狀態也從RUNNABLE狀態變為WAITING狀態;
notify()
:準備資源的線程在準備好資源後,調用notify()方法通知需要使用資源的線程,同時釋放臨界區的鎖,將臨界區的鎖交給使用資源的線程。
wait()、notify()這兩個方法,都必須要在臨界區中調用,即是在synchronized同步塊中調用,不然會拋出IllegalMonitorStateException的異常。
實現源碼:
生產者線程類:
package threads; import java.util.List; import java.util.UUID; public class Producer extends Thread{ private List<String> storage;//生產者倉庫 public Producer(List<String> storage) { this.storage = storage; } public void run(){ //生產者每隔1s生產1~100消息 long oldTime = System.currentTimeMillis(); while(true){ synchronized(storage){ if (System.currentTimeMillis() - oldTime >= 1000) { oldTime = System.currentTimeMillis(); int size = (int)(Math.random()*100) + 1; for (int i = 0; i < size; i++) { String msg = UUID.randomUUID().toString(); storage.add(msg); } System.out.println("線程"+this.getName()+"生產消息"+size+"條"); storage.notify(); } } } } }
消費者線程類:
package threads; import java.util.List; public class Consumer extends Thread{ private List<String> storage;//倉庫 public Consumer(List<String> storage) { this.storage = storage; } public void run(){ while(true){ synchronized(storage){ //消費者去倉庫拿消息的時候,如果發現倉庫數據為空,則等待 if (storage.isEmpty()) { try { storage.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int size = storage.size(); for (int i = size - 1; i >= 0; i--) { storage.remove(i); } System.out.println("線程"+this.getName()+"成功消費"+size+"條消息"); } } } }
倉庫類:
package threads; import java.util.ArrayList; import java.util.List; public class Storage { private List<String> storage;//生產者和消費者共享的倉庫 public Storage() { storage = new ArrayList<String>(); } public List<String> getStorage() { return storage; } public void setStorage(List<String> storage) { this.storage = storage; } }
main方法類:
package threads; public class App { public static void main(String[] args) { Storage storage = new Storage(); Producer producer = new Producer(storage.getStorage()); Consumer consumer = new Consumer(storage.getStorage()); producer.start(); consumer.start(); } }
生產消費效果:
Wait/Notify通知機制解析
前言
我們知道,java的wait/notify的通知機制可以用來實現線程間通信。wait表示線程的等待,調用該方法會導致線程阻塞,直至另一線程調用notify或notifyAll方法才可另其繼續執行。經典的生產者、消費者模式即是使用wait/notify機制得以完成。在這篇文章中,我們將深入解析這一機制,瞭解其背後的原理。
線程的狀態
在瞭解wait/notify機制前,先熟悉一下java線程的幾個生命周期。分別為初始(NEW)、運行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超時等待(TIMED_WAITING)、終止(TERMINATED)等狀態(位於java.lang.Thread.State枚舉類中)。
以下是對這幾個狀態的簡要說明,詳細說明見該類註釋。
狀態名稱 | 說明 |
---|---|
NEW | 初始狀態,線程被構建,但未調用start()方法 |
RUNNABLE | 運行狀態,調用start()方法後。在java線程中,將操作系統線程的就緒和運行統稱運行狀態 |
BLOCKED | 阻塞狀態,線程等待進入synchronized代碼塊或方法中,等待獲取鎖 |
WAITING | 等待狀態,線程可調用wait、join等操作使自己陷入等待狀態,並等待其他線程做出特定操作(如notify或中斷) |
TIMED_WAITING | 超時等待,線程調用sleep(timeout)、wait(timeout)等操作進入超時等待狀態,超時後自行返回 |
TERMINATED | 終止狀態,線程運行結束 |
對於以上線程間的狀態及轉化關系,我們需要知道
- WAITING(等待狀態)和TIMED_WAITING(超時等待)都會令線程進入等待狀態,不同的是TIMED_WAITING會在超時後自行返回,而WAITING則需要等待至條件改變。
- 進入阻塞狀態的唯一前提是在等待獲取同步鎖。java註釋說的很明白,隻有兩種情況可以使線程進入阻塞狀態:一是等待進入synchronized塊或方法,另一個是在調用wait()方法後重新進入synchronized塊或方法。下文會有詳細解釋。
- Lock類對於鎖的實現不會令線程進入阻塞狀態,Lock底層調用LockSupport.park()方法,使線程進入的是等待狀態。
wait/notify用例
讓我們先通過一個示例解析
wait()方法可以使線程進入等待狀態,而notify()可以使等待的狀態喚醒。這樣的同步機制十分適合生產者、消費者模式:消費者消費某個資源,而生產者生產該資源。當該資源缺失時,消費者調用wait()方法進行自我阻塞,等待生產者的生產;生產者生產完畢後調用notify/notifyAll()喚醒消費者進行消費。
以下是代碼示例,其中flag標志表示資源的有無。
public class ThreadTest { static final Object obj = new Object(); private static boolean flag = false; public static void main(String[] args) throws Exception { Thread consume = new Thread(new Consume(), "Consume"); Thread produce = new Thread(new Produce(), "Produce"); consume.start(); Thread.sleep(1000); produce.start(); try { produce.join(); consume.join(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產者線程 static class Produce implements Runnable { @Override public void run() { synchronized (obj) { System.out.println("進入生產者線程"); System.out.println("生產"); try { TimeUnit.MILLISECONDS.sleep(2000); //模擬生產過程 flag = true; obj.notify(); //通知消費者 TimeUnit.MILLISECONDS.sleep(1000); //模擬其他耗時操作 System.out.println("退出生產者線程"); } catch (InterruptedException e) { e.printStackTrace(); } } } } //消費者線程 static class Consume implements Runnable { @Override public void run() { synchronized (obj) { System.out.println("進入消費者線程"); System.out.println("wait flag 1:" + flag); while (!flag) { //判斷條件是否滿足,若不滿足則等待 try { System.out.println("還沒生產,進入等待"); obj.wait(); System.out.println("結束等待"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("wait flag 2:" + flag); System.out.println("消費"); System.out.println("退出消費者線程"); } } } }
輸出結果為:
進入消費者線程
wait flag 1:false
還沒生產,進入等待
進入生產者線程
生產
退出生產者線程
結束等待
wait flag 2:true
消費
退出消費者線程
理解瞭輸出結果的順序,也就明白瞭wait/notify的基本用法。有以下幾點需要知道:
- 在示例中沒有體現但很重要的是,wait/notify方法的調用必須處在該對象的鎖(Monitor)中,也即,在調用這些方法時首先需要獲得該對象的鎖。否則會爬出IllegalMonitorStateException異常。
- 從輸出結果來看,在生產者調用notify()後,消費者並沒有立即被喚醒,而是等到生產者退出同步塊後才喚醒執行。(這點其實也好理解,synchronized同步方法(塊)同一時刻隻允許一個線程在裡面,生產者不退出,消費者也進不去)
- 註意,消費者被喚醒後是從wait()方法(被阻塞的地方)後面執行,而不是重新從同步塊開頭。
深入瞭解
這一節我們探討wait/notify與線程狀態之間的關系。深入瞭解線程的生命周期。
由前面線程的狀態轉化圖可知,當調用wait()方法後,線程會進入WAITING(等待狀態),後續被notify()後,並沒有立即被執行,而是進入等待獲取鎖的阻塞隊列。
對於每個對象來說,都有自己的等待隊列和阻塞隊列。以前面的生產者、消費者為例,我們拿obj對象作為對象鎖,配合圖示。內部流程如下
- 當線程A(消費者)調用wait()方法後,線程A讓出鎖,自己進入等待狀態,同時加入鎖對象的等待隊列。
- 線程B(生產者)獲取鎖後,調用notify方法通知鎖對象的等待隊列,使得線程A從等待隊列進入阻塞隊列。
- 線程A進入阻塞隊列後,直至線程B釋放鎖後,線程A競爭得到鎖繼續從wait()方法後執行。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- 分析java並發中的wait notify notifyAll
- Java線程通信之wait-notify通信方式詳解
- Java並發編程多線程間的同步控制和通信詳解
- Java多線程Thread類的使用及註意事項
- 詳解Java多線程與並發