Java並行處理的實現

1. 背景

本文是一個短文章,介紹Java 中的並行處理。
說明:10多分鐘讀完的文章我稱之為短文章,適合快速閱讀。

2.知識

並行計算(parallel computing)一般是指許多指令得以同時進行的計算模式。在同時進行的前提下,可以將計算的過程分解成小部分,之後以並發方式來加以解決。

也就是分解為幾個過程:

1、將一個大任務拆分成多個子任務,子任務還可以繼續拆分。
2、各個子任務同時進行運算執行。
3、在執行完畢後,可能會有個 ” 歸納 ” 的任務,比如 求和,求平均等。

再簡化一點的理解就是: 先拆分  –>  在同時進行計算  –> 最後“歸納”
為什麼要“並行”,優點呢?

1、為瞭獲得 “節省時間”,“快”。適合用於大規模運算的場景。從理論上講,在 n 個並行處理的執行速度可能會是在單一處理機上執行的速度的 n 倍。
2、以前的計算機是單核的,現代的計算機Cpu都是多核的,服務器甚至都是多Cpu的,並行計算可以充分利用硬件的性能。

3. Java 中的並行處理

JDK 8 新增的Stream API(java.util.stream)將生成環境的函數式編程引入瞭Java庫中,可以方便開發者能夠寫出更加有效、更加簡潔的代碼。

steam 的另一個價值是創造性地支持並行處理(parallel processing)。示例:

final Collection< Task > tasks = Arrays.asList(
    new Task( Status.OPEN, 5 ),
    new Task( Status.OPEN, 13 ),
    new Task( Status.CLOSED, 8 ) 
);

// 並行執行多個任務,並 求和
final double totalPoints = tasks
   .stream()
   .parallel()
   .map( task -> task.getPoints() ) // or map( Task::getPoints ) 
   .reduce( 0, Integer::sum );
 
System.out.println( "Total points (all tasks): " + totalPoints );

對於上面的tasks集合,上面的代碼計算所有任務的點數之和。
它使用 parallel 方法並行處理所有的task,並使用 reduce 方法計算最終的結果。

4. 擴展

線程池方式實現並行處理

jdk1.5引入瞭並發包,其中包括瞭ThreadPoolExecutor,相關代碼如下:

public class ExecutorServiceTest {
 
    public static final int THRESHOLD = 10_000;
    public static long[] numbers;
 
    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable<Long>() {
 
                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        // 所有任務已經完成,關閉線程池
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }
 
    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

使用 fork/join框架

分支/合並框架的目的是以遞歸的方式將可以並行的認為拆分成更小的任務,然後將每個子任務的結果合並起來生成整體結果;相關代碼如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
    
    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;
 
    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
 
    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        // 註:join方法會阻塞,因此有必要在兩個子任務的計算都開始之後才執行join方法
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }
 
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
 
    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }
 
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

上面的代碼實現瞭 遞歸方式拆分子任務,並放入到線程池中執行。

5.參考:

https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97

到此這篇關於Java並行處理的實現的文章就介紹到這瞭,更多相關Java並行處理內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: