Java中常見的並發控制手段淺析
前言
單實例的並發控制,主要是針對JVM內,我們常規的手段即可滿足需求,常見的手段大概有下面這些
- 同步代碼塊
- CAS自旋
- 鎖
- 阻塞隊列,令牌桶等
1.1 同步代碼塊
通過同步代碼塊,來確保同一時刻隻會有一個線程執行對應的業務邏輯,常見的使用姿勢如下
public synchronized doProcess() { // 同步代碼塊,隻會有一個線程執行 }
一般推薦使用最小區間使用原則,盡量不要直接在方法上加synchronized,比如經典的雙重判定單例模式
public class Single { private static volatile Single instance; private Single() {} public static Single getInstance() { if (instance == null) { synchronized(Single.class) { if (instance == null) instance = new Single(); } } return instance; } }
1.2 CAS自旋方式
比如AtomicXXX原子類中的很多實現,就是借助unsafe的CAS來實現的,如下
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // unsafe 實現 // cas + 自選,不斷的嘗試更新設置,直到成功為止 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
1.3 鎖
jdk本身提供瞭不少的鎖,為瞭實現單實例的並發控制,我們需要選擇寫鎖;如果支持多讀,單實例寫,則可以考慮讀寫鎖;一般使用姿勢也比較簡單
private void doSome(ReentrantReadWriteLock.WriteLock writeLock) { try { writeLock.lock(); System.out.println("持有鎖成功 " + Thread.currentThread().getName()); Thread.sleep(1000); System.out.println("執行完畢! " + Thread.currentThread().getName()); writeLock.unlock(); } catch (Exception e) { e.printStackTrace(); } } @Test public void lock() throws InterruptedException { ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); Thread.sleep(20000); }
1.4 阻塞隊列
借助同步阻塞隊列,也可以實現並發控制的效果,比如隊列中初始化n個元素,每次消費從隊列中獲取一個元素,如果拿不到則阻塞;執行完畢之後,重新塞入一個元素,這樣就可以實現一個簡單版的並發控制
demo版演示,下面指定隊列長度為2,表示最大並發數控制為2;設置為1時,可以實現單線程的訪問控制
AtomicInteger cnt = new AtomicInteger(); private void consumer(LinkedBlockingQueue<Integer> queue) { try { // 同步阻塞拿去數據 int val = queue.take(); Thread.sleep(2000); System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 添加數據 System.out.println("結束 " + Thread.currentThread()); queue.offer(cnt.getAndAdd(1)); } } @Test public void blockQueue() throws InterruptedException { LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); queue.add(cnt.getAndAdd(1)); queue.add(cnt.getAndAdd(1)); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); Thread.sleep(10000); }
1.5 信號量Semaphore
上面隊列的實現方式,可以使用信號量Semaphore來完成,通過設置信號量,來控制並發數
private void semConsumer(Semaphore semaphore) { try { //同步阻塞,嘗試獲取信號 semaphore.acquire(1); System.out.println("成功拿到信號,執行: " + Thread.currentThread()); Thread.sleep(2000); System.out.println("執行完畢,釋放信號: " + Thread.currentThread()); semaphore.release(1); } catch (Exception e) { e.printStackTrace(); } } @Test public void semaphore() throws InterruptedException { Semaphore semaphore = new Semaphore(2); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); Thread.sleep(20_000); }
1.6 計數器CountDownLatch
計數,應用場景更偏向於多線程的協同,比如多個線程執行完畢之後,再處理某些事情;不同於上面的並發數的控制,它和柵欄一樣,更多的是行為結果的統一
這種場景下的使用姿勢一般如下
重點:countDownLatch 計數為0時放行
@Test public void countDown() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(() -> { try { System.out.println("do something in " + Thread.currentThread()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); new Thread(() -> { try { System.out.println("do something in t2: " + Thread.currentThread()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); countDownLatch.await(); System.out.printf("結束"); }
1.7 柵欄 CyclicBarrier
CyclicBarrier的作用與上面的CountDownLatch相似,區別在於正向計數+1, 隻有達到條件才放行; 且支持通過調用reset()重置計數,而CountDownLatch則不行
一個簡單的demo
private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) { // 等待達到條件才放行 try { System.out.println("準備執行: " + Thread.currentThread() + " at: " + LocalDateTime.now()); Thread.sleep(sleep); int index = barrier.await(); System.out.println("開始執行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testCyclicBarrier() throws InterruptedException { // 到達兩個工作線程才能繼續往後面執行 CyclicBarrier barrier = new CyclicBarrier(2); // 三秒之後,下面兩個線程的才會輸出 開始執行 new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start(); new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start(); Thread.sleep(4000); // 重置,可以再次使用 barrier.reset(); new Thread(() -> cyclicBarrierLogic(barrier, 1)).start(); new Thread(() -> cyclicBarrierLogic(barrier, 1)).start(); Thread.sleep(10000); }
1.8 guava令牌桶
guava封裝瞭非常簡單的並發控制工具類RateLimiter,作為單機的並發控制首選
一個控制qps為2的簡單demo如下:
private void guavaProcess(RateLimiter rateLimiter) { try { // 同步阻塞方式獲取 System.out.println("準備執行: " + Thread.currentThread() + " > " + LocalDateTime.now()); rateLimiter.acquire(); System.out.println("執行中: " + Thread.currentThread() + " > " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testGuavaRate() throws InterruptedException { // 1s 中放行兩個請求 RateLimiter rateLimiter = RateLimiter.create(2.0d); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); Thread.sleep(20_000); }
輸出:
準備執行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
準備執行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
準備執行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
執行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
執行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
執行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
執行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
執行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
執行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
執行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219
1.9 滑動窗口TimeWindow
沒有找到通用的滑動窗口jar包,一般來講滑動窗口更適用於平滑的限流,解決瞬時高峰問題
一個供參考的實現方式:
固定大小隊列,隊列中每個數據代表一個時間段的計數,
訪問 -》 隊列頭拿數據(註意不出隊)-》判斷是否跨時間段 -》 同一時間段,計數+1 -》跨時間段,新增數據入隊,若
扔不進去,表示時間窗滿,隊尾數據出隊
問題:當流量稀疏時,導致不會自動釋放過期的數據
解決方案:根據時間段設置定時任務,模擬訪問操作,隻是將計數改為 + 0
1.10 小結
本文給出瞭幾種單機版的並發控制的技術手段,主要目的是介紹瞭一些可選的方案,技術細節待後續補全完善,當然如果有其他的建議,歡迎評論交流
到此這篇關於Java中常見的並發控制手段的文章就介紹到這瞭,更多相關Java並發控制手段內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 一文瞭解Java讀寫鎖ReentrantReadWriteLock的使用
- Java並發編程之常用的輔助類詳解
- 徹底搞懂Java多線程(五)
- Java 讀寫鎖源碼分析
- java並發包中CountDownLatch和線程池的使用詳解