如何用C#實現SAGA分佈式事務

背景

銀行跨行轉賬業務是一個典型分佈式事務場景,假設 A 需要跨行轉賬給 B,那麼就涉及兩個銀行的數據,無法通過一個數據庫的本地事務保證轉賬的 ACID ,隻能夠通過分佈式事務來解決。

市面上使用比較多的分佈式事務框架,支持 SAGA 的,大部分都是 JAVA 為主的,沒有提供 C# 的對接方式,或者是對接難度大,一定程度上讓人望而卻步。

下面就基於這個框架來實踐一下銀行轉賬的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先來看一下一個成功完成的 SAGA 時序圖。

上圖的微服務1,對應我們示例的 OutApi,也就是轉錢出去的那個服務。

微服務2,對應我們示例的 InApi,也就是轉錢進來的那個服務。

下面是兩個服務的正向操作和補償操作的處理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => 
{
    // 進行 數據庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 進行 數據庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

註:示例為瞭簡單,沒有進行實際的數據庫操作。

到此各個子事務的處理已經 OK 瞭,然後是開啟 SAGA 事務,進行分支調用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交結果 = {flag}");

到這裡,一個完整的 SAGA 分佈式事務就編寫完成瞭。

搭建好 dtm 的環境後,運行上面的例子,會看到下面的輸出。

當然,上面的情況太理想瞭,轉出轉入都是一次性就成功瞭。

但是實際上我們會遇到許許多多的問題,最常見的應該就是網絡故障瞭。

下面來看一個異常的 SAGA 示例

異常的 SAGA

做一個假設,用戶1的轉出是正常的,但是用戶2在轉入的時候出現瞭問題。

由於事務已經提交給 dtm 瞭,按照 SAGA 事務的協議,dtm 會重試未完成的操作。

這個時候用戶2 這邊會出現什麼樣的情況呢?

轉入其實成功瞭,但是 dtm 收到錯誤 (網絡故障等)轉入沒有成功,直接告訴 dtm 失敗瞭 (應用異常等)

無論是那一種,dtm 都會進行重試操作。這個時候會發生什麼呢?我們繼續往下看。

先看一下事務失敗交互的時序圖

再通過調整上面成功的例子,來比較直觀的看看出現的情況。

在 InApi 加多一個轉入失敗的處理接口

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作--失敗,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失敗的返回有兩種,一種是狀態碼大於 400,一種是狀態碼是 200 並且響應體包含 FAILURE,上面的例子是第二種

調整一下調用方,把轉入正向操作替換成上面這個返回錯誤的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

運行結果如下:

在這個例子中,隻考慮補償/重試成功的情況下。

用戶1 轉出的 30 塊錢最終是回到瞭他的帳號上,他沒有出現損失。

用戶2 就有點苦逼瞭,轉入沒有成功,返回瞭失敗,還觸發瞭轉入的補償機制,結果就是把用戶2 還沒進帳的 30 塊錢給多扣瞭,這個就是上面的情況2,常見的空補償問題。

這個時候就要在進行轉入補償的時候做一系列的判斷,轉入有沒有成功,轉出有沒有失敗等等,把業務變的十分復雜。

如果出現瞭上述的情況1,會發生什麼呢?

用戶2 第一次已經成功轉入 30 塊錢,返回的也是成功,但是網絡出瞭點問題,導致 dtm 認為失敗瞭,它就會進行重試,相當於用戶2 還會收到第二個轉入 30 塊錢的請求!也就是說這次轉帳,用戶2 會進賬 60 塊錢,翻倍瞭,也就是說這個請求不是冪等。

同樣的,要處理這個問題,在進行轉入的正向操作中也要進行一系列的判斷,同樣會把復雜度上升一個級別。

前面有提到 dtm 提供瞭子事務屏障的功能,保證瞭冪等、空補償等常見問題。

再來看看這個子事務屏障的功能有沒有幫我們簡化上面異常處理。

子事務屏障

子事務屏障,需要根據 trans_type,gid,branch_id 和 op 四個內容進行創建。

這4個內容 dtm 在回調時會放在 querysting 上面。

客戶端裡面提供瞭 IBranchBarrierFactory 來供我們使用。

空補償

針對上面的異常情況(用戶2 憑空消失 30 塊錢),對轉入的補償進行子事務屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 轉入失敗的情況下,不應該輸出下面這個
        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 參數是事務,可和本地事務一起提交回滾
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事務屏障-補償操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是關鍵所在瞭,需要傳入一個 DbConnection 和真正的業務操作,這裡的業務操作就是在控制臺輸出補償操作的信息。

同樣的,我們再調整一下調用方,把轉入補償操作替換成上面帶子事務屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

會發現轉入的補償操作並沒執行,控制臺沒有輸出補償信息,而是輸出瞭

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

這個就表明瞭,這個請求是個空補償,是不應該執行業務方法的,既空操作。

再來看一下,轉入成功的,但是 dtm 收到瞭失敗的信號,不斷重試造成重復請求的情況。

冪等

針對用戶2 轉入兩次 30 塊錢的異常情況,對轉入的正向操作進行子事務屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】請求來瞭!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模擬一個超時執行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

這裡通過一個超時執行來讓 dtm 進行轉入正向操作的重試。

同樣的,我們再調整一下調用方,把轉入的正向操作也替換成上面帶子事務屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

可以看到轉入的正向操作確實是觸發瞭多次,第一次實際上是成功,隻是響應比較慢,導致 dtm 認為是失敗瞭,觸發瞭第二次請求,但是第二次請求並沒有執行業務操作,而是輸出瞭

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

這個就表明瞭,這個請求是個重復請求,是不應該執行業務方法的,保證瞭冪等。

到這裡,可以看出,子事務屏障確實解決瞭冪等和空補償的問題,大大降低瞭業務判斷的復雜度和出錯的可能性。

寫在最後

在這篇文章裡,也通過幾個例子,完整給出瞭編寫一個 SAGA 事務的過程,涵蓋瞭正常成功完成,異常情況,以及成功回滾的情況。希望對研究分佈式事務的您有所幫助。

本文示例代碼: DtmSagaSample

到此這篇關於如何用C#實現SAGA分佈式事務的文章就介紹到這瞭,更多相關C#實現SAGA內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: