Java線程的並發工具類實現原理解析

在JDK的並發包裡提供瞭幾個非常有用的並發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供瞭一種並發流程控制的手段,Exchanger工具類則提供瞭在線程間交換數據的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。

一、fork/join

1. Fork-Join原理

在必要的情況下,將一個大任務,拆分(fork)成若幹個小任務,然後再將一個個小任務的結果進行匯總(join)。

適用場景:大數據量統計類任務。

2. 工作竊取

Fork/Join在實現上,大任務拆分出來的小任務會被分發到不同的隊列裡面,每一個隊列都會用一個線程來消費,這是為瞭獲取任務時的多線程競爭,但是某些線程會提前消費完自己的隊列。而有些線程沒有及時消費完隊列,這個時候,完成瞭任務的線程就會去竊取那些沒有消費完成的線程的任務隊列,為瞭減少線程競爭,Fork/Join使用雙端隊列來存取小任務,分配給這個隊列的線程會一直從頭取得一個任務然後執行,而竊取線程總是從隊列的尾端拉取task。

3. 代碼實現

我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務。它提供在任務中執行 fork 和 join 的操作機制,通常我們不直接繼承 ForkjoinTask 類,隻需要直接繼承其子類。

1、RecursiveAction,用於沒有返回結果的任務。
2、RecursiveTask,用於有返回值的任務。

task 要通過 ForkJoinPool 來執行,使用 invoke、execute、submit提交,兩者的區別是:invoke 是同步執行,調用之後需要等待任務完成,才能執行後面的代碼;execute、submit 是異步執行。

示例1:長度400萬的隨機數組求和,使用RecursiveTask 。

/**
 * 隨機產生ARRAY_LENGTH長的的隨機數組
 */
public class MakeArray {
    // 數組長度
    public static final int ARRAY_LENGTH = 4000000;

    public static int[] makeArray() {
        // new一個隨機數發生器
        Random r = new Random();
        int[] result = new int[ARRAY_LENGTH];
        for (int i = 0; i < ARRAY_LENGTH; i++) {
            // 用隨機數填充數組
            result[i] = r.nextInt(ARRAY_LENGTH * 3);
        }
        return result;
    }
}

public class SumArray {
    private static class SumTask extends RecursiveTask<Integer> {

        // 閾值
        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
        private int[] src;
        private int fromIndex;
        private int toIndex;

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            // 任務的大小是否合適
            if ((toIndex - fromIndex) < THRESHOLD) {
                System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
                int count = 0;
                for (int i = fromIndex; i <= toIndex; i++) {
                    count = count + src[i];
                }
                return count;
            } else {
                // fromIndex....mid.....toIndex
                int mid = (fromIndex + toIndex) / 2;
                SumTask left = new SumTask(src, fromIndex, mid);
                SumTask right = new SumTask(src, mid + 1, toIndex);
                invokeAll(left, right);
                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) {

        int[] src = MakeArray.makeArray();
        // new出池的實例
        ForkJoinPool pool = new ForkJoinPool();
        // new出Task的實例
        SumTask innerFind = new SumTask(src, 0, src.length - 1);

        long start = System.currentTimeMillis();

        // invoke阻塞方法
        pool.invoke(innerFind);
        System.out.println("Task is Running.....");

        System.out.println("The count is " + innerFind.join()
                + " spend time:" + (System.currentTimeMillis() - start) + "ms");

    }
}

示例2:遍歷指定目錄(含子目錄)下面的txt文件。

public class FindDirsFiles extends RecursiveAction {

    private File path;

    public FindDirsFiles(File path) {
        this.path = path;
    }

    @Override
    protected void compute() {
        List<FindDirsFiles> subTasks = new ArrayList<>();

        File[] files = path.listFiles();
        if (files!=null){
            for (File file : files) {
                if (file.isDirectory()) {
                    // 對每個子目錄都新建一個子任務。
                    subTasks.add(new FindDirsFiles(file));
                } else {
                    // 遇到文件,檢查。
                    if (file.getAbsolutePath().endsWith("txt")){
                        System.out.println("文件:" + file.getAbsolutePath());
                    }
                }
            }
            if (!subTasks.isEmpty()) {
                // 在當前的 ForkJoinPool 上調度所有的子任務。
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    subTask.join();
                }
            }
        }
    }

    public static void main(String [] args){
        try {
            // 用一個 ForkJoinPool 實例調度總任務
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            // 異步提交
            pool.execute(task);

            // 主線程做自己的業務工作
            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、CountDownLatch

閉鎖,CountDownLatch 這個類能夠使一個線程等待其他線程完成各自的工作後再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之後再執行。

CountDownLatch 是通過一個計數器來實現的,計數器的初始值為初始任務的數量。每當完成瞭一個任務後,計數器的值就會減 1(CountDownLatch.countDown()方法)。當計數器值到達 0 時,它表示所有的已經完成瞭任務,然後在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復執行任務。

示例代碼:

public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    private static class BusinessThread extends Thread {
        @Override
        public void run() {
            try {
                System.out.println("BusinessThread " + Thread.currentThread().getName() + " start....");
                Thread.sleep(3000);
                System.out.println("BusinessThread " + Thread.currentThread().getName() + " end.....");
                // 計數器減1
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("main start....");
        new BusinessThread().start();
        new BusinessThread().start();
        // 等待countDownLatch計數器為零後執行後面代碼
        countDownLatch.await();
        System.out.println("main end");
    }

}

註意點:

1、CountDownLatch(2)並不代表對應兩個線程。

2、一個線程中可以多次countDownLatch.countDown(),比如在一個線程中countDown兩次或者多次。

三、CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。

CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達瞭屏障,然後當前線程被阻塞。

CyclicBarrier 還提供一個更高級的構造函數 CyclicBarrie(r int parties,Runnable barrierAction),用於在線程全部到達屏障時,優先執行 barrierAction,方便處理更復雜的業務場景。

示例代碼:

public class CyclicBarrierTest {
    private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());

    /**
     * 存放子線程工作結果的容器
     */
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    /**
     * 匯總的任務
     */
    private static class CollectThread implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println(" the result = " + result);
            System.out.println("colletThread end.....");
        }
    }

    /**
     * 相互等待的子線程
     */
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId() + "", id);
            try {
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " end1.....");
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " end2.....");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

註意: 一個線程中可以多次await();

四、Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。應用場景 Semaphore 可以用於做流量控制,特別是公用資源有限的應用場景,比如數據庫連接池數量。

方法:常用的前4個。

方法 描述
acquire() 獲取連接
release() 歸還連接數
intavailablePermits() 返回此信號量中當前可用的許可證數
intgetQueueLength() 返回正在等待獲取許可證的線程數
void reducePermit(s int reduction) 減少 reduction 個許可證,是個 protected 方法
Collection getQueuedThreads() 返回所有等待獲取許可證的線程集合,是個 protected 方法

示例代碼:模擬數據庫連接池。

/**
 * 數據庫連接
 */
public class SqlConnectImpl implements Connection {

    /**
     * 得到一個數據庫連接
     */
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }
    
    // 省略其他代碼
}
/**
 * 連接池代碼
 */
public class DBPoolSemaphore {

    private final static int POOL_SIZE = 10;
    // 兩個指示器,分別表示池子還有可用連接和已用連接
    private final Semaphore useful;
	private final Semaphore useless;
    // 存放數據庫連接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    // 初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /**
     * 歸還連接
     */
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("當前有" + useful.getQueueLength() + "個線程等待數據庫連接!!"
                    + "可用連接數:" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /**
     * 從池子拿連接
     */
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}
/**
 * 測試代碼
 */
public class AppTest {

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            // 讓每個線程持有連接的時間不一樣
            Random r = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_獲取數據庫連接共耗時【" + (System.currentTimeMillis() - start) + "】ms.");
				//模擬業務操作,線程持有連接查詢數據
                Thread.sleep(100 + r.nextInt(100));
                System.out.println("查詢數據完成,歸還連接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
            	e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

當然,你也可以使用一個 semaphore 來實現,不過需要註意的是 semaphore 的初始數量為10並不是固定的,如果你後面歸還連接時 dbPool.returnConnect(new SqlConnectImpl()); 的話,那麼他的數量會變成 11 。

五、Exchange

Exchanger(交換者)是一個用於線程間協作的工具類。Exchanger 用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過 exchange() 方法交換數據,如果第一個線程先執行 exchange() 方法,它會一直等待第二個線程也執行 exchange() 方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

但是這種隻能在兩個線程種傳遞,適用面過於狹窄。

六、Callable、Future、FutureTask

  • Runnable 是一個接口,在它裡面隻聲明瞭一個 run()方法,由於 run()方法返回值為 void 類型,所以在執行完任務之後無法返回任何結果。
  • Callable 位於 java.util.concurrent 包下,它也是一個接口,在它裡面也隻聲明瞭一個方法,隻不過這個方法叫做 call(),這是一個泛型接口,call()函數返回的類型就是傳遞進來的 V 類型。
  • Future 就是對於具體的 Runnable 或者 Callable 任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過 get 方法獲取執行結果,該方法會阻塞直到任務返回結果。
  • FutureTask 因為 Future 隻是一個接口,所以是無法直接用來創建對象使用的,因此就有瞭 FutureTask 。

關系圖示:

所以,我們可以通過 FutureTask 把一個 Callable 包裝成 Runnable,然後再通過這個 FutureTask 拿到 Callable 運行後的返回值。

示例代碼:

public class FutureTaskTest {

    private static class CallableTest implements Callable<Integer> {
        private int sum = 0;

        @Override
        public Integer call() throws Exception {
            System.out.println("Callable 子線程開始計算!");
            for (int i = 0; i < 5000; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("Callable 子線程計算任務中斷!");
                    return null;
                }
                sum = sum + i;
                System.out.println("sum=" + sum);
            }
            System.out.println("Callable 子線程計算結束!結果為: " + sum);
            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableTest callableTest = new CallableTest();
        // 包裝
        FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
        new Thread(futureTask).start();

        Random r = new Random();
        if (r.nextInt(100) > 50) {
            // 如果r.nextInt(100) > 50則計算返回結果
            System.out.println("sum = " + futureTask.get());
        } else {
            // 如果r.nextInt(100) <= 50則取消計算
            System.out.println("Cancel...");
            futureTask.cancel(true);
        }
    }
}

都讀到這裡瞭,來個 點贊、評論、關註、收藏 吧!

文章作者:IT王小二
首發地址:https://www.itwxe.com/posts/e4f648cd/
版權聲明:文章內容遵循 署名-非商業性使用-禁止演繹 4.0 國際 進行許可,轉載請在文章頁面明顯位置給出作者與原文鏈接。

到此這篇關於Java線程的並發工具類實現原理解析的文章就介紹到這瞭,更多相關java線程並發內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: