解析Java8 Stream原理

一、前言

首先我們先看一個使用Stream API的示例,具體代碼如下:

這是個很簡單的一個Stream使用例子,我們過濾掉空字符串後,轉成int類型並計算出最大值,這其中包括瞭三個操作:filter、mapToInt、sum。相信大多數人再剛使用Stream API的時候都會有個疑問,Stream是指怎麼實現的,是每一次函數調用就執行一次迭代嗎?答案肯定是否,因為如果真的是每一次函數調用就執行一次迭代,這個效率是很難接受的,Stream也不會那麼受歡迎。

其實Stream內部是通過流水線(Pipeline)的方式來實現的,基本思想是在迭代的時候順著流水線盡可能的執行更多的操作,從而避免多次迭代。為瞭對Stream的操作有更清晰的認識,我們匯總瞭Stream的所有操作。

從上表可以看出Stream將所有操作分為兩類:中間操作和終止操作。其中中間操作分為無狀態和有狀態,終止操作分為非短路操作和短路操作,下面是針對這幾個操作的含義說明:

1、中間操作:中間操作隻是一種標記,隻有結束操作才會觸發實際計算

  • 無狀態:指元素的處理不受前面元素的影響;
  • 有狀態:有狀態的中間操作必須等到所有元素處理之後才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前並不能確定排序結果。

2、終止操作:顧名思義,就是得出最後計算結果的操作

  • 短路操作:指不用處理全部元素就可以返回結果;
  • 非短路操作:指必須處理所有元素才能得到最終結果。

二、Stream流水線解決方案

通過上面的介紹,我們瞭解到Stream在執行中間操作時僅僅是記錄,當用戶調用終止操作時,會在一個迭代裡將已經記錄的操作順著流水線全部執行掉。沿著這個思路,有幾個問題需要解決:

  • 用戶的操作如何記錄?
  • 操作如何疊加?
  • 疊加之後的操作如何執行?

2.1、操作如何記錄

圖1-1

關於操作如何記錄,在JDK源碼註釋中多次用(操作)stage來標識用戶的每一次操作,而通常情況下Stream的操作又需要一個回調函數,所以一個完整的操作是由數據來源、操作、回調函數組成的三元組來表示。而在具體實現中,使用實例化的ReferencePipeline來表示,即圖1-1中的Head、StatelessOp、StatefulOp的實例。接下來我們來看下Stream幾個常用方法的源碼。

code2 Collection.Stream()

code3StreamSupport.stream()

code4 ReferencePipeline.map()

從上面源碼中可以看出來,我們調用stream()方法時最終會創建一個Head實例來表示流操作的頭,當調用map()方法時則會創建無狀態的中間操作實例StatelessOp,同樣調用其他操作對應的方法也會生成一個ReferencePipeline實例,在這裡就不一一列舉。在用戶調用一系列操作後,最終會形成一個雙向鏈表,如下圖所示:

圖1-2

2.2、操作如何疊加

上面我們說明瞭Stream是通過stage記錄操作,但stage隻保存當前操作,它並不知道下個stage如何操作,需要什麼操作。所以要執行的話還需要某種協議將各個stage關聯起來。jdk中就是使用Slink接口來實現的,Slink接口定義begin()、end()、cancellationRequested()、accept()四個方法,如下表所示。

往回看code3 ReferencePipeline.map()的方法,我們會發現我們在創建一個ReferencePipeline實例的時候,需要重寫opWrapSink方法來生成對應Sink實例。而且通過閱讀源碼會發現常用的操作都會創建一個ChainedReference實例。我們可以看下code5 ChainedReference抽象類的源碼實現,因為ChainedReference隻是個抽象實現,不攜帶具體操作的特性,所以是更能體現作者的設計理念。

通過查看源碼可以發現ChainedReference會持有下一個操作的Slink,並在調用begin、end、cancellationRequested方法會調用下一個操作的Slink的相應方法,以此來達到疊加的效果。

code5ChainedReference

2.3、疊加之後的操作如何執行

Sink完美封裝瞭Stream每一步操作,並給出瞭[處理->轉發]的模式來疊加操作。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啟動執行。是什麼啟動這一連串的操作呢?也許你已經想到瞭啟動的原始動力就是結束操作(Terminal Operation),一旦調用某個結束操作,就會觸發整個流水線的執行。

結束操作之後不能再有別的操作,所以結束操作不會創建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在往後延伸瞭。結束操作會創建一個包裝瞭自己操作的Sink,這也是流水線中最後一個Sink,這個Sink隻需要處理數據而不需要將結果傳遞給下遊的Sink(因為沒有下遊)。對於Sink的[處理->轉發]模型,結束操作的Sink就是調用鏈的出口。

我們再來考察一下上遊的Sink是如何找到下遊Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下遊Stage並訪問Sink字段即可。但Stream類庫的設計者沒有這麼做,而是設置瞭一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個新的包含瞭當前Stage代表的操作以及能夠將結果傳遞給downstream的Sink對象。為什麼要產生一個新對象而不是返回一個Sink字段?這是因為使用opWrapSink()可以將當前操作與下遊Sink(上文中的downstream參數)結合成新Sink。試想隻要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,因為stage0代表數據源,不包含操作),就可以得到一個代表瞭流水線上所有操作的Sink,用代碼表示就是這樣:

code6AbstractPipeline.wrapSink

現在流水線上從開始到結束的所有的操作都被包裝到瞭一個Sink裡,執行這個Sink就相當於執行整個流水線,執行Sink的代碼如下:

code7AbstractPipeline.copyInto

上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,然後調用spliterator.forEachRemaining()方法對數據進行迭代,最後調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。

以上就是解析Java8 Stream原理的詳細內容,更多關於Java8 Stream原理的資料請關註WalkonNet其它相關文章!

推薦閱讀: