帶你用Python實現Saga 分佈式事務的方法
銀行跨行轉賬業務是一個典型分佈式事務場景,假設 A 需要跨行轉賬給 B,那麼就涉及兩個銀行的數據,無法通過一個數據庫的本地事務保證轉賬的 ACID,隻能夠通過分佈式事務來解決。
分佈式事務
分佈式事務在分佈式環境下,為瞭滿足可用性、性能與降級服務的需要,降低一致性與隔離性的要求,一方面遵循 BASE 理論:
- 基本業務可用性( Basic Availability )
- 柔性狀態( Soft state )
- 最終一致性( Eventual consistency )
- 另一方面,分佈式事務也部分遵循 ACID 規范:
- 原子性:嚴格遵循
- 一致性:事務完成後的一致性嚴格遵循;事務中的一致性可適當放寬
- 隔離性:並行事務間不可影響;事務中間結果可見性允許安全放寬
- 持久性:嚴格遵循
SAGA
Saga 是這一篇數據庫論文SAGAS提到的一個分佈式事務方案。其核心思想是將長事務拆分為多個本地短事務,由 Saga 事務協調器協調,如果各個本地事務成功完成那就正常完成,如果某個步驟失敗,則根據相反順序一次調用補償操作。
目前可用於 SAGA 的開源框架,主要為 Java 語言,其中以 seata 為代表。我們的例子采用 go 語言,使用的分佈式事務框架為https://github.com/yedf/dtm,它對分佈式事務的支持非常優雅。下面來詳細講解 SAGA 的組成:
DTM 事務框架裡,有 3 個角色,與經典的 XA 分佈式事務一樣:
- AP/應用程序,發起全局事務,定義全局事務包含哪些事務分支
- RM/資源管理器,負責分支事務各項資源的管理
- TM/事務管理器,負責協調全局事務的正確執行,包括 SAGA 正向 /逆向操作的執行
下面看一個成功完成的 SAGA 時序圖,就很容易理解 SAGA 分佈式事務:
SAGA實踐
對於我們要進行的銀行轉賬的例子,我們將在正向操作中,進行轉入轉出,在補償操作中,做相反的調整。
首先我們創建賬戶餘額表:
CREATE TABLE dtm_busi.`user_account` ( `id` int(11) AUTO_INCREMENT PRIMARY KEY, `user_id` int(11) not NULL UNIQUE , `balance` decimal(10,2) NOT NULL DEFAULT '0.00', `create_time` datetime DEFAULT now(), `update_time` datetime DEFAULT now() );
我們先編寫核心業務代碼,調整用戶的賬戶餘額
def saga_adjust_balance(cursor, uid, amount): affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount)) if affected == 0: raise Exception("update error, balance not enough")
下面我們來編寫具體的正向操作 /補償操作的處理函數
@app.post("/api/TransOutSaga") def trans_out_saga(): saga_adjust_balance(c, out_uid, -30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransOutCompensate") def trans_out_compensate(): saga_adjust_balance(c, out_uid, 30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransInSaga") def trans_in_saga(): saga_adjust_balance(c, in_uid, 30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransInCompensate") def trans_in_compensate(): saga_adjust_balance(c, in_uid, -30) return {"dtm_result": "SUCCESS"}
到此各個子事務的處理函數已經 OK 瞭,然後是開啟 SAGA 事務,進行分支調用
# 這是 dtm 服務地址 dtm = "http://localhost:8080/api/dtmsvr" # 這是業務微服務地址 svc = "http://localhost:5000/api" req = {"amount": 30} s = saga.Saga(dtm, utils.gen_gid(dtm)) s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") s.submit()
至此,一個完整的 SAGA 分佈式事務編寫完成。
如果您想要完整運行一個成功的示例,那麼參考這個例子yedf/dtmcli-py-sample,將它運行起來非常簡單
# 部署啟動 dtm # 需要 docker 版本 18 以上 git clone https://github.com/yedf/dtm cd dtm docker-compose up # 另起一個命令行 git clone https://github.com/yedf/dtmcli-py-sample cd dtmcli-py-sample pip3 install flask dtmcli requests flask run # 另起一個命令行 curl localhost:5000/api/fireSaga
處理網絡異常
假設提交給 dtm 的事務中,調用轉入操作時,出現短暫的故障怎麼辦?按照 SAGA 事務的協議,dtm 會重試未完成的操作,這時我們要如何處理?故障有可能是轉入操作完成後出網絡故障,也有可能是轉入操作完成中出現機器宕機。如何處理才能夠保障賬戶餘額的調整是正確無問題的?
這類網絡異常的妥當處理,是分佈式事務中的大難題,異常情況包括三類:重復請求、空補償、懸掛,都需要正確處理
DTM 提供瞭子事務屏障功能,保證上述異常情況下的業務邏輯,隻會有一次正確順序下的成功提交。(子事務屏障詳情參考分佈式事務最經典的七種解決方案的子事務屏障環節)
我們把處理函數調整為:
@app.post("/api/TransOutSaga") def trans_out_saga(): with barrier.AutoCursor(conn_new()) as cursor: def busi_callback(c): saga_adjust_balance(c, out_uid, -30) barrier_from_req(request).call(cursor, busi_callback) return {"dtm_result": "SUCCESS"}
這裡的 barrier_from_req(request).call(cursor, busi_callback)調用會使用子事務屏障技術,保證 busi_callback 回調函數僅被提交一次
您可以嘗試多次調用這個 TransIn 服務,僅有一次餘額調整。
處理回滾
假如銀行將金額準備轉入用戶 2 時,發現用戶 2 的賬戶異常,返回失敗,會怎麼樣?我們調整處理函數,讓轉入操作返回失敗
@app.post("/api/TransInSaga") def trans_in_saga(): return {"dtm_result": "FAILURE"}
我們給出事務失敗交互的時序圖
這裡有一點,TransIn 的正向操作什麼都沒有做,就返回瞭失敗,此時調用 TransIn 的補償操作,會不會導致反向調整出錯瞭呢?
不用擔心,前面的子事務屏障技術,能夠保證 TransIn 的錯誤如果發生在提交之前,則補償為空操作;TransIn 的錯誤如果發生在提交之後,則補償操作會將數據提交一次。
您可以將返回錯誤的 TransIn 改成:
@app.post("/api/TransInSaga") def trans_in_saga(): with barrier.AutoCursor(conn_new()) as cursor: def busi_callback(c): saga_adjust_balance(c, in_uid, 30) barrier_from_req(request).call(cursor, busi_callback) return {"dtm_result": "FAILURE"}
最後的結果餘額依舊會是對的,原理可以參考:分佈式事務最經典的七種解決方案的子事務屏障環節
小結
在這篇文章裡,我們介紹瞭 SAGA 的理論知識,也通過一個例子,完整給出瞭編寫一個 SAGA 事務的過程,涵蓋瞭正常成功完成,異常情況,以及成功回滾的情況。相信讀者通過這邊文章,對 SAGA 已經有瞭深入的理解。
文中使用的 dtm 是新開源的 Golang 分佈式事務管理框架,功能強大,支持 TCC 、SAGA 、XA 、事務消息等事務模式,支持 Go 、python 、PHP 、node 、csharp 等語言的。同時提供瞭非常簡單易用的接口。
閱讀完此篇幹貨,歡迎大傢訪問項目https://github.com/yedf/dtm,給顆星星支持!
到此這篇關於帶你用Python實現Saga 分佈式事務的問題的文章就介紹到這瞭,更多相關Python Saga 分佈式事務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 如何用C#實現SAGA分佈式事務
- Node.js自定義對象事件的監聽與發射
- Golang開發中如何解決共享變量問題
- Python OpenCV實現任意角度二維碼矯正
- python區塊鏈簡易版交易實現示例