Java 8 Stream流強大的原理

前言:

Stream 使用一種類似用 SQL 語句從數據庫查詢數據的直觀方式來提供一種對 Java 集合運算和表達的高階抽象。

Stream API可以極大提高Java程序員的生產力,讓程序員寫出高效率、幹凈、簡潔的代碼。

1、Stream的組成與特點

Stream(流)是一個來自數據源的元素隊列並支持聚合操作:

  • 元素是特定類型的對象,形成一個隊列。Java中的Stream並_不會_向集合那樣存儲和管理元素,而是按需計算
  • 數據源流的來源可以是集合Collection、數組ArrayI/O channel, 產生器generator
  • 聚合操作類似SQL語句一樣的操作, 比如filter, map, reduce, find, match, sorted

和以前的Collection操作不同, Stream操作還有兩個基礎的特征:

  • Pipelining: 中間操作都會返回流對象本身。這樣多個操作可以串聯成一個管道, 如同流式風格(fluent style)。這樣做可以對操作進行優化, 比如延遲執行(laziness evaluation)和短路( short-circuiting)
  • 內部迭代:以前對集合遍歷都是通過Iterator或者For-Each的方式, 顯式的在集合外部進行迭代, 這叫做外部迭代。Stream提供瞭內部迭代的方式, 通過訪問者模式 (Visitor)實現。

和迭代器又不同的是,Stream 可以並行化操作,迭代器隻能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個 item 讀完後再讀下一個 item。而使用並行去遍歷時,數據會被分成多個段,其中每一個都在不同的線程中處理,然後將結果一起輸出。

Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。

Java 的並行 API 演變歷程基本如下:

1.0-1.4 中的 java.lang.Thread

5.0 中的 java.util.concurrent

6.0 中的 Phasers 等

7.0 中的 Fork/Join 框架

8.0 中的 Lambda

Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEach(out::println);


可以看到一行簡單的代碼就幫我們實現瞭並行輸出集合中元素的功能,但是由於並行執行的順序是不可控的所以每次執行的結果不一定相同。

如果非得相同可以使用forEachOrdered方法執行終止操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEachOrdered(out::println);


這裡有一個疑問,如果結果需要有序,是否和我們的並行執行的初衷相悖?是的,這個場景下明顯無需使用並行流,直接用串行流執行即可, 否則性能可能更差,因為最後又強行將所有並行結果進行瞭排序。

OK,下面我們先介紹一下Stream接口的相關知識。

2、BaseStream接口

Stream的父接口是BaseStream,後者是所有流實現的頂層接口,定義如下:

public interface BaseStream<T, S extends BaseStream<T, S>>        extends AutoCloseable {    Iterator<T> iterator();    Spliterator<T> spliterator();    boolean isParallel();    S sequential();    S parallel();    S unordered();    S onClose(Runnable closeHandler);    void close();}


其中,T為流中元素的類型,S為一個BaseStream的實現類,它裡面的元素也是T並且S同樣是自己:

S extends BaseStream<T, S>


是不是有點暈?

其實很好理解,我們看一下接口中對S的使用就知道瞭:如sequential()、parallel()這兩個方法,它們都返回瞭S實例,也就是說它們分別支持對當前流進行串行或者並行的操作,並返回「改變」後的流對象。

如果是並行一定涉及到對當前流的拆分,即將一個流拆分成多個子流,子流肯定和父流的類型是一致的。子流可以繼續拆分子流,一直拆分下去…

也就是說這裡的S是BaseStream的一個實現類,它同樣是一個流,比如StreamIntStreamLongStream等。

3、Stream接口

再來看一下Stream的接口聲明:

public interface Stream<T> extends BaseStream<T, Stream<T>>


參考上面的解釋這裡不難理解:即Stream可以繼續拆分為Stream,我們可以通過它的一些方法來證實:

Stream<T> filter(Predicate<? super T> predicate);<R> Stream<R> map(Function<? super T, ? extends R> mapper);<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);Stream<T> sorted();Stream<T> peek(Consumer<? super T> action);Stream<T> limit(long maxSize);Stream<T> skip(long n);...


這些都是操作流的中間操作,它們的返回結果必須是流對象本身。

4、關閉流操作

BaseStream 實現瞭 AutoCloseable 接口,也就是 close() 方法會在流關閉時被調用。同時,BaseStream 中還給我們提供瞭onClose()方法:

S onClose(Runnable closeHandler);


AutoCloseableclose()接口被調用的時候會觸發調用流對象的onClose()方法,

但有幾點需要註意:

  • onClose() 方法會返回流對象本身,也就是說可以對改對象進行多次調用
  • 如果調用瞭多個onClose() 方法,它會按照調用的順序觸發,但是如果某個方法有異常則隻會向上拋出第一個異常
  • 前一個 onClose() 方法拋出瞭異常不會影響後續 onClose() 方法的使用
  • 如果多個 onClose() 方法都拋出異常,隻展示第一個異常的堆棧,而其他異常會被壓縮,隻展示部分信息

5、並行流和串行流

BaseStream接口中分別提供瞭並行流和串行流兩個方法,這兩個方法可以任意調用若幹次,也可以混合調用,但最終隻會以最後一次方法調用的返回結果為準。

參考parallel()方法的說明:

Returns an equivalent stream that is parallel. May return

itself, either because the stream was already parallel, or because

the underlying stream state was modified to be parallel.

所以多次調用同樣的方法並不會生成新的流,而是直接復用當前的流對象。

下面的例子裡以最後一次調用parallel()為準,最終是並行地計算sum:

stream.parallel()   .filter(...)   .sequential()   .map(...)   .parallel()   .sum();


6、ParallelStream背後的男人:ForkJoinPool

ForkJoin框架是從JDK7中新特性,它同ThreadPoolExecutor一樣,也實現瞭Executor和ExecutorService 接口。它使用瞭一個「無限隊列」來保存需要執行的任務,而線程的數量則是通過構造函數傳入, 如果沒有向構造函數中傳入希望的線程數量,那麼當前計算機可用的CPU數量會被設置為線程數量作為默認值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm) 來解決問題,典型的應用比如_快速排序算法_。這裡的要點在於,ForkJoinPool需要使用相對少的線程來處理大量的任務。

比如要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500 萬的排序任務和一個針對這兩組500萬數據的合並任務。

以此類推,對於500萬的數據也會做出同樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概2000000+個。

問題的關鍵在於,對於一個任務而言,隻有當它所有的子任務完成之後,它才能夠被執行,想象一下歸並排序的過程。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向 任務隊列中再添加一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼性能的差異呢?

首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有「父子關系」的任務,比如使用4個線程來完成超過200萬個任務。使用ThreadPoolExecutor 時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,顯然這是不可行的。

Work Stealing原理:

  • 每個工作線程都有自己的工作隊列WorkQueue
  • 這是一個雙端隊列dequeue,它是線程私有的;
  • ForkJoinTask中fork的子任務,將放入運行該任務的工作線程的隊頭,工作線程將以LIFO的順序來處理工作隊列中的任務,即堆棧的方式;
  • 為瞭最大化地利用CPU,空閑的線程將從其它線程的隊列中「竊取」任務來執行
  • 但是是從工作隊列的尾部竊取任務,以減少和隊列所屬線程之間的競爭;
  • 雙端隊列的操作:push()/pop()僅在其所有者工作線程中調用,poll()是由其它線程竊取任務時調用的;
  • 當隻剩下最後一個任務時,還是會存在競爭,是通過CAS來實現的;

7、用ForkJoinPool的眼光來看ParallelStream

Java 8為ForkJoinPool添加瞭一個通用線程池,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是ForkJoinPool類型上的一個靜態元素,它擁有的默認線程數量等於運行計算機上的CPU數量。

當調用Arrays 類上添加的新方法時,自動並行化就會發生。

比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新添加的Stream API中。

比如下面的代碼用來遍歷列表中的元素並執行需要的操作:

List<UserInfo> userInfoList =        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);


對於列表中的元素的操作都會以並行的方式執行。forEach方法會為每個元素的計算操作創建一個任務,該任務會被前文中提到的ForkJoinPool中的commonPool處理。

以上的並行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。

對於ForkJoinPool通用線程池的線程數量,通常使用默認值就可以瞭,即運行時計算機的處理器數量。也可以通過設置系統屬性: -Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N為線程數量),來調整ForkJoinPool的線程數量。

值得註意的是:當前執行的線程也會被用來執行任務,所以最終的線程個數為N+1,1就是當前的主線程。

這裡就有一個問題,如果你在並行流的執行計算使用瞭_阻塞操作_,如I/O,那麼很可能會導致一些問題:

public static String query(String question) {  List<String> engines = new ArrayList<String>();  engines.add("http://www.google.com/?q=");  engines.add("http://duckduckgo.com/?q=");  engines.add("http://www.bing.com/search?q=");  // get element as soon as it is available  Optional<String> result = engines.stream().parallel().map((base) - {    String url = base + question;    // open connection and fetch the result    return WS.url(url).get();  }).findAny();  return result.get();}


這個例子很典型,讓我們來分析一下:

這個並行流計算操作將由主線程和JVM默認的ForkJoinPool.commonPool()來共同執行。

map中是一個阻塞方法,需要通過訪問HTTP接口並得到它的response,所以任何一個worker線程在執行到這裡的時候都會阻塞並等待結果。

所以當此時再其他地方通過並行流方式調用計算方法的時候,將會受到此處阻塞等待的方法的影響。

目前的ForkJoinPool的實現並未考慮補償等待那些阻塞在等待新生成的線程的工作worker線程,所以最終ForkJoinPool.commonPool()中的線程將備用光並且阻塞等待。

 

正如我們上面那個列子的情況分析得知,lambda的執行並不是瞬間完成的,所有使用parallel streams的程序都有可能成為阻塞程序的源頭, 並且在執行過程中程序中的其他部分將無法訪問這些workers,這意味著任何依賴parallel streams的程序在什麼別的東西占用著common ForkJoinPool時將會變得不可預知並且暗藏危機。

小結:

  • 當需要處理遞歸分治算法時,考慮使用ForkJoinPool
  • 仔細設置不再進行任務劃分的閾值,這個閾值對性能有影響。
  • Java 8中的一些特性會使用到ForkJoinPool中的通用線程池。在某些場合下,需要調整該線程池的默認的線程數量
  • lambda應該盡量避免副作用,也就是說,避免突變基於堆的狀態以及任何IO
  • lambda應該互不幹擾,也就是說避免修改數據源(因為這可能帶來線程安全的問題)
  • 避免訪問在流操作生命周期內可能會改變的狀態

8、並行流的性能

並行流框架的性能受以下因素影響:

  • 數據大小:數據夠大,每個管道處理時間夠長,並行才有意義;
  • 源數據結構:每個管道操作都是基於初始數據源,通常是集合,將不同的集合數據源分割會有一定消耗;
  • 裝箱:處理基本類型比裝箱類型要快;
  • 核的數量:默認情況下,核數量越多,底層fork/join線程池啟動線程就越多;
  • 單元處理開銷:花在流中每個元素身上的時間越長,並行操作帶來的性能提升越明顯;

源數據結構分為以下3組:

  • 性能好:ArrayList、數組或IntStream.range(數據支持隨機讀取,能輕易地被任意分割)
  • 性能一般:HashSetTreeSet(數據不易公平地分解,大部分也是可以的)
  • 性能差:LinkedList(需要遍歷鏈表,難以對半分解)、Stream.iterateBufferedReader.lines(長度未知,難以分解)

註意:下面幾個部分節選自:Streams 的幕後原理,順便感謝一下作者_Brian Goetz_,寫的太通透瞭。

9、NQ模型

要確定並行性是否會帶來提速,需要考慮的最後兩個因素是:可用的數據量和針對每個數據元素執行的計算量。

在我們最初的並行分解描述中,我們采用的概念是拆分來源,直到分段足夠小,以致解決該分段上的問題的順序方法更高效。分段大小必須依賴於所解決的問題,確切的講,取決於每個元素完成的工作量。

例如,計算一個字符串的長度涉及的工作比計算字符串的 SHA-1 哈希值要少得多。為每個元素完成的工作越多,“大到足夠利用並行性” 的閾值就越低。類似地,擁有的數據越多, 拆分的分段就越多,而不會與 “太小” 閾值發生沖突。

一個簡單但有用的並行性能模型是 NQ 模型,其中 N 是數據元素數量,Q 是為每個元素執行的工作量。乘積 N*Q 越大,就越有可能獲得並行提速。對於具有很小的 Q 的問題,比如對數字求和,您通常可能希望看到 N > 10,000 以獲得提速;隨著 Q 增加,獲得提速所需的數據大小將會減小。

並行化的許多阻礙(比如拆分成本、組合成本或遇到順序敏感性)都可以通過 Q 更高的操作來緩解。盡管拆分某個 LinkedList 特征的結果可能很糟糕,但隻要擁有足夠大的 Q,仍然可能獲得並行提速。

10、遇到順序

遇到順序指的是來源分發元素的順序是否對計算至關重要。一些來源(比如基於哈希的集合和映射)沒有有意義的遇到順序。流標志 ORDERED 描述瞭流是否有有意義的遇到順序。

JDK 集合的 spliterator 會根據集合的規范來設置此標志;

一些中間操作可能註入 ORDERED (sorted()) 或清除它 (unordered())。

如果流沒有遇到順序,大部分流操作都必須遵守該順序。對於順序執行,會「自動保留遇到順序」,因為元素會按遇到它們的順序自然地處理。

甚至在並行執行中,許多操作(無狀態中間操作和一些終止操作(比如 reduce())),遵守遇到順序不會產生任何實際成本。

但對於其他操作(有狀態中間操作,其語義與遇到順序關聯的終止操作,比如 findFirst() forEachOrdered()) , 在並行執行中遵守遇到順序的責任可能很重大。

如果流有一個已定義的遇到順序,但該順序對結果沒有意義, 那麼可以通過使用 unordered() 操作刪除 ORDERED 標志,加速包含順序敏感型操作的管道的順序執行。

作為對遇到順序敏感的操作的示例,可以考慮 limit(),它會在指定大小處截斷一個流。在順序執行中實現 limit() 很簡單:保留一個已看到多少元素的計數器,在這之後丟棄任何元素。

但是在並行執行中,實現 limit() 要復雜得多;您需要保留前 N 個元素。此要求大大限制瞭利用並行性的能力;如果輸入劃分為多個部分,您隻有在某個部分之前的所有部分都已完成後,才知道該部分的結果是否將包含在最終結果中。

因此,該實現一般會錯誤地選擇不使用所有可用的核心,或者緩存整個試驗性結果,直到您達到目標長度。

如果流沒有遇到順序,limit() 操作可以自由選擇任何 N 個元素,這讓執行效率變得高得多。知道元素後可立即將其發往下遊, 無需任何緩存,而且線程之間唯一需要執行的協調是發送一個信號來確保未超出目標流長度。

遇到順序成本的另一個不太常見的示例是排序。如果遇到順序有意義,那麼 sorted() 操作會實現一種穩定 排序 (相同的元素按照它們進入輸入時的相同順序出現在輸出中),而對於無序的流,穩定性(具有成本)不是必需的。

distinct() 具有類似的情況:如果流有一個遇到順序,那麼對於多個相同的輸入元素,distinct() 必須發出其中的第一個, 而對於無序的流,它可以發出任何元素 — 同樣可以獲得高效得多的並行實現。

在使用 collect() 聚合時會遇到類似的情形。如果在無序流上執行 collect(groupingBy()) 操作, 與任何鍵對應的元素都必須按它們在輸入中出現的順序提供給下遊收集器。

此順序對應用程序通常沒有什麼意義,而且任何順序都沒有意義。在這些情況下,可能最好選擇一個並發 收集器(比如 groupingByConcurrent() ),它可以忽略遇到順序, 並讓所有線程直接收集到一個共享的並發數據結構中(比如 ConcurrentHashMap),而不是讓每個線程收集到它自己的中間映射中, 然後再合並中間映射(這可能產生很高的成本)。

到此這篇關於Java 8 Stream流強大的原理的文章就介紹到這瞭,更多相關Java 8 Stream流內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: