帶你快速搞定java並發庫
一、總覽
計算機程序 = 數據 + 算法。
並發編程的一切根本原因是為瞭保證數據的正確性,線程的效率性。
Java並發庫共分為四個大的部分,如下圖
Executor 和 future 是為瞭保證線程的效率性
Lock 和數據結構 是為瞭維持數據的一致性。
Java並發編程的時候,思考順序為,
對自己的數據要麼加鎖。要麼使用提供的數據結構,保證數據的安全性
調度線程的時候使用Executor更好的調度。
二、Executor總覽
Executor 提供一種將任務提交與每個任務將如何運行的機制(包括線程使用的細節、調度等)分離開來的方法。
相當於manager,老板讓manager去執行一件任務,具體的是誰執行,什麼時候執行,就不管瞭。
看上圖的繼承關系,介紹幾個
內置的線程池基本上都在這裡
newScheduledThreadPool
定時執行的線程池
newCachedThreadPool
緩存使用過的線程
newFixedThreadPool
固定數量的線程池
newWorkStealingPool
將大任務分解為小任務的線程池
三、繼承結構
構造函數
包含一個定時的service
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; }
四、怎麼保證隻有一個線程
定時執行的時候調用這個方法,調用過程如下,註意看其中的註釋,由上往下的調用順序
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延遲執行 delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 加入任務隊列 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 確保執行 ensurePrestart(); } } // 如果worker數量小於corePoolSize,創建新的線程,其他情況不處理 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
五、怎麼保證時間可以定時執行
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
在每次執行的時候會把下一次執行的時間放進任務中
private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
FutureTask 定時是通過LockSupport.parkNanos(this, nanos);LockSupport.park(this);
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } //註意這裡 LockSupport.parkNanos(this, nanos); } else //註意這裡 LockSupport.park(this); } }
總結:Executor是通過將任務放在隊列中,生成的futureTask。然後將生成的任務在隊列中排序,將時間最近的需要出發的任務做檢查。如果時間不到,就阻塞線程到下次出發時間。
註意:newSingleThreadScheduledExecutor隻會有一個線程,不管你提交多少任務,這些任務會順序執行,如果發生異常會取消下面的任務,線程池也不會關閉,註意捕捉異常
六、使用
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor(); Runnable runnable1 = () -> { try { Thread.sleep(4000); System.out.println("11111111111111"); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable runnable2 = () -> { try { Thread.sleep(4000); System.out.println("222"); } catch (InterruptedException e) { e.printStackTrace(); } }; single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS); single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);
11111111111111 222 11111111111111 222 11111111111111
在項目中要註意關閉線程池
actionService = Executors.newSingleThreadScheduledExecutor(); actionService.scheduleWithFixedDelay(() -> { try { Thread.currentThread().setName("robotActionService"); Integer robotId = robotQueue.poll(); if (robotId == null) { // 關閉線程池 actionService.shutdown(); } else { int aiLv = robots.get(robotId); if (actionQueueMap.containsKey(aiLv)) { ActionQueue actionQueue = actionQueueMap.get(aiLv); actionQueue.doAction(robotId); } } } catch (Exception e) { // 捕捉異常 LOG.error("",e); } }, 1, 1, TimeUnit.SECONDS);
總結
本篇文章就到這裡瞭,希望能給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!
推薦閱讀:
- java高並發ScheduledThreadPoolExecutor類深度解析
- Java中定時任務的6種實現方式
- Java如何固定大小的線程池
- 實現java簡單的線程池
- Java ScheduledExecutorService定時任務案例講解