Java中常見的並發控制手段淺析

前言

單實例的並發控制,主要是針對JVM內,我們常規的手段即可滿足需求,常見的手段大概有下面這些

  • 同步代碼塊
  • CAS自旋
  • 阻塞隊列,令牌桶等

1.1 同步代碼塊

通過同步代碼塊,來確保同一時刻隻會有一個線程執行對應的業務邏輯,常見的使用姿勢如下

public synchronized doProcess() {
    // 同步代碼塊,隻會有一個線程執行
}

一般推薦使用最小區間使用原則,盡量不要直接在方法上加synchronized,比如經典的雙重判定單例模式

public class Single {
  private static volatile Single instance;
  private Single() {}
  public static Single getInstance() {
      if (instance == null) {
          synchronized(Single.class) {
              if (instance == null) instance = new Single();
          }
      }
      return instance;
  }
}

1.2 CAS自旋方式

比如AtomicXXX原子類中的很多實現,就是借助unsafe的CAS來實現的,如下

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}


// unsafe 實現
// cas + 自選,不斷的嘗試更新設置,直到成功為止
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

1.3 鎖

jdk本身提供瞭不少的鎖,為瞭實現單實例的並發控制,我們需要選擇寫鎖;如果支持多讀,單實例寫,則可以考慮讀寫鎖;一般使用姿勢也比較簡單

private void doSome(ReentrantReadWriteLock.WriteLock writeLock) {
    try {
        writeLock.lock();
        System.out.println("持有鎖成功 " + Thread.currentThread().getName());
        Thread.sleep(1000);
        System.out.println("執行完畢! " + Thread.currentThread().getName());
        writeLock.unlock();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void lock() throws InterruptedException {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();

    Thread.sleep(20000);
}

1.4 阻塞隊列

借助同步阻塞隊列,也可以實現並發控制的效果,比如隊列中初始化n個元素,每次消費從隊列中獲取一個元素,如果拿不到則阻塞;執行完畢之後,重新塞入一個元素,這樣就可以實現一個簡單版的並發控制

demo版演示,下面指定隊列長度為2,表示最大並發數控制為2;設置為1時,可以實現單線程的訪問控制

AtomicInteger cnt = new AtomicInteger();

private void consumer(LinkedBlockingQueue<Integer> queue) {
    try {
        // 同步阻塞拿去數據
        int val = queue.take();
        Thread.sleep(2000);
        System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 添加數據
        System.out.println("結束 " + Thread.currentThread());
        queue.offer(cnt.getAndAdd(1));
    }
}

@Test
public void blockQueue() throws InterruptedException {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    queue.add(cnt.getAndAdd(1));
    queue.add(cnt.getAndAdd(1));


    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();

    Thread.sleep(10000);
}

1.5 信號量Semaphore

上面隊列的實現方式,可以使用信號量Semaphore來完成,通過設置信號量,來控制並發數

private void semConsumer(Semaphore semaphore) {
    try {
        //同步阻塞,嘗試獲取信號
        semaphore.acquire(1);
        System.out.println("成功拿到信號,執行: " + Thread.currentThread());
        Thread.sleep(2000);
        System.out.println("執行完畢,釋放信號: " + Thread.currentThread());
        semaphore.release(1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void semaphore() throws InterruptedException {
    Semaphore semaphore = new Semaphore(2);

    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();

    Thread.sleep(20_000);
}

1.6 計數器CountDownLatch

計數,應用場景更偏向於多線程的協同,比如多個線程執行完畢之後,再處理某些事情;不同於上面的並發數的控制,它和柵欄一樣,更多的是行為結果的統一

這種場景下的使用姿勢一般如下

重點:countDownLatch 計數為0時放行

@Test
public void countDown() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        try {
            System.out.println("do something in " + Thread.currentThread());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    new Thread(() -> {
        try {
            System.out.println("do something in t2: " + Thread.currentThread());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    countDownLatch.await();
    System.out.printf("結束");
}

1.7 柵欄 CyclicBarrier

CyclicBarrier的作用與上面的CountDownLatch相似,區別在於正向計數+1, 隻有達到條件才放行; 且支持通過調用reset()重置計數,而CountDownLatch則不行

一個簡單的demo

private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {
    // 等待達到條件才放行
    try {
        System.out.println("準備執行: " + Thread.currentThread() + " at: " + LocalDateTime.now());
        Thread.sleep(sleep);
        int index = barrier.await();
        System.out.println("開始執行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void testCyclicBarrier() throws InterruptedException {
    // 到達兩個工作線程才能繼續往後面執行
    CyclicBarrier barrier = new CyclicBarrier(2);
    // 三秒之後,下面兩個線程的才會輸出 開始執行
    new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();

    Thread.sleep(4000);
    // 重置,可以再次使用
    barrier.reset();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    Thread.sleep(10000);
}

1.8 guava令牌桶

guava封裝瞭非常簡單的並發控制工具類RateLimiter,作為單機的並發控制首選

一個控制qps為2的簡單demo如下:

private void guavaProcess(RateLimiter rateLimiter) {
    try {
        // 同步阻塞方式獲取
        System.out.println("準備執行: " + Thread.currentThread() + " > " + LocalDateTime.now());
        rateLimiter.acquire();
        System.out.println("執行中: " + Thread.currentThread() + " > " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void testGuavaRate() throws InterruptedException {
    // 1s 中放行兩個請求
    RateLimiter rateLimiter = RateLimiter.create(2.0d);
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();

    Thread.sleep(20_000);
}

輸出:

準備執行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
執行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
執行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
執行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
執行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
執行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
執行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
執行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219

1.9 滑動窗口TimeWindow

沒有找到通用的滑動窗口jar包,一般來講滑動窗口更適用於平滑的限流,解決瞬時高峰問題

一個供參考的實現方式:

固定大小隊列,隊列中每個數據代表一個時間段的計數,

訪問 -》 隊列頭拿數據(註意不出隊)-》判斷是否跨時間段 -》 同一時間段,計數+1 -》跨時間段,新增數據入隊,若

扔不進去,表示時間窗滿,隊尾數據出隊

問題:當流量稀疏時,導致不會自動釋放過期的數據

解決方案:根據時間段設置定時任務,模擬訪問操作,隻是將計數改為 + 0

1.10 小結

本文給出瞭幾種單機版的並發控制的技術手段,主要目的是介紹瞭一些可選的方案,技術細節待後續補全完善,當然如果有其他的建議,歡迎評論交流

到此這篇關於Java中常見的並發控制手段的文章就介紹到這瞭,更多相關Java並發控制手段內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: