解決JDK8的ParallelStream遍歷無序的問題

JDK8的ParallelStream遍歷無序

ParallelStream其實就是一個並行執行的流

它通過默認的ForkJoinPool,可能提高你的多線程任務的速度.

Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作,因此像以下的程式片段:

List    
       list = Arrays.asList(1, 2, 3, 4, 5);
list.parallelStream().forEach(out::println);

你得到的展示順序不一定會是1、2、3、4、5,而可能是任意的順序,就forEach()這個操作來講,如果平行處理時,希望最後順序是按照原來Stream的數據順序,那可以調用forEachOrdered()。

List    
       list = Arrays.asList(1, 2, 3, 4, 5);
list.parallelStream().forEachOrdered(out::println);

parallelStream進行遍歷的坑,以及如何進行避免異步操作中出現的問題

Java8 已經很久瞭,現在都已經Java12版本瞭.

我所在的上傢公司,在寫代碼時候推薦使用lambad來進行操作遍歷集合

也就是像下面一樣

List<Integer> list = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
list.add(j);
}
list.stream().forEach(value -> {
System.out.println(value);
});

這種效率其實和傳統上的使用foreach以及for循環遍歷效果差不多,因為點開forEach方法會發現內部其實使用的是下面的方法進行對集合遍歷的

內部其實使用的還是for進行遍歷,所以兩者相比較其實沒有什麼效率的差異的,當然這也會由於每個公司編程習慣不一樣,有的人更喜歡傳統上的for進行遍歷

因為上面的遍歷方式不會對效率有什麼提升, 所以由此還有一種方式就是

parallelStream()
List<Integer> list = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
list.add(j);
}
list.parallelStream().forEach(value -> {
System.out.println(value);
});

上面的方法其實就是異步的,

這種遍歷方式因為是異步遍歷,會產生一種情況,就是遍歷的順序是無序的,當然也有相應的好處就是,遍歷速度會快,當對生成結果不考慮排序問題而且數據量比較大的時候可以使用.

但是,有利自然有弊,因為異步的所以需要考慮線程的問題,就是生成的結果真的是你想要的麼?

以下面的例子來運行一段代碼:

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
list.add(j);
}
System.out.println("最開始生成的集合長度:"+list.size());
//parallelStream遍歷數據的時候會產生丟失的問題
for (int i = 0; i < 10 ; i++) {
  
List<Integer> parseList = new ArrayList<>();
list.parallelStream().forEach(integer -> {
parseList.add(integer);
});
System.out.println("每次遍歷的集合長度:"+ parseList.size());
}
}

我首先創建瞭一個1000長度的集合,之後對這個集合使用多次遍歷,然而呢,會發現,最後遍歷的集合少數據,並且會在多次重復遍歷的時候數組越界..

因為這種情況,之前工作使用parallelStream出現過2次問題, 我一直以為是使用parallelStream本身不夠很安全導致的.實際上今天整理這篇博文突然才發現這個問題,就是遍歷的結果轉為的list是線程安全的麼?

其實當正常進行遍歷的時候, 可以對遍歷出的結果核對,實際上每次遍歷出的結果,仍然是與原來生成的結果一致的.

所以這邊隻能將鍋甩在接收這些數據的list上面瞭

這個時候就需要對list進行包裝

List<Integer> synchronizedList = Collections.synchronizedList(parseList);

這會在看下修改後的代碼以及結果

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
list.add(j);
}
System.out.println("最開始生成的集合長度:"+list.size());
//parallelStream遍歷數據的時候會產生丟失的問題
for (int i = 0; i < 10 ; i++) {
  
List<Integer> parseList = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(parseList);
list.parallelStream().forEach(integer -> {
synchronizedList.add(integer);
});
System.out.println("每次遍歷的集合長度:"+ synchronizedList.size());
}
}

這樣每次遍歷的結果也都是一樣的,而且速度也會由於異步的會比之前效率提升好多

同樣的如何創建線程安全的set,map也就可以進行相應的包裝,這樣就避免瞭使用會出新一些明明感覺對,確和自己想要的結果不一致的bug

同理使用parallelStream用StringBuffer 而不適用StringBuilder,因為前者是線程安全的

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: