Flutter如何保證數據操作原子性詳解

前言

Flutter 是單線程架構,按道理理說,Flutter 不會出現 Java 的多線程相關的問題。

但在我使用 Flutter 過程中,卻發現 Flutter 依然會存在數據操作原子性的問題。

其實 Flutter 中存在多線程的(Isolate 隔離池),隻是 Flutter 中的多線程更像 Java 中的多進程,因為 Flutter 中線程不能像 Java 一樣,可以兩個線程去操作同一個對象。

我們一般將計算任務放在 Flutter 單獨的線程中,例如一大段 Json 數據的解析,可以將解析計算放在單獨的線程中,然後將解析完後的 Map<String, dynamic> 返回到主線程來用。

Flutter單例模式

在 Java 中,我們一般喜歡用單例模式來理解 Java 多線程問題。這裡我們也以單例來舉例,我們先來一個正常的:

class FlutterSingleton {
  static FlutterSingleton? _instance;

  /// 將構造方法聲明成私有的
  FlutterSingleton._();

  static FlutterSingleton getInstance() {
    if (_instance == null) {
      _instance = FlutterSingleton._();
    }
    return _instance!;
  }
}

由於 Flutter 是單線程架構的, 所以上述代碼是沒有問題的。

問題示例

但是, 和 Java 不同的是, Flutter 中存在異步方法。

做 App 開發肯定會涉及到數據持久化,Android 開發應該都熟悉 SharedPreferences,Flutter 中也存在 SharedPreferences 庫,我們就以此來舉例。同樣實現單例模式,隻是這次無可避免的需要使用 Flutter 中的異步:

class SPSingleton {
  static SPSingleton? _instance;

  String? data;

  /// 將構造方法聲明成私有的
  SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data'];

  static Future<SPSingleton> _fromSharedPreferences() async {
    // 模擬從 SharedPreferences 中讀取數據, 並以此來初始化當前對象
    Map<String, String> map = {'data': 'mockData'};
    await Future.delayed(Duration(milliseconds: 10));
    return SPSingleton._fromMap(map);
  }

  static Future<SPSingleton> getInstance() async {
    if (_instance == null) {
      _instance = await SPSingleton._fromSharedPreferences();
    }
    return _instance!;
  }
}

void main() async {
  SPSingleton.getInstance().then((value) {
    print('instance1.hashcode = ${value.hashCode}');
  });
  SPSingleton.getInstance().then((value) {
    print('instance2.hashcode = ${value.hashCode}');
  });
}

運行上面的代碼,打印日志如下:

instance1.hashcode = 428834223
instance2.hashcode = 324692380

可以發現,我們兩次調用 SPSingleton.getInstance() 方法,分別創建瞭兩個對象,說明上面的單例模式實現有問題。

我們來分析一下 getInstance() 方法:

static Future<SPSingleton> getInstance() async {
  if (_instance == null) { // 1
    _instance = await SPSingleton._fromSharedPreferences(); //2
  }
  return _instance!;
}

當第一次調用 getInstance() 方法時,代碼在運行到 1 處時,發現 _instance 為 null, 就會進入 if 語句裡面執行 2 處, 並因為 await 關鍵字掛起, 並交出代碼的執行權, 直到被 await 的 Future 執行完畢,最後將創建的 SPSingleton 對象賦值給 _instance 並返回。

當第二次調用 getInstance() 方法時,代碼在運行到 1 處時,可能會發現 _instance 還是為 null (因為 await SPSingleton._fromSharedPreferences() 需要 10ms 才能返回結果), 然後和第一次調用 getInstance() 方法類似, 創建新的 SPSingleton 對象賦值給 _instance。

最後導致兩次調用 getInstance() 方法, 分別創建瞭兩個對象。

解決辦法

問題原因知道瞭,那麼該怎樣解決這個問題呢?

究其本質,就是 getInstance() 方法的執行不具有原子性,即:在一次 getInstance() 方法執行結束前,不能執行下一次 getInstance() 方法。

幸運的是, 我們可以借助 Completer 來將異步操作原子化,下面是借助 Completer 改造後的代碼:

import 'dart:async';

class SPSingleton {
  static SPSingleton? _instance;
  static Completer<bool>? _monitor;

  String? data;

  /// 將構造方法聲明成私有的
  SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data'];

  static Future<SPSingleton> _fromSharedPreferences() async {
    // 模擬從 SharedPreferences 中讀取數據, 並以此來初始化當前對象
    Map<String, String> map = {'data': 'mockData'};
    await Future.delayed(Duration(milliseconds: 10));
    return SPSingleton._fromMap(map);
  }

  static Future<SPSingleton> getInstance() async {
    if (_instance == null) {
      if (_monitor == null) {
        _monitor = Completer<bool>();
        _instance = await SPSingleton._fromSharedPreferences();
        _monitor!.complete(true);
      } else {
        // Flutter 的 Future 支持被多次 await
        await _monitor!.future;
        _monitor = null;
      }
    }
    return _instance!;
  }
}

void main() async {
  SPSingleton.getInstance().then((value) {
    print('instance1.hashcode = ${value.hashCode}');
  });
  SPSingleton.getInstance().then((value) {
    print('instance2.hashcode = ${value.hashCode}');
  });
}

我們再次分析一下 getInstance() 方法:

static Future<SPSingleton> getInstance() async {
  if (_instance == null) { // 1
    if (_monitor == null) { // 2
      _monitor = Completer<bool>(); // 3
      _instance = await SPSingleton._fromSharedPreferences(); // 4
      _monitor!.complete(true); // 5
    } else {
      // Flutter 的 Future 支持被多次 await
      await _monitor!.future; //6
      _monitor = null;
    }
  }
  return _instance!; // 7
}

當第一次調用 getInstance() 方法時, 1 處和 2 處都會判定為 true, 然後進入執行到 3 處創建一個的 Completer 對象, 然後在 4 的 await 處掛起, 並交出代碼的執行權, 直到被 await 的 Future 執行完畢。

此時第二次調用的 getInstance() 方法開始執行,1 處同樣會判定為 true, 但是到 2 處時會判定為 false, 從而進入到 else, 並因為 6 處的 await 掛起, 並交出代碼的執行權;

此時, 第一次調用 getInstance() 時的 4 處執行完畢, 並執行到 5, 並通過 Completer 通知第二次調用的 getInstance() 方法可以等待獲取代碼執行權瞭。

最後,兩次調用 getInstance() 方法都會返回同一個 SPSingleton 對象,以下是打印日志:

instance1.hashcode = 786567983
instance2.hashcode = 786567983

由於 Flutter 的 Future 是支持多次 await 的, 所以即便是連續 n 次調用 getInstance() 方法, 從第 2 到 n 次調用會 await 同一個 Completer.future, 最後也能返回同一個對象。

Flutter任務隊列

雖然我們經常拿單例模式來解釋說明 Java 多線程問題,可這並不代表著 Java 隻有在單例模式時才有多線程問題。

同樣的,也並不代表著 Flutter 隻有在單例模式下才有原子操作問題。

問題示例

我們同樣以數據持久化來舉例,隻是這次我們以數據庫操作來舉例。

我們在操作數據庫時,經常會有這樣的需求:如果數據庫表中存在這條數據,就更新這條數據,否則就插入這條數據。

為瞭實現這樣的需求,我們可能會先從數據庫表中查詢數據,查詢到瞭就更新,沒查詢到就插入,代碼如下:

class Item {
  int id;
  String data;
  Item({
    required this.id,
    required this.data,
  });
}

class DBTest {
  DBTest._();
  static DBTest instance = DBTest._();
  bool _existsData = false;
  Future<void> insert(String data) async {
    // 模擬數據庫插入操作,10毫秒過後,數據庫中才有數據
    await Future.delayed(Duration(milliseconds: 10));
    _existsData = true;
    print('執行瞭插入');
  }

  Future<void> update(String data) async {
    // 模擬數據庫更新操作
    await Future.delayed(Duration(milliseconds: 10));
    print('執行瞭更新');
  }

  Future<Item?> selected(int id) async {
    // 模擬數據庫查詢操作
    await Future.delayed(Duration(milliseconds: 10));
    if (_existsData) {
      // 數據庫中有數據才返回
      return Item(id: 1, data: 'mockData');
    } else {
      // 數據庫沒有數據時,返回null
      return null;
    }
  }

  /// 先從數據庫表中查詢數據,查詢到瞭就更新,沒查詢到就插入
  Future<void> insertOrUpdate(int id, String data) async {
    Item? item = await selected(id);
    if (item == null) {
      await insert(data);
    } else {
      await update(data);
    }
  }
}

void main() async {
  DBTest.instance.insertOrUpdate(1, 'data');
  DBTest.instance.insertOrUpdate(1, 'data');
}

我們期望的輸出日志為:

執行瞭插入
執行瞭更新

但不幸的是, 輸出的日志為:

執行瞭插入
執行瞭插入

原因也是異步方法操作數據, 不是原子操作, 導致邏輯異常。

也許我們也可以效仿單例模式的實現,利用 Completer 將 insertOrUpdate() 方法原子化。

但對於數據庫操作是不合適的,因為我們可能還有其它需求,比如說:調用插入數據的方法,然後立即從數據庫中查詢這條數據,發現找不到。

如果強行使用 Completer,那麼到最後,可能這個類中會出現一大堆的 Completer ,代碼難以維護。

解決辦法

其實我們想要的效果是,當有異步方法在操作數據庫時,別的操作數據的異步方法應該阻塞住,也就是同一時間隻能有一個方法來操作數據庫。我們其實可以使用任務隊列來實現數據庫操作的需求。

我這裡利用 Completer 實現瞭一個任務隊列:

import 'dart:async';
import 'dart:collection';

/// TaskQueue 不支持 submit await submit, 以下代碼就存在問題
///
/// TaskQueue taskQueue = TaskQueue();
/// Future<void> task1(String arg)async{
///   await Future.delayed(Duration(milliseconds: 100));
/// }
/// Future<void> task2(String arg)async{
///   在這裡submit時, 任務會被添加到隊尾, 且當前方法任務不會結束
///   添加到隊尾的任務必須等到當前方法任務執行完畢後, 才能繼續執行
///   而隊尾的任務必須等當前任務執行完畢後, 才能執行
///   這就導致相互等待, 使任務無法進行下去
///   解決辦法是, 移除當前的 await, 讓當前任務結束
///   await taskQueue.submit(task1, arg);
/// }
///
/// taskQueue.submit(task2, arg);
///
/// 總結:
/// 被 submit 的方法的內部如果調用 submit 方法, 此方法不能 await, 否則任務隊列會被阻塞住
///
/// 如何避免此操作, 可以借鑒以下思想:
/// 以數據庫操作舉例, 有個save方法的邏輯是插入或者更新(先查詢數據庫select,再進行下一步操作);
/// sava方法內部submit,並且select也submit, 就容易出現submit await submit的情況
///
/// 我們可以這樣操作,假設當前類為 DBHelper:
/// 將數據庫的增,刪,查,改操作封裝成私有的 async 方法, 且私有方法不能使用submit
/// DBHelper的公有方法, 可以調用自己的私有 async 方法, 但不能調用自己的公有方法, 公有方法可以使用submit
/// 這樣就不會存在submit await submit的情況瞭
class TaskQueue {
  /// 提交任務
  Future<O> submit<A, O>(Function fun, A? arg) async {
    if (!_isEnable) {
      throw Exception('current TaskQueue is recycled.');
    }
    Completer<O> result = new Completer<O>();

    if (!_isStartLoop) {
      _isStartLoop = true;
      _startLoop();
    }

    _queue.addLast(_Runnable<A, O>(
      fun: fun,
      arg: arg,
      completer: result,
    ));
    if (!(_emptyMonitor?.isCompleted ?? true)) {
      _emptyMonitor?.complete();
    }

    return result.future;
  }

  /// 回收 TaskQueue
  void recycle() {
    _isEnable = false;
    if (!(_emptyMonitor?.isCompleted ?? true)) {
      _emptyMonitor?.complete();
    }
    _queue.clear();
  }

  Queue<_Runnable> _queue = Queue<_Runnable>();
  Completer? _emptyMonitor;
  bool _isStartLoop = false;
  bool _isEnable = true;

  Future<void> _startLoop() async {
    while (_isEnable) {
      if (_queue.isEmpty) {
        _emptyMonitor = new Completer();
        await _emptyMonitor!.future;
        _emptyMonitor = null;
      }

      if (!_isEnable) {
        // 當前TaskQueue不可用時, 跳出循環
        return;
      }

      _Runnable runnable = _queue.removeFirst();
      try {
        dynamic result = await runnable.fun(runnable.arg);
        runnable.completer.complete(result);
      } catch (e) {
        runnable.completer.completeError(e);
      }
    }
  }
}

class _Runnable<A, O> {
  final Completer<O> completer;
  final Function fun;
  final A? arg;

  _Runnable({
    required this.completer,
    required this.fun,
    this.arg,
  });
}

由於 Flutter 中的 future 不支持暫停操作, 一旦開始執行, 就隻能等待執行完。

所以這裡的任務隊列實現是基於方法的延遲調用來實現的。

TaskQueue 的用法示例如下:

void main() async {
  Future<void> test1(String data) async {
    await Future.delayed(Duration(milliseconds: 20));
    print('執行瞭test1');
  }

  Future<String> test2(Map<String, dynamic> args) async {
    await Future.delayed(Duration(milliseconds: 10));
    print('執行瞭test2');
    return 'mockResult';
  }

  TaskQueue taskQueue = TaskQueue();
  taskQueue.submit(test1, '1');
  taskQueue.submit(test2, {
    'data1': 1,
    'data2': '2',
  }).then((value) {
    print('test2返回結果:${value}');
  });

  await Future.delayed(Duration(milliseconds: 200));
  taskQueue.recycle();
}
/*
執行輸出結果如下:

執行瞭test1
執行瞭test2
test2返回結果:mockResult
*/

值得註意的是: 這裡的 TaskQueue 不支持 submit await submit, 原因及示例代碼已在註釋中說明,這裡不再贅述。

為瞭避免出現 submit await submit 的情況,我代碼註釋中也做出瞭建議(假設當前類為 DBHelper):

  • 將數據庫的增、刪、查、改操作封裝成私有的異步方法, 且私有異步方法不能使用 submit;

  • DBHelper 的公有方法, 可以調用自己的私有異步方法, 但不能調用自己的公有異步方法, 公有異步方法可以使用 submit;

這樣就不會出現 submit await submit 的情況瞭。

於是,上述的數據庫操作示例代碼就變成瞭以下的樣子:

class Item {
  int id;
  String data;
  Item({
    required this.id,
    required this.data,
  });
}

class DBTest {
  DBTest._();
  static DBTest instance = DBTest._();
  TaskQueue _taskQueue = TaskQueue();
  bool _existsData = false;
  Future<void> _insert(String data) async {
    // 模擬數據庫插入操作,10毫秒過後,數據庫才有數據
    await Future.delayed(Duration(milliseconds: 10));
    _existsData = true;
    print('執行瞭插入');
  }

  Future<void> insert(String data) async {
    await _taskQueue.submit(_insert, data);
  }

  Future<void> _update(String data) async {
    // 模擬數據庫更新操作
    await Future.delayed(Duration(milliseconds: 10));
    print('執行瞭更新');
  }

  Future<void> update(String data) async {
    await _taskQueue.submit(_update, data);
  }

  Future<Item?> _selected(int id) async {
    // 模擬數據庫查詢操作
    await Future.delayed(Duration(milliseconds: 10));
    if (_existsData) {
      // 數據庫中有數據才返回
      return Item(id: 1, data: 'mockData');
    } else {
      // 數據庫沒有數據時,返回null
      return null;
    }
  }

  Future<Item?> selected(int id) async {
    return await _taskQueue.submit(_selected, id);
  }

  /// 先從數據庫表中查詢數據,查詢到瞭就更新,沒查詢到就插入
  Future<void> _insertOrUpdate(Map<String, dynamic> args) async {
    int id = args['id'];
    String data = args['data'];
    Item? item = await _selected(id);
    if (item == null) {
      await _insert(data);
    } else {
      await _update(data);
    }
  }

  Future<Item?> insertOrUpdate(int id, String data) async {
    return await _taskQueue.submit(_insertOrUpdate, {
      'id': id,
      'data': data,
    });
  }
}

void main() async {
  DBTest.instance.insertOrUpdate(1, 'data');
  DBTest.instance.insertOrUpdate(1, 'data');
}

輸出日志也變成瞭我們期望的樣子:

執行瞭插入
執行瞭更新

總結

  • Flutter 異步方法修改數據時, 一定要註意數據操作的原子性, 不能因為 Flutter 是單線程架構,就忽略多個異步方法競爭導致數據異常的問題。

  • Flutter 保證數據操作的原子性,也有可行辦法,當邏輯比較簡單時,可直接使用 Completer,當邏輯比較復雜時,可以考慮使用任務隊列。

另外,本文中的任務隊列實現有很大的缺陷,不支持 submit await submit,否則整個任務隊列會被阻塞住。

到此這篇關於Flutter如何保證數據操作原子性的文章就介紹到這瞭,更多相關Flutter數據操作原子性內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: