Java8 中的ParallelStreams

前言:

並行編程勢不可擋,Java從1.7開始就提供瞭Fork/Join 支持並行處理。java1.8 進一步加強。

並行處理就是將任務拆分子任務,分發給多個處理器同時處理,之後合並。

1、Stream API

Java 8 引入瞭許多特性,Stream API是其中重要的一部分。區別 InputStream OutputStreamStream API 是處理對象流而不是字節流。

執行原理如下,流分串行和並行兩種執行方式

// 串行執行流
stream().filter(e -> e > 10).count();
// 並行執行流
.parallelStream().filter(e -> e > 10).count()

2、ParallelStreams執行原理

並行執行時,java將流劃分為多個子流,分散在不同CPU並行處理,然後進行合並。

並行一定比串行更快嗎?這不一定,取決於兩方面條件:

  • 處理器核心數量,並行處理核心數越多自然處理效率會更高。
  • 處理的數據量越大,優勢越強。這也很好理解,比如十個人幹一個人就能完成的活兒會比它自己幹更便宜?

3、ParallelStreams註意事項

使用並行流時,不要使用collectors.groupingBy,collectors.toMap,替代為

collectors.groupingByConcurrent , collectors.toConcurrentMap,或直接使用串行流。

原因,並行流執行時,通過操作Key來合並多個map的操作比較昂貴。詳細大傢可以查看官網介紹。

https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#concurrent_reduction

Map<String, List<Person>> byGender = 
  roster
   .stream()
   .collect(Collectors.groupingBy(Person::getGender));

ConcurrentMap<String, List<Person>> byGender =         
  roster
   .parallelStream()
   .collect(Collectors.groupingByConcurrent(Person::getGender));

ParallelStreams 默認使用 ForkJoinPool.commonPool()線程池。

註意:默認情況下,你寫的 ParallelStreams 都是通過該線程池調度執行,整個應用程序都共享這個線程池。

看一個例子,我們查詢一批新聞數據,可以利用並行化來處理遠程新聞下載。

public List<News> queryNews(Stream<String> ids) {
     return ids.parallel()
            .map(this::getNews) // 網絡操作,新聞下載
            .collect(toList());
}

因為是網絡操作,存在很多不確定性,假如某個任務運行時間較長,導致線程池資源占據,阻塞其它線程,這樣就阻止瞭其他的並行流任務正常進行。

如果解決這個問題的其中一種方式,進行線程池隔離。那麼如何自定義並行流的線程池呢?

ForkJoinPool 構造參數我們默認設置為CPU核心數。

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool
  .submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();


總結:

Java 1.8 提供的Stream API簡化瞭代碼,很好用。不過在使用過程中應該註意以上問題。

到此這篇關於Java8 中的並行流 ParallelStreams的文章就介紹到這瞭,更多相關Java8 ParallelStreams內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: