SpringCloud Alibaba使用Seata處理分佈式事務的技巧

Seata簡介

在傳統的單體項目中,我們使用@Transactional註解就能實現基本的ACID事務瞭。
但是前提是:
1) 數據庫支持事務(如:MySQL的innoDB引擎)
2) 所有業務都在同一個數據庫中執行

隨著微服務架構的引入,需要對數據庫進行分庫分表,每個服務擁有自己的數據庫,這樣傳統的事務就不起作用瞭,那麼我們如何保證多個服務中數據的一致性呢?

在這裡插入圖片描述

這樣就出現瞭分佈式事務,而Seata就是為微服務架構而生的一種高性能、易於使用的分佈式事務解決方案。

在這裡插入圖片描述

Seata 中有三個基礎組件:

  1. Transaction Coordinator(TC協調者):維護全局和分支事務的狀態,驅動全局提交或回滾。
  2. Transaction Manager(TM事務管理):定義全局事務的范圍,開啟、提交或回滾一個全局事務。
  3. Resource Manager(RM資源管理):管理分支事務資源,與 TC 通訊並報告分支事務狀態,管理本地事務的提交與回滾。

在這裡插入圖片描述

可以這麼說一個分佈式事務就是全局事務GlobalTransaction,而全局事務是由一個個的分支事務組成的,每個分支事務就是一個本地事務。

在這裡插入圖片描述

Seata的生命周期

  1. TM 要求 TC 生成一個全局事務,並由 TC 生成一個全局事務XID 返回。
  2. XID 通過微服務調用鏈傳播。
  3. RM 向 TC 註冊本地事務,將其註冊到 ID 為 XID 的全局事務中。
  4. TM 要求 TC 提交或回滾XID 對應的全局事務。
  5. TC 驅動 XID 對應的全局事務對應的所有的分支事務提交或回滾。

在這裡插入圖片描述

Seata安裝和配置

安裝nacos,本案例使用瞭nacos作為註冊中心
https://github.com/alibaba/nacos/releases
下載nacos,本文使用的是windows版本1.4.0
使用命令行進入bin目錄,以單機模式啟動nacos

startup -m standalone

在這裡插入圖片描述

安裝和配置Seata

http://seata.io/zh-cn/blog/download.html
下載Seata,這裡是Windows版本的1.4.0
解壓後,進入conf目錄,配置file.conf和registry.conf兩個文件

在這裡插入圖片描述

file.conf主要是數據庫的配置,配置如下

在這裡插入圖片描述

registry.conf 是註冊中心的配置

在這裡插入圖片描述

另外conf目錄中還需要一個腳本文件:nacos-config.sh 用於對nacos進行初始化配置
在seata1.4.0中是沒有的,需要自行創建,內容如下:

#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done

urlencode() {
  for ((i=0; i < ${#1}; i++))
  do
    char="${1:$i:1}"
    case $char in
    [a-zA-Z0-9.~_-]) printf $char ;;
    *) printf '%%%02X' "'$char" ;;
    esac
  done
}

if [[ -z ${host} ]]; then
    host=localhost
fi
if [[ -z ${port} ]]; then
    port=8848
fi
if [[ -z ${group} ]]; then
    group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
    tenant=""
fi
if [[ -z ${username} ]]; then
    username=""
fi
if [[ -z ${password} ]]; then
    password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  if [[ -z $(cat "${tempLog}") ]]; then
    echo " Please check the cluster status. "
    exit 1
  fi
  if [[ $(cat "${tempLog}") =~ "true" ]]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
    (( failCount++ ))
  fi
}

count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
  (( count++ ))
  key=${line%%=*}
    value=${line#*=}
  addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
  echo " Init nacos config finished, please start seata-server. "
else
  echo " init nacos config fail. "
fi

在seata的根目錄,與conf同級的目錄下,還需要config.txt 配置文件,默認也是沒有的

在這裡插入圖片描述

隻需要對mysql的配置進行修改

在這裡插入圖片描述

完整文件:

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=true
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
store.mode=file
store.lock.mode=file
store.session.mode=file
store.publicKey=xx
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=xx
store.redis.sentinel.sentinelHosts=xx
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=xx
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

在conf目錄中,使用Git Bash進入命令行,輸入

sh nacos-config.sh 127.0.0.1

在這裡插入圖片描述

這是對Seata進行初始化配置,上圖表示所有配置都成功設置瞭
在nacos中可以看到出現瞭seata相關的配置

在這裡插入圖片描述

接下來在seata數據庫中,新建三個表

在這裡插入圖片描述

drop table if exists `global_table`;
create table `global_table` (
  `xid` varchar(128)  not null,
  `transaction_id` bigint,
  `status` tinyint not null,
  `application_id` varchar(32),
  `transaction_service_group` varchar(32),
  `transaction_name` varchar(128),
  `timeout` int,
  `begin_time` bigint,
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`xid`),
  key `idx_gmt_modified_status` (`gmt_modified`, `status`),
  key `idx_transaction_id` (`transaction_id`)
);


drop table if exists `branch_table`;
create table `branch_table` (
  `branch_id` bigint not null,
  `xid` varchar(128) not null,
  `transaction_id` bigint ,
  `resource_group_id` varchar(32),
  `resource_id` varchar(256) ,
  `lock_key` varchar(128) ,
  `branch_type` varchar(8) ,
  `status` tinyint,
  `client_id` varchar(64),
  `application_data` varchar(2000),
  `gmt_create` datetime,
  `gmt_modified` datetime,
  primary key (`branch_id`),
  key `idx_xid` (`xid`)
);


drop table if exists `lock_table`;
create table `lock_table` (
  `row_key` varchar(128) not null,
  `xid` varchar(96),
  `transaction_id` long ,
  `branch_id` long,
  `resource_id` varchar(256) ,
  `table_name` varchar(32) ,
  `pk` varchar(36) ,
  `gmt_create` datetime ,
  `gmt_modified` datetime,
  primary key(`row_key`)
);

在項目相關的數據庫中,新建表undo_log 用於記錄撤銷日志

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

最後在bin目錄中,啟動命令行,執行seata-server.bat 啟動Seata服務

在這裡插入圖片描述

項目應用Seata

在這裡插入圖片描述

SpringCloud項目中有兩個服務:訂單服務和庫存服務,基本業務是:

  • 購買商品
  • 插入訂單
  • 減少庫存

訂單詳情表

DROP TABLE IF EXISTS `tb_order_detail`;
CREATE TABLE `tb_order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '訂單詳情id ',
  `order_id` bigint(20) NOT NULL COMMENT '訂單id',
  `sku_id` bigint(20) NOT NULL COMMENT 'sku商品id',
  `num` int(11) NOT NULL COMMENT '購買數量',
  `title` varchar(256) NOT NULL COMMENT '商品標題',
  `own_spec` varchar(1024) DEFAULT '' COMMENT '商品動態屬性鍵值集',
  `price` bigint(20) NOT NULL COMMENT '價格,單位:分',
  `image` varchar(128) DEFAULT '' COMMENT '商品圖片',
  PRIMARY KEY (`id`),
  KEY `key_order_id` (`order_id`) USING BTREE
) ENGINE=MyISAM AUTO_INCREMENT=131 DEFAULT CHARSET=utf8 COMMENT='訂單詳情表';

庫存表

DROP TABLE IF EXISTS `tb_stock`;
CREATE TABLE `tb_stock` (
  `sku_id` bigint(20) NOT NULL COMMENT '庫存對應的商品sku id',
  `seckill_stock` int(9) DEFAULT '0' COMMENT '可秒殺庫存',
  `seckill_total` int(9) DEFAULT '0' COMMENT '秒殺總數量',
  `stock` int(9) NOT NULL COMMENT '庫存數量',
  PRIMARY KEY (`sku_id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COMMENT='庫存表,代表庫存,秒殺庫存等信息';

父項目定義瞭springboot、springcloud、springcloud-alibaba的版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>0.9.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2.2.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

子項目的依賴定義瞭nacos和seata客戶端

<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <scope>runtime</scope>
</dependency>

<dependency>
   <groupId>com.baomidou</groupId>
   <artifactId>mybatis-plus-boot-starter</artifactId>
   <version>3.3.2</version>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

<dependency>
   <groupId>com.alibaba.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
   <exclusions>
       <exclusion>
           <groupId>io.seata</groupId>
           <artifactId>seata-spring-boot-starter</artifactId>
       </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>io.seata</groupId>
   <artifactId>seata-spring-boot-starter</artifactId>
   <version>1.2.0</version>
</dependency>

子項目配置文件

在這裡插入圖片描述

完整配置

server:
  port: 8001
spring:
  application:
    name: stock-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    alibaba:
      seata:
        enabled: true
        enable-auto-data-source-proxy: true
        tx-service-group: my_test_tx_group
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 127.0.0.1:8848
            username: nacos
            password: nacos
        config:
          type: nacos
          nacos:
            server-addr: 127.0.0.1:8848
            group: SEATA_GROUP
            username: nacos
            password: nacos
        service:
          vgroup-mapping:
            my_test_tx_group: default
          disable-global-transaction: false
        client:
          rm:
            report-success-enable: false
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/eshop?serverTimezone=UTC&useUnicode=true&useSSL=false&characterEncoding=utf8
    username: root
    password: 123456

庫存服務定義瞭減庫存的方法

@RestController
public class StockController {

    @Autowired
    private IStockService stockService;

    @PutMapping("/stock")
    public ResponseEntity<Stock> reduceSkuStock(@RequestParam("skuId")Long skuId,
                                                @RequestParam("number")Integer number){
        Stock stock = stockService.getById(skuId);
        if(stock.getStock() < number){
            throw new RuntimeException("庫存不足,SkuId:" + skuId);
        }
        stock.setStock(stock.getStock() - number);
        stockService.updateById(stock);
        return ResponseEntity.ok(stock);
    }
}

訂單服務在插入訂單後,使用Feign調用瞭減庫存的服務

@Service
public class OrderDetailServiceImpl extends ServiceImpl<OrderDetailMapper, OrderDetail> implements IOrderDetailService {

    //庫存服務Feign
    @Autowired
    private StockFeignClient stockFeignClient;

//    @Transactional
    @GlobalTransactional(rollbackFor = {Exception.class})
    @Override
    public void makeOrder(OrderDetail orderDetail) {
        this.save(orderDetail); //保存訂單
        int x = 11 / 0; //拋出異常
        //減庫存
        stockFeignClient.reduceSkuStock(orderDetail.getSkuId(),orderDetail.getNum()); 
    }
}

插訂單和減庫存屬於兩個服務,傳統的@Transactional已經不能保證它們的原子性瞭
這裡使用瞭Seata提供的@GlobalTransactional全局事務註解,出現任何異常後都能實現業務回滾。
測試用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderServiceApplicationTests {

    @Autowired
    private IOrderDetailService orderDetailService;

    @Test
    public void testOrder() {
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setNum(100);
        orderDetail.setOrderId(9999L);
        orderDetail.setPrice(9999L);
        orderDetail.setSkuId(27359021728L);
        orderDetail.setTitle(UUID.randomUUID().toString());
        orderDetailService.makeOrder(orderDetail);
    }

}

運行後看到啟動瞭全局事務,發生異常後,兩個服務也都能成功回滾。

在這裡插入圖片描述

以上就是SpringCloud Alibaba使用Seata 分佈式事務的詳細內容,更多關於SpringCloud Alibaba分佈式事務的資料請關註WalkonNet其它相關文章!

推薦閱讀: