Kotlin線程同步的幾種實現方法

面試的時候經常會被問及多線程同步的問題,例如:
“ 現有 Task1、Task2 等多個並行任務,如何等待全部執行完成後,執行 Task3。”
在 Kotlin 中我們有多種實現方式,本文將所有這些方式做瞭整理,建議收藏。
1. Thread.join
2. Synchronized
3. ReentrantLock
4. BlockingQueue
5. CountDownLatch
6. CyclicBarrier
7. CAS
8. Future
9. CompletableFuture
10. Rxjava
11. Coroutine
12. Flow

我們先定義三個Task,模擬上述場景, Task3 基於 Task1、Task2 返回的結果拼接字符串,每個 Task 通過 sleep 模擬耗時:

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有線程工具默認都可以使用。其中最簡單的線程同步方式就是使用 Thread 的 join() :

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    task3(s1, s2)

}

2. Synchronized

使用 synchronized 鎖進行同步

 @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }
    }

但是如果超過三個任務,使用 synchrnoized 這種寫法就比較別扭瞭,為瞭同步多個並行任務的結果需要聲明n個鎖,並嵌套n個 synchronized。

3. ReentrantLock

ReentrantLock 是 JUC 提供的線程鎖,可以替換 synchronized 的使用

 @Test
    fun test_ReentrantLock() {

        lateinit var s1: String
        lateinit var s2: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()
    }

ReentrantLock 的好處是,當有多個並行任務時是不會出現嵌套 synchrnoized 的問題,但仍然需要創建多個 lock 管理不同的任務,

4. BlockingQueue

阻塞隊列內部也是通過 Lock 實現的,所以也可以達到同步鎖的效果

 @Test
    fun test_blockingQueue() {

        lateinit var s1: String
        lateinit var s2: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        s2 = task2()

        queue.take()
        task3(s1, s2)
    }

當然,阻塞隊列更多是使用在生產/消費場景中的同步。

5. CountDownLatch

JUC 中的鎖大都基於 AQS 實現的,可以分為獨享鎖和共享鎖。ReentrantLock 就是一種獨享鎖。相比之下,共享鎖更適合本場景。 例如 CountDownLatch,它可以讓一個線程一直處於阻塞狀態,直到其他線程的執行全部完成:

 @Test
    fun test_countdownlatch() {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

共享鎖的好處是不必為瞭每個任務都創建單獨的鎖,即使再多並行任務寫起來也很輕松

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一種共享鎖機制,它可以讓一組線程到達一個同步點後再一起繼續運行,其中任意一個線程未達到同步點,其他已到達的線程均會被阻塞。
與 CountDownLatch 的區別在於 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置後重復使用,這也正是 Cyclic 的命名由來,可以循環使用

 @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)
    }

7. CAS

AQS 內部通過自旋鎖實現同步,自旋鎖的本質是利用 CompareAndSwap 避免線程阻塞的開銷。
因此,我們可以使用基於 CAS 的原子類計數,達到實現無鎖操作的目的。

  @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)
    }

while 循環空轉看起來有些浪費資源,但是自旋鎖的本質就是這樣,所以 CAS 僅僅適用於一些cpu密集型的短任務同步。

volatile

看到 CAS 的無鎖實現,也許很多人會想到 volatile, 是否也能實現無鎖的線程安全?

  @Test
    fun test_Volatile() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            s1 = task1()
            cnt--
        }.start()

        Thread {
            s2 = task2()
            cnt--
        }.start()

        while (cnt != 0) {
        }

        task3(s1, s2)
    }

註意,這種寫法是錯誤的
volatile 能保證可見性,但是不能保證原子性,cnt– 並非線程安全,需要加鎖操作

8. Future

上面無論有鎖操作還是無鎖操作,都需要定義兩個變量s1、s2記錄結果非常不方便。
Java 1.5 開始,提供瞭 Callable 和 Future ,可以在任務執行結束時返回結果。

@Test
fun test_future() {

    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))

    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)

    task3(future1.get(), future2.get())

}

通過 future.get(),可以同步等待結果返回,寫起來非常方便

9. CompletableFuture

future.get() 雖然方便,但是會阻塞線程。 Java 8 中引入瞭 CompletableFuture  ,他實現瞭 Future 接口的同時實現瞭 CompletionStage 接口。 CompletableFuture 可以針對多個 CompletionStage 進行邏輯組合、實現復雜的異步編程。 這些邏輯組合的方法以回調的形式避免瞭線程阻塞:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}

10. RxJava

RxJava 提供的各種操作符以及線程切換能力同樣可以幫助我們實現需求:
zip 操作符可以組合兩個 Observable 的結果;subscribeOn 用來啟動異步任務

@Test
fun test_Rxjava() {

    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()

}

11. Coroutine

前面講瞭那麼多,其實都是 Java 的工具。 Coroutine 終於算得上是 Kotlin 特有的工具瞭:

@Test
fun test_coroutine() {

    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }

        val c2 = async(Dispatchers.IO) {
            task2()
        }

        task3(c1.await(), c2.await())
    }
}

寫起來特別舒服,可以說是集前面各類工具的優點於一身。

12. Flow

Flow 就是 Coroutine 版的 RxJava,具備很多 RxJava 的操作符,例如 zip:

@Test
fun test_flow() {

    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
        
    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()
    }
}

flowOn 使得 Task 在異步計算並發射結果。

總結

上面這麼多方式,就像茴香豆的“茴”字的四種寫法,沒必要都掌握。作為結論,在 Kotlin 上最好用的線程同步方案首推協程!

到此這篇關於Kotlin線程同步的幾種實現方法的文章就介紹到這瞭,更多相關Kotlin線程同步內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: