Java多線程並發FutureTask使用詳解
本文基於最新的 OpenJDK 代碼,預計發行版本為 19 。
Java 的多線程機制本質上能夠完成兩件事情,異步計算和並發。並發問題通過解決線程安全的一系列 API 來解決;而異步計算,常見的使用是 Runnable 和 Callable 配合線程使用。
FutureTask 是基於 Runnable 實現的一個可取消的異步調用 API 。
基本使用
- Future 代表瞭異步計算的結果,通過 ExecutorService 的
Future<?> submit(Runnable task)
方法,作為返回值使用:
interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable<String> task = () -> searcher.search(target); Future<String> future = executor.submit(task); // 獲取執行結果 displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
- FutureTask類是實現瞭Runnable的Future的實現,因此可以由Executor執行。例如,上述帶有submit的構造可以替換為:
class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable<String> task = () -> searcher.search(target); // 關鍵兩行替換 FutureTask<String> future = new FutureTask<>(task); executor.execute(future); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
代碼分析
繼承關系
Future
Future 表示異步計算的結果。定義瞭用於檢查計算是否完成、等待計算完成以及檢索計算結果的能力。隻有在計算完成後,才能使用 get 方法檢索結果,在必要時會阻塞線程直到 Future 計算完成。取消是由 cancel 方法執行的。還提供瞭其他方法來確定任務是正常完成還是被取消。一旦計算完成,就不能取消計算。如果為瞭可取消性而使用 Future ,但又不想提供一個可用的結果,你可以聲明形式 Future<?>
並返回 null 作為任務的結果。
在介紹 Future 中定義的能力之前,先瞭解一下它的用來表示 Future 狀態內部類,和狀態檢索方法:
public interface Future<V> { enum State { // The task has not completed. RUNNING, // The task completed with a result. @see Future#resultNow() SUCCESS, //The task completed with an exception. @see Future#exceptionNow() FAILED, // The task was cancelled. @see #cancel(boolean) CANCELLED } default State state() { if (!isDone()) // 根據 isDone() 判斷運行中 return State.RUNNING; if (isCancelled()) // 根據 isCancelled() 判斷已取消 return State.CANCELLED; boolean interrupted = false; try { while (true) { // 死循環輪詢 try { get(); // may throw InterruptedException when done return State.SUCCESS; } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return State.FAILED; } } } finally { if (interrupted) Thread.currentThread().interrupt(); } } }
Future 的狀態檢索的默認實現是根據 isDone()
、isCancelled()
和不斷輪詢 get()
方法獲取到的返回值判斷的。
當 get()
正常返回結果時, state()
返回 State.SUCCESS
; 當拋出 InterruptedException
時,最終會操作所在的線程執行嘗試中斷的方法;拋出其他異常時,則返回 State.FAILED
。
Future 中定義的其他方法包括:
package java.util.concurrent; public interface Future<V> { // 取消操作 boolean cancel(boolean mayInterruptIfRunning); // 檢查是否取消 boolean isCancelled(); // 檢查是否完成 boolean isDone(); // 獲取計算結果的方法 V get() throws InterruptedException, ExecutionException; // 帶有超時限制的獲取計算結果的方法 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 立刻返回結果 default V resultNow() // 立刻拋出異常 default Throwable exceptionNow() }
其中 resultNow()
和 exceptionNow()
是帶有默認實現的:
default V resultNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); boolean interrupted = false; try { while (true) { try { return get(); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { throw new IllegalStateException("Task completed with exception"); } catch (CancellationException e) { throw new IllegalStateException("Task was cancelled"); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
- Future 仍在運行中,直接拋出 IllegalStateException 。
- 執行一個輪詢,調用
get()
嘗試返回計算結果,如果get()
拋出異常,則根據異常拋出不同消息的 IllegalStateException 或執行中斷線程的操作。
default Throwable exceptionNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); if (isCancelled()) throw new IllegalStateException("Task was cancelled"); boolean interrupted = false; try { while (true) { try { get(); throw new IllegalStateException("Task completed with a result"); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return e.getCause(); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
- Future 仍在運行中,直接拋出 IllegalStateException 。
- Future 檢查是否已取消,如果取消瞭拋出 IllegalStateException 。
- 執行輪詢,調用
get()
方法,如果能夠正常執行結束,也拋出 IllegalStateException ,消息是 "Task completed with a result" ;get()
若拋出 InterruptedException ,則執行線程中斷操作;其他異常正常拋出。
這就是 Future 的全貌瞭。
RunnableFuture
RunnableFuture 接口同時實現瞭 Runnable 和 Future 接口 :
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. * 除非已取消,否則將此Future設置為其計算的結果。 */ void run(); }
Runnable 接口是我們常用的用來實現線程操作的,可以說是十分熟悉也十分簡單瞭。
這個接口代表瞭一個可以 Runnable 的 Future ,run 方法的成功執行代表著 Future 執行完成,並可以獲取它的計算結果。
這個接口是 JDK 1.6 後續才有的。
FutureTask
FutureTask 是 RunnableFuture 的直接實現類,它代表瞭一個可取消的異步計算任務。根據我們對 Future 的分析和 Runnable 的熟悉,就可以理解它的作用瞭:可取消並可以檢索運行狀態的一個 Runnable ,配合線程使用可以中斷線程執行。當任務沒有執行完成時會造成阻塞。並且它還可以配合 Executor 使用。
狀態
FutureTask 內部也定義瞭自己的狀態:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; // 新建 private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常完成 private static final int EXCEPTIONAL = 3; // 異常的 private static final int CANCELLED = 4; // 已取消 private static final int INTERRUPTING = 5; // 中斷中 private static final int INTERRUPTED = 6; // 已中斷 @Override public State state() { int s = state; while (s == COMPLETING) { // 等待過渡到 NORMAL 或 EXCEPTIONAL Thread.yield(); s = state; } switch (s) { case NORMAL: return State.SUCCESS; case EXCEPTIONAL: return State.FAILED; case CANCELLED: case INTERRUPTING: case INTERRUPTED: return State.CANCELLED; default: return State.RUNNING; } } }
FutureTask 的狀態包括 7 種,最初為 NEW
,隻有在 set、setException 和 cancel 方法中,運行狀態才會轉換為最終狀態。在完成期間,狀態可能為 COMPLETING
(當結果正在設置時) 或 INTERRUPTING
(僅當中斷跑者以滿足cancel(true)
)的瞬態值。
可能存在的狀態轉換是:
NEW -> COMPLETING -> NORMAL // 正常完成 NEW -> COMPLETING -> EXCEPTIONAL // 拋出異常 NEW -> CANCELLED // 取消 NEW -> INTERRUPTING -> INTERRUPTED // 中斷
屬性
下面分析一下它的屬性:
/** 底層的調用;運行後為null */ private Callable<V> callable; /** get()返回的結果或拋出的異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** 等待線程的 Treiber 堆棧 */ private volatile WaitNode waiters;
內部類
先看一看這個 WaitNode ,這是一個 FutureTask 的內部類:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
一個鏈表結構,用來對等待線程進行排序。
構造方法
最後是方法的分析,首先是構造方法:
// Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}. public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * Runnable 成功是返回給定的結果 result */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask 接收一個 Callable 或一個 Runnable 作為參數,Runnable 會封裝一下都保存到屬性 callable
,然後更新 FutureTask 的狀態為 NEW
。
從 Future 接口中實現的方法逐個分析:
檢索 FutureTask 狀態
public boolean isCancelled() { return state >= CANCELLED; // 大於等於 4, 已取消、中斷中、已中斷 } public boolean isDone() { return state != NEW; // 不是 new 就代表執行結束瞭 }
取消操作
// mayInterruptIfRunning 表示最終的取消是通過中斷還是通過取消。 public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 嘗試設置 CANCELLED 或 INTERRUPTING return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); // 通過中斷取消任務 } finally { // final state STATE.setRelease(this, INTERRUPTED); // 更新中斷狀態 } } } finally { finishCompletion(); } return true; }
這裡的 finishCompletion()
的作用是通過 LockSupport 喚醒等待的全部線程並從等待列表中移除,然後調用done()
,最後把 callable 置空。相當於取消成功後釋放資源的操作。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
done()
是個空實現,供子類去自定義的。
protected void done() { }
計算結果
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 異步結果 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
這裡涉及兩個方法:awaitDone
方法和 report
方法 。
awaitDone 方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - if nanos <= 0L, 及時返回,不需要 allocation 或 nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { // COMPLETING = 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 瞬時態,完成中 // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); // 線程中斷,移除等待的線程 throw new InterruptedException(); } else if (q == null) { if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { // 設置超時時間的情況 final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else LockSupport.park(this); } }
通過 CAS 和 LockSupport 的掛起/喚醒操作配合,阻塞當前線程,異步地等待計算結果。
這裡有個 removeWaiter
方法,內部就是遍歷 waiters
,刪除超時和中斷的等待線程。
當異步邏輯執行完成後,調用 report 方法:
// 為完成的任務返回結果或拋出異常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
這裡用到瞭一個 outcome ,它是一個 Object 類型,作為返回結果,通過 set 方法可以對它進行設置:
// 除非該 future 已被設置或取消,否則將該 future 的結果設置為給定值。 // 該方法在成功完成計算後由 run 方法在內部調用。 protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }
立刻獲取結果或異常
這兩個方法都是通過 outcome
預設的返回值,返回預期的結果或異常。
public V resultNow() { switch (state()) { // Future.State case SUCCESS: @SuppressWarnings("unchecked") V result = (V) outcome; return result; case FAILED: throw new IllegalStateException("Task completed with exception"); case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } } @Override public Throwable exceptionNow() { switch (state()) { // Future.State case SUCCESS: throw new IllegalStateException("Task completed with a result"); case FAILED: Object x = outcome; return (Throwable) x; case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } }
run 方法組
最後是實現瞭 Runnable 的 run 方法:
public void run() { // 保證 NEW 狀態和 RUNNER 成功設置當前線程 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 待執行的 Callable if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 執行 Callable ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // 為瞭防止並發調用 run ,直到 state 確定之前, runner 必須是非空的 runner = null; // 狀態必須在 runner 置空後重新讀取,以防止泄漏中斷 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
這裡涉及兩個方法,第一個是 setException(ex)
:
// 導致該future報告一個{@link ExecutionException},並將給定的可拋出對象作為其原因,除非該future已經被設置或取消。 protected void setException(Throwable t) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = t; STATE.setRelease(this, EXCEPTIONAL); // final state finishCompletion(); } }
另一個是 handlePossibleCancellationInterrupt 方法:
/** * 確保任何來自可能的 cancel(true) 的中斷隻在 run 或 runAndReset 時交付給任務。 */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // 我們想清除可能從cancel(true)接收到的所有中斷。 // 然而,允許使用中斷作為任務與其調用者通信的獨立機制,並沒有辦法隻清除取消中斷。 // Thread.interrupted(); }
最後是 runAndReset 方法:
protected boolean runAndReset() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return false; boolean ran = false; // flag 表示正常執行結束 int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; // if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; // 當正常執行結束,且 state 一開始就是 NEW 時,表示可以運行並重置。 }
執行計算時不設置其結果,然後將該 future 重置為初始狀態,如果計算遇到異常或被取消,則不這樣做。這是為本質上執行多次的任務設計的。
run 和 runAndReset 都用到瞭一個 RUNNER
, 最後就來揭秘一下這個東西:
private static final VarHandle STATE; private static final VarHandle RUNNER; private static final VarHandle WAITERS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, "state", int.class); RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class); WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; }
MethodHandles.lookup()
創建一個MethodHandles.Lookup
對象,該對象可以創建所有訪問權限的方法,包括public
,protected
,private
,default
。
VarHandle
主要用於動態操作數組的元素或對象的成員變量。VarHandle
通過 MethodHandles
來獲取實例,然後調用 VarHandle
的方法即可動態操作指定數組的元素或指定對象的成員變量。
到此這篇關於Java 多線程並發FutureTask的文章就介紹到這瞭,更多相關Java 多線程並發內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 解析Java異步之call future
- Future cancel迷惑性boolean入參解析
- Java中Future和FutureTask的示例詳解及使用
- java ThreadPoolExecutor線程池拒絕策略避坑
- Java多線程之 FutureTask:帶有返回值的函數定義和調用方式