java wait()/notify() 實現生產者消費者模式詳解

java wait()/notify() 實現生產者消費者模式

java中的多線程會涉及到線程間通信,常見的線程通信方式,例如共享變量、管道流等,這裡我們要實現生產者消費者模式,也需要涉及到線程通信,不過這裡我們用到瞭java中的wait()、notify()方法:

wait():進入臨界區的線程在運行到一部分後,發現進行後面的任務所需的資源還沒有準備充分,所以調用wait()方法,讓線程阻塞,等待資源,同時釋放臨界區的鎖,此時線程的狀態也從RUNNABLE狀態變為WAITING狀態;

notify():準備資源的線程在準備好資源後,調用notify()方法通知需要使用資源的線程,同時釋放臨界區的鎖,將臨界區的鎖交給使用資源的線程。

wait()、notify()這兩個方法,都必須要在臨界區中調用,即是在synchronized同步塊中調用,不然會拋出IllegalMonitorStateException的異常。

實現源碼:

生產者線程類:

package threads; 
import java.util.List;
import java.util.UUID; 
public class Producer extends Thread{ 
 private List<String> storage;//生產者倉庫
 public Producer(List<String> storage) {
  this.storage = storage;
 }
 public void run(){
  //生產者每隔1s生產1~100消息
  long oldTime = System.currentTimeMillis();
  while(true){
   synchronized(storage){
    if (System.currentTimeMillis() - oldTime >= 1000) {
     oldTime = System.currentTimeMillis();
     int size = (int)(Math.random()*100) + 1;
     for (int i = 0; i < size; i++) {
      String msg = UUID.randomUUID().toString();
      storage.add(msg);
     }
     System.out.println("線程"+this.getName()+"生產消息"+size+"條");
     storage.notify();
    }
   }
  }
 }
}

消費者線程類:

package threads; 
import java.util.List; 
public class Consumer extends Thread{ 
 private List<String> storage;//倉庫
 public Consumer(List<String> storage) {
  this.storage = storage;
 }
 public void run(){
  while(true){
   synchronized(storage){
    //消費者去倉庫拿消息的時候,如果發現倉庫數據為空,則等待
    if (storage.isEmpty()) {
     try {
      storage.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    int size = storage.size();
    for (int i = size - 1; i >= 0; i--) {
     storage.remove(i);
    }
    System.out.println("線程"+this.getName()+"成功消費"+size+"條消息");
   }
  }
 }
}

倉庫類:

package threads; 
import java.util.ArrayList;
import java.util.List; 

public class Storage { 
 private List<String> storage;//生產者和消費者共享的倉庫
 public Storage() {
  storage = new ArrayList<String>();
 }
 public List<String> getStorage() {
  return storage;
 }
 public void setStorage(List<String> storage) {
  this.storage = storage;
 } 
}

main方法類:

package threads; 
public class App { 
 public static void main(String[] args) {
  Storage storage = new Storage();
  Producer producer = new Producer(storage.getStorage());
  Consumer consumer = new Consumer(storage.getStorage());
  producer.start();
  consumer.start();
 }
}

生產消費效果:

Wait/Notify通知機制解析

前言

我們知道,java的wait/notify的通知機制可以用來實現線程間通信。wait表示線程的等待,調用該方法會導致線程阻塞,直至另一線程調用notify或notifyAll方法才可另其繼續執行。經典的生產者、消費者模式即是使用wait/notify機制得以完成。在這篇文章中,我們將深入解析這一機制,瞭解其背後的原理。

線程的狀態

在瞭解wait/notify機制前,先熟悉一下java線程的幾個生命周期。分別為初始(NEW)、運行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超時等待(TIMED_WAITING)、終止(TERMINATED)等狀態(位於java.lang.Thread.State枚舉類中)。

以下是對這幾個狀態的簡要說明,詳細說明見該類註釋。

狀態名稱 說明
NEW 初始狀態,線程被構建,但未調用start()方法
RUNNABLE 運行狀態,調用start()方法後。在java線程中,將操作系統線程的就緒和運行統稱運行狀態
BLOCKED 阻塞狀態,線程等待進入synchronized代碼塊或方法中,等待獲取鎖
WAITING 等待狀態,線程可調用wait、join等操作使自己陷入等待狀態,並等待其他線程做出特定操作(如notify或中斷)
TIMED_WAITING 超時等待,線程調用sleep(timeout)、wait(timeout)等操作進入超時等待狀態,超時後自行返回
TERMINATED 終止狀態,線程運行結束

對於以上線程間的狀態及轉化關系,我們需要知道

  • WAITING(等待狀態)和TIMED_WAITING(超時等待)都會令線程進入等待狀態,不同的是TIMED_WAITING會在超時後自行返回,而WAITING則需要等待至條件改變。
  • 進入阻塞狀態的唯一前提是在等待獲取同步鎖。java註釋說的很明白,隻有兩種情況可以使線程進入阻塞狀態:一是等待進入synchronized塊或方法,另一個是在調用wait()方法後重新進入synchronized塊或方法。下文會有詳細解釋。
  • Lock類對於鎖的實現不會令線程進入阻塞狀態,Lock底層調用LockSupport.park()方法,使線程進入的是等待狀態。

wait/notify用例

讓我們先通過一個示例解析

wait()方法可以使線程進入等待狀態,而notify()可以使等待的狀態喚醒。這樣的同步機制十分適合生產者、消費者模式:消費者消費某個資源,而生產者生產該資源。當該資源缺失時,消費者調用wait()方法進行自我阻塞,等待生產者的生產;生產者生產完畢後調用notify/notifyAll()喚醒消費者進行消費。

以下是代碼示例,其中flag標志表示資源的有無。

public class ThreadTest {
    static final Object obj = new Object();
    private static boolean flag = false;
    public static void main(String[] args) throws Exception {
        Thread consume = new Thread(new Consume(), "Consume");
        Thread produce = new Thread(new Produce(), "Produce");
        consume.start();
        Thread.sleep(1000);
        produce.start();
        try {
            produce.join();
            consume.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 生產者線程
    static class Produce implements Runnable {
        @Override
        public void run() {
            synchronized (obj) {
                System.out.println("進入生產者線程");
                System.out.println("生產");
                try {
                    TimeUnit.MILLISECONDS.sleep(2000);  //模擬生產過程
                    flag = true;
                    obj.notify();  //通知消費者
                    TimeUnit.MILLISECONDS.sleep(1000);  //模擬其他耗時操作
                    System.out.println("退出生產者線程");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    //消費者線程
    static class Consume implements Runnable {
        @Override
        public void run() {
            synchronized (obj) {
                System.out.println("進入消費者線程");
                System.out.println("wait flag 1:" + flag);
                while (!flag) {  //判斷條件是否滿足,若不滿足則等待
                    try {
                        System.out.println("還沒生產,進入等待");
                        obj.wait();
                        System.out.println("結束等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("wait flag 2:" + flag);
                System.out.println("消費");
                System.out.println("退出消費者線程");
            }
        }
    }
}

輸出結果為:

進入消費者線程

wait flag 1:false

還沒生產,進入等待

進入生產者線程

生產

退出生產者線程

結束等待

wait flag 2:true

消費

退出消費者線程

理解瞭輸出結果的順序,也就明白瞭wait/notify的基本用法。有以下幾點需要知道:

  • 在示例中沒有體現但很重要的是,wait/notify方法的調用必須處在該對象的鎖(Monitor)中,也即,在調用這些方法時首先需要獲得該對象的鎖。否則會爬出IllegalMonitorStateException異常。
  • 從輸出結果來看,在生產者調用notify()後,消費者並沒有立即被喚醒,而是等到生產者退出同步塊後才喚醒執行。(這點其實也好理解,synchronized同步方法(塊)同一時刻隻允許一個線程在裡面,生產者不退出,消費者也進不去)
  • 註意,消費者被喚醒後是從wait()方法(被阻塞的地方)後面執行,而不是重新從同步塊開頭。

深入瞭解

這一節我們探討wait/notify與線程狀態之間的關系。深入瞭解線程的生命周期。

由前面線程的狀態轉化圖可知,當調用wait()方法後,線程會進入WAITING(等待狀態),後續被notify()後,並沒有立即被執行,而是進入等待獲取鎖的阻塞隊列。

對於每個對象來說,都有自己的等待隊列和阻塞隊列。以前面的生產者、消費者為例,我們拿obj對象作為對象鎖,配合圖示。內部流程如下

  • 當線程A(消費者)調用wait()方法後,線程A讓出鎖,自己進入等待狀態,同時加入鎖對象的等待隊列。
  • 線程B(生產者)獲取鎖後,調用notify方法通知鎖對象的等待隊列,使得線程A從等待隊列進入阻塞隊列。
  • 線程A進入阻塞隊列後,直至線程B釋放鎖後,線程A競爭得到鎖繼續從wait()方法後執行。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: