輕輕松松吃透Java並發fork/join框架

Fork / Join 是一個工具框架 , 其核心思想在於將一個大運算切成多個小份 , 最大效率的利用資源 , 其主要涉及到三個類 : ForkJoinPool / ForkJoinTask / RecursiveTask

一、概述

java.util.concurrent.ForkJoinPool由Java大師Doug Lea主持編寫,它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合並成最後的計算結果,並進行輸出。本文中對Fork/Join框架的講解,基於JDK1.8+中的Fork/Join框架實現,參考的Fork/Join框架主要源代碼也基於JDK1.8+。

文章將首先先談談recursive task,然後講解Fork/Join框架的基本使用;接著結合Fork/Join框架的工作原理來理解其中需要註意的使用要點;最後再講解使用Fork/Join框架解決一些實際問題。

二、說一說 RecursiveTask

RecursiveTask 是一種 ForkJoinTask 的遞歸實現 , 例如可以用於計算斐波那契數列 :

 class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
 }

RecursiveTask 繼承瞭 ForkJoinTask 接口 ,其內部有幾個主要的方法:

// Node 1 : 返回結果 , 存放最終結果
V result;
​
// Node 2 : 抽象方法 compute , 用於計算最終結果
protected abstract V compute();
​
// Node 3 : 獲取最終結果
public final V getRawResult() {
        return result;
}
​
// Node 4 : 最終執行方法 , 這裡是需要調用具體實現類compute
protected final boolean exec() {
    result = compute();
    return true;
}

常見使用方式:

@ 
public class ForkJoinPoolService extends RecursiveTask<Integer> {
​
    private static final int THRESHOLD = 2; //閥值
    private int start;
    private int end;
​
    public ForkJoinPoolService(Integer start, Integer end) {
        this.start = start;
        this.end = end;
    }
​
    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) / 2;
            ForkJoinPoolService leftTask = new ForkJoinPoolService(start, middle);
            ForkJoinPoolService rightTask = new ForkJoinPoolService(middle + 1, end);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //等待子任務執行完,並得到其結果
            Integer rightResult = rightTask.join();
            Integer leftResult = leftTask.join();
            //合並子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
​
}

三、 Fork/Join框架基本使用

這裡是一個簡單的Fork/Join框架使用示例,在這個示例中我們計算瞭1-1001累加後的值:

/**
 * 這是一個簡單的Join/Fork計算過程,將1—1001數字相加
 */
public class TestForkJoinPool {

    private static final Integer MAX = 200;

    static class MyForkJoinTask extends RecursiveTask<Integer> {
        // 子任務開始計算的值
        private Integer startValue;

        // 子任務結束計算的值
        private Integer endValue;

        public MyForkJoinTask(Integer startValue , Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        protected Integer compute() {
            // 如果條件成立,說明這個任務所需要計算的數值分為足夠小瞭
            // 可以正式進行累加計算瞭
            if(endValue - startValue < MAX) {
                System.out.println("開始計算的部分:startValue = " + startValue + ";endValue = " + endValue);
                Integer totalValue = 0;
                for(int index = this.startValue ; index <= this.endValue  ; index++) {
                    totalValue += index;
                }
                return totalValue;
            }
            // 否則再進行任務拆分,拆分成兩個任務
            else {
                MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
                subTask1.fork();
                MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    public static void main(String[] args) {
        // 這是Fork/Join框架的線程池
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture =  pool.submit(new MyForkJoinTask(1,1001));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
    }
}

以上代碼很簡單,在關鍵的位置有相關的註釋說明。這裡本文再對以上示例中的要點進行說明。首先看看以上示例代碼的可能執行結果:

開始計算的部分:startValue = 1;endValue = 126
開始計算的部分:startValue = 127;endValue = 251
開始計算的部分:startValue = 252;endValue = 376
開始計算的部分:startValue = 377;endValue = 501
開始計算的部分:startValue = 502;endValue = 626
開始計算的部分:startValue = 627;endValue = 751
開始計算的部分:startValue = 752;endValue = 876
開始計算的部分:startValue = 877;endValue = 1001
result = 501501

四、工作順序圖

下圖展示瞭以上代碼的工作過程概要,但實際上Fork/Join框架的內部工作過程要比這張圖復雜得多,例如如何決定某一個recursive task是使用哪條線程進行運行;再例如如何決定當一個任務/子任務提交到Fork/Join框架內部後,是創建一個新的線程去運行還是讓它進行隊列等待。

所以如果不深入理解Fork/Join框架的運行原理,隻是根據之上最簡單的使用例子觀察運行效果,那麼我們隻能知道子任務在Fork/Join框架中被拆分得足夠小後,並且其內部使用多線程並行完成這些小任務的計算後再進行結果向上的合並動作,最終形成頂層結果。不急,一步一步來,我們先從這張概要的過程圖開始討論。

圖中最頂層的任務使用submit方式被提交到Fork/Join框架中,後者將前者放入到某個線程中運行,工作任務中的compute方法的代碼開始對這個任務T1進行分析。如果當前任務需要累加的數字范圍過大(代碼中設定的是大於200),則將這個計算任務拆分成兩個子任務(T1.1和T1.2),每個子任務各自負責計算一半的數據累加,請參見代碼中的fork方法。如果當前子任務中需要累加的數字范圍足夠小(小於等於200),就進行累加然後返回到上層任務中。

1、ForkJoinPool構造函數

ForkJoinPool有四個構造函數,其中參數最全的那個構造函數如下所示:

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode)
  • parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量,也不要將這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與後者完全不一樣。而後續的討論中,讀者還可以發現Fork/Join框架中可存在的線程數量和這個參數值的關系並不是絕對的關聯(有依據但並不全由它決定)。
  • factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。隻不過這個線程工廠不再需要實現ThreadFactory接口,而是需要實現ForkJoinWorkerThreadFactory接口。後者是一個函數式接口,隻需要實現一個名叫newThread的方法。在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory接口實現:DefaultForkJoinWorkerThreadFactory。
  • handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。
  • asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它並不是說Fork/Join框架是采用同步模式還是采用異步模式工作。Fork/Join框架中為每一個獨立工作的線程準備瞭對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。

當asyncMode設置為ture的時候,隊列采用先進先出方式工作;反之則是采用後進先出的方式工作,該值默認為false

......
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
......

ForkJoinPool還有另外兩個構造函數,一個構造函數隻帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;另一個構造函數則不帶有任何參數,對於最大並行任務數量也隻是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。

......
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
......

如果你對Fork/Join框架沒有特定的執行要求,可以直接使用不帶有任何參數的構造函數。也就是說推薦基於當前操作系統可以使用的CPU內核數作為Fork/Join框架內最大並行任務數量,這樣可以保證CPU在處理並行任務時,盡量少發生任務線程間的運行狀態切換(實際上單個CPU內核上的線程間狀態切換基本上無法避免,因為操作系統同時運行多個線程和多個進程)。

2、fork方法和join方法

Fork/Join框架中提供的fork方法和join方法,可以說是該框架中提供的最重要的兩個方法,它們和parallelism“可並行任務數量”配合工作,可以導致拆分的子任務T1.1、T1.2甚至TX在Fork/Join框架中不同的運行效果。例如TX子任務或等待其它已存在的線程運行關聯的子任務,或在運行TX的線程中“遞歸”執行其它任務,又或者啟動一個新的線程運行子任務……

fork方法用於將新創建的子任務放入當前線程的work queue隊列中,Fork/Join框架將根據當前正在並發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,決定是讓這個任務在隊列中等待,還是創建一個新的ForkJoinWorkerThread線程運行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運行它。

這裡面有幾個元素概念需要註意,ForkJoinTask任務是一種能在Fork/Join框架中運行的特定任務,也隻有這種類型的任務可以在Fork/Join框架中被拆分運行和合並運行。ForkJoinWorkerThread線程是一種在Fork/Join框架中運行的特性線程,它除瞭具有普通線程的特性外,最主要的特點是每一個ForkJoinWorkerThread線程都具有一個獨立的任務等待隊列(work queue),這個任務隊列用於存儲在本線程中被拆分的若幹子任務。

join方法用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,如果這個子任務存在於當前線程的任務等待隊列(work queue)中,則取出這個子任務進行“遞歸”執行。其目的是盡快得到當前子任務的運行結果,然後繼續執行。

五、使用Fork/Join解決實際問題

之前所舉的的例子是使用Fork/Join框架完成1-1000的整數累加。這個示例如果隻是演示Fork/Join框架的使用,那還行,但這種例子和實際工作中所面對的問題還有一定差距。本篇文章我們使用Fork/Join框架解決一個實際問題,就是高效排序的問題。

1.使用歸並算法解決排序問題

排序問題是我們工作中的常見問題。目前也有很多現成算法是為瞭解決這個問題而被發明的,例如多種插值排序算法、多種交換排序算法。而並歸排序算法是目前所有排序算法中,平均時間復雜度較好(O(nlgn)),算法穩定性較好的一種排序算法。它的核心算法思路將大的問題分解成多個小問題,並將結果進行合並。

整個算法的拆分階段,是將未排序的數字集合,從一個較大集合遞歸拆分成若幹較小的集合,這些較小的集合要麼包含最多兩個元素,要麼就認為不夠小需要繼續進行拆分。

那麼對於一個集合中元素的排序問題就變成瞭兩個問題:1、較小集合中最多兩個元素的大小排序;2、如何將兩個有序集合合並成一個新的有序集合。第一個問題很好解決,那麼第二個問題是否會很復雜呢?實際上第二個問題也很簡單,隻需要將兩個集合同時進行一次遍歷即可完成——比較當前集合中最小的元素,將最小元素放入新的集合,它的時間復雜度為O(n):

以下是歸並排序算法的簡單實現:

package test.thread.pool.merge;

import java.util.Arrays;
import java.util.Random;

/**
 * 歸並排序
 * @author yinwenjie
 */
public class Merge1 {

    private static int MAX = 10000;

    private static int inits[] = new int[MAX];

    // 這是為瞭生成一個數量為MAX的隨機整數集合,準備計算數據
    // 和算法本身並沒有什麼關系
    static {
        Random r = new Random();
        for(int index = 1 ; index <= MAX ; index++) {
            inits[index - 1] = r.nextInt(10000000);
        }
    }

    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        int results[] = forkits(inits); 
        long endTime = System.currentTimeMillis();
        // 如果參與排序的數據非常龐大,記得把這種打印方式去掉
        System.out.println("耗時=" + (endTime - beginTime) + " | " + Arrays.toString(results));       
    }

    // 拆分成較小的元素或者進行足夠小的元素集合的排序
    private static int[] forkits(int source[]) {
        int sourceLen = source.length;
        if(sourceLen > 2) {
            int midIndex = sourceLen / 2;
            int result1[] = forkits(Arrays.copyOf(source, midIndex));
            int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));
            // 將兩個有序的數組,合並成一個有序的數組
            int mer[] = joinInts(result1 , result2);
            return mer;
        } 
        // 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序瞭
        else {
            // 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置瞭
            if(sourceLen == 1
                || source[0] <= source[1]) {
                return source;
            } else {
                int targetp[] = new int[sourceLen];
                targetp[0] = source[1];
                targetp[1] = source[0];
                return targetp;
            }
        }
    }

    /**
     * 這個方法用於合並兩個有序集合
     * @param array1
     * @param array2
     */
    private static int[] joinInts(int array1[] , int array2[]) {
        int destInts[] = new int[array1.length + array2.length];
        int array1Len = array1.length;
        int array2Len = array2.length;
        int destLen = destInts.length;

        // 隻需要以新的集合destInts的長度為標準,遍歷一次即可
        for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {
            int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];
            int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];
            // 如果條件成立,說明應該取數組array1中的值
            if(value1 < value2) {
                array1Index++;
                destInts[index] = value1;
            }
            // 否則取數組array2中的值
            else {
                array2Index++;
                destInts[index] = value2;
            }
        }

        return destInts;
    }
}

以上歸並算法對1萬條隨機數進行排序隻需要2-3毫秒,對10萬條隨機數進行排序隻需要20毫秒左右的時間,對100萬條隨機數進行排序的平均時間大約為160毫秒(這還要看隨機生成的待排序數組是否本身的凌亂程度)。可見歸並算法本身是具有良好的性能的。使用JMX工具和操作系統自帶的CPU監控器監視應用程序的執行情況,可以發現整個算法是單線程運行的,且同一時間CPU隻有單個內核在作為主要的處理內核工作:

JMX中觀察到的線程情況:

CPU的運作情況:

2.使用Fork/Join運行歸並算法

但是隨著待排序集合中數據規模繼續增大,以上歸並算法的代碼實現就有一些力不從心瞭,例如以上算法對1億條隨機數集合進行排序時,耗時為27秒左右。

接著我們可以使用Fork/Join框架來優化歸並算法的執行性能,將拆分後的子任務實例化成多個ForkJoinTask任務放入待執行隊列,並由Fork/Join框架在多個ForkJoinWorkerThread線程間調度這些任務。如下圖所示:

以下為使用Fork/Join框架後的歸並算法代碼,請註意joinInts方法中對兩個有序集合合並成一個新的有序集合的代碼,是沒有變化的可以參見本文上一小節中的內容。所以在代碼中就不再贅述瞭:

......
/**
 * 使用Fork/Join框架的歸並排序算法
 * @author yinwenjie
 */
public class Merge2 {

    private static int MAX = 100000000;

    private static int inits[] = new int[MAX];

    // 同樣進行隨機隊列初始化,這裡就不再贅述瞭
    static {
        ......
    }

    public static void main(String[] args) throws Exception {   
        // 正式開始
        long beginTime = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        MyTask task = new MyTask(inits);
        ForkJoinTask<int[]> taskResult = pool.submit(task);
        try {
            taskResult.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("耗時=" + (endTime - beginTime));      
    }

    /**
     * 單個排序的子任務
     * @author yinwenjie
     */
    static class MyTask extends RecursiveTask<int[]> {

        private int source[];

        public MyTask(int source[]) {
            this.source = source;
        }

        /* (non-Javadoc)
         * @see java.util.concurrent.RecursiveTask#compute()
         */
        @Override
        protected int[] compute() {
            int sourceLen = source.length;
            // 如果條件成立,說明任務中要進行排序的集合還不夠小
            if(sourceLen > 2) {
                int midIndex = sourceLen / 2;
                // 拆分成兩個子任務
                MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));
                task1.fork();
                MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex , sourceLen));
                task2.fork();
                // 將兩個有序的數組,合並成一個有序的數組
                int result1[] = task1.join();
                int result2[] = task2.join();
                int mer[] = joinInts(result1 , result2);
                return mer;
            } 
            // 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序瞭
            else {
                // 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置瞭
                if(sourceLen == 1
                    || source[0] <= source[1]) {
                    return source;
                } else {
                    int targetp[] = new int[sourceLen];
                    targetp[0] = source[1];
                    targetp[1] = source[0];
                    return targetp;
                }
            }
        }

        private int[] joinInts(int array1[] , int array2[]) {
            // 和上文中出現的代碼一致
        }
    }
}

使用Fork/Join框架優化後,同樣執行1億條隨機數的排序處理時間大約在14秒左右,當然這還和待排序集合本身的凌亂程度、CPU性能等有關系。但總體上這樣的方式比不使用Fork/Join框架的歸並排序算法在性能上有30%左右的性能提升。以下為執行時觀察到的CPU狀態和線程狀態:

JMX中的內存、線程狀態:

CPU使用情況:

除瞭歸並算法代碼實現內部可優化的細節處,使用Fork/Join框架後,我們基本上在保證操作系統線程規模的情況下,將每一個CPU內核的運算資源同時發揮瞭出來。

到此這篇關於輕輕松松吃透Java並發fork/join框架的文章就介紹到這瞭,更多相關Java fork/join框架內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: