Java阻塞隊列的實現及應用
1.手寫生產者消費者模型
所謂生產者消費者模型,可以用我們生活中的例子來類比:我去一個小攤兒買吃的,老板把已經做好的小吃都放在擺盤上,供我挑選。那麼,老板就是生產者;我就是消費者;擺盤就是阻塞隊列,用來當做生產與消費的緩沖區。因此,阻塞隊列在生產者與消費者模型中起著至關重要的緩沖作用。
此次先演示如何手寫阻塞隊列(也可以使用Java庫中自帶的阻塞隊列)。
手寫的阻塞隊列隻實現最基礎的兩個功能:入隊和出隊。之所以叫阻塞隊列,是因為當隊空或者隊滿的時候,都要實現阻塞,直到隊中不空或不滿的時候,才會取消阻塞。
手寫阻塞隊列實現如下:
//阻塞隊列BlockQueue static class BlockQueue{ //該隊列用一個數組來實現,我們讓此隊列的最大容量為10 private int[] items = new int[10]; private int head = 0; private int tail = 0; private int size = 0; private Object locker =new Object(); //入隊 public void put(int item) throws InterruptedException { synchronized(locker) { while (size == items.length) { //入隊時,若隊滿,阻塞 locker.wait(); } items[tail++] = item; //如果到達末尾,重回隊首(實現循環隊列) if (tail >= items.length) { tail = 0; } size++; locker.notify(); } } //出隊 public int back() throws InterruptedException { int ret = 0; synchronized (locker) { while (size == 0) { //出隊時,若隊空,阻塞 locker.wait(); } ret = items[head++]; if (head >= items.length) { head = 0; } size--; locker.notify(); } return ret; } }
用兩個線程充當生產者與消費者:
public static void main(String[] args) throws InterruptedException { BlockQueue blockQueue = new BlockQueue(); //生產者線程 Thread produce = new Thread(){ @Override public void run() { for(int i = 0;i<10000;++i){ try { System.out.println("生產瞭:"+i); blockQueue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }; produce.start(); //消費者線程 Thread customer = new Thread(){ @Override public void run() { while (true) { try { int res = blockQueue.back(); System.out.println("消費瞭:" + res); //每次消費後等1秒,也就是生產的快,消費的慢 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; customer.start(); customer.join(); produce.join(); }
結果如下:可以看到,生產者線程先生產元素,(阻塞隊列容量為10),當隊列滿時,隊列阻塞,消費者線程消費元素,因為消費的慢,所以接下來生產者線程由於阻塞隊列不能快速生產,隻能等待消費者線程消費隊列中的元素,生產者線程才能隨著生產,這就是阻塞隊列的緩沖作用。
2.手寫定時器
先看一下Java包中的定時器。
下面的代碼我們通過調用timer類中的schedule方法來實現定時器功能。schedule方法有兩個參數,第一個參數:要執行的任務,第二個參數:時間。
下面的代碼中,schedule方法中的第一個任務參數:我們創建瞭一個TimerTask實例;重寫裡面的run方法來打印”觸發定時器”這句話。第二個參數:3000;表示3秒後執行這個任務。
import java.util.Timer; import java.util.TimerTask; public class Test{ public static void main(String[] args) { Timer timer = new Timer(); System.out.println("代碼開始執行"); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("觸發定時器"); } },3000); } }
結果如下:
從上面就可以看出來我們手寫定時器需要實現以下兩個方面:
1.一個Task類,用來描述要實現的任務
2.一個Timer類,類中再實現一個schedule方法
Task類實現
//Task類用來描述任務,它繼承Comparable接口是因為要將任務放到優先級阻塞隊列中 static class Task implements Comparable<Task>{ //command表示這個任務是什麼 private Runnable command; //time是一個時間戳 private long time; public Task(Runnable command,long time){ this.command = command; this.time = System.currentTimeMillis()+time; } public void run(){ command.run(); } //因為要將Task任務放到優先級阻塞隊列中,所以要重寫compareTo方法,我們將時間短的任務放到隊頭 @Override public int compareTo(Task o) { return (int)(this.time - o.time); } }
Timer類實現
//Timer類中需要有一個定時器,還需要有一個schedule方法 static class Timer{ //使用優先級阻塞隊列來放這些任務,這樣才能把最接近時鐘的任務放到隊頭,我們每次掃描隊頭任務就行瞭 private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(); //locker用來解決忙等問題 private Object locker = new Object(); //構造方法中完成定時器功能 public Timer(){ //需要構造一個線程,來不斷地掃描隊頭,來判斷隊頭任務是否到點,也就是是否該開始執行瞭 Thread t = new Thread(){ @Override public void run() { while(true){ //取出隊首任務來判斷是否到時間瞭 try { Task task = queue.take(); long current = System.currentTimeMillis(); //當前時間戳小於時鐘時間戳,表明時間還沒到,那就等待 if (current < task.time){ queue.put(task); synchronized (locker){ locker.wait(task.time-current); } }else{ //否則時間到,開始執行任務 task.run(); } } catch (InterruptedException e) { e.printStackTrace(); break; } } } }; t.start(); } //schedule方法的兩個參數,command為任務,delay為一個時間差例如:3000(單位為毫秒) public void schedule(Runnable command,long delay){ Task task = new Task(command,delay); queue.put(task); synchronized (locker){ locker.notify(); } } }
主線程
public static void main(String[] args) { System.out.println("程序啟動"); Timer timer = new Timer(); timer.schedule(new Runnable() { @Override public void run() { System.out.println("觸發定時器"); } },3000);//3000表示定時時間為3秒 }
結果如下:“程序啟動” 在程序啟動是立刻顯示出來;“觸發定時器”在3秒後顯示出來。
總結
本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!