SpringBoot RedisTemplate分佈式鎖的項目實戰

1.使用場景

想直接獲取加鎖解鎖代碼,請直接到代碼處

在下單場景減庫存時我們一般會將庫存查詢出來,進行庫存的扣除

@GetMapping(value = "order")
public R order() {
    int stock = RedisUtil.getObject("stock", Integer.class);
    if (stock > 0) {
        RedisUtil.set("stock", --stock);
    }
    return R.ok(stock);
}

上述的操作看起來很正常,但是其實是有問題的,試想一下當我們有兩個線程同時訪問這個接口會發生什麼

Thread-1 查詢庫存結果為100

Thread-2 也來查詢庫存,此時Thread-1還沒有執行減少庫存操作,Thread-2 查詢庫存的結果也是100

Thread-1 Set庫存為99

Thread-2 Set庫存為99

這樣就出問題瞭,明天扣瞭兩次庫存,但是庫存僅僅減瞭1次

使用Idea時,我們可以使在斷點處右鍵將Suspend調整為Thread,僅阻斷線程,並使用多個客戶端同時請求接口,即可復現上述過程

多線程調試

2.加鎖解決

synchronized 我們可以用Java提供的synchronized關鍵字將方法分佈式鎖,分佈式鎖的實現方案有很多種, zookeeper,redis,db,這邊我們使用redis來實現以下分佈式鎖

3.分佈式鎖

上述兩個線程同時進行的時候沒有正確扣除庫存正是因為【查詢庫存】和【扣除庫存】不是一個原子操作,我們增加一個鎖的機制,當線程持有鎖的時候才允許進行【查詢庫存】和【扣除庫存】,redis有一個sexNx命令允許當指定的key不存在時才進行set操作,在java中為RedisTemplate的setIfAbsent方法,這個方法保證瞭同時隻能有一個線程set成功,set成功時就表明我們拿到瞭鎖,可以進行原子操作瞭,當我們執行完原子操作時我們也需要將鎖釋放掉,在redis實現中也就是將key刪除,允許下一個線程set值,加鎖和釋放鎖的代碼如下

/**
     * 加鎖
     *
     * @param key   redis主鍵
     * @param value 值
     */
public static boolean lock(String key, String value) {
    final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value));
    if (result) {
        log.info("[redisTemplate redis]設置鎖緩存 緩存  url:{} ", key);
    }
    return result;
}

/**
     * 解鎖
     *
     * @param key redis主鍵
     */
public static boolean unlock(String key) {
    final boolean result = Boolean.TRUE.equals(redisTemplate.delete(CacheConstant.LOCK_KEY + key));
    if (result) {
        log.info("[redisTemplate redis]釋放鎖 緩存  url:{}", key);
    }
    return result;
}

那麼我們將代碼稍微修改一下,來利用鎖來完成接口的改進

@GetMapping(value = "order")
public R order() {
    boolean lock;
    int stock;
    try {
        lock = RedisUtil.lock("stock", "");
        if (!lock) {
            return R.failed("服務繁忙,稍後再試");
        }
        stock = RedisUtil.getObject("stock", Integer.class);
        if (stock > 0) {
            RedisUtil.set("stock", --stock);
        }
    } finally {
        RedisUtil.unlock("stock");
    }
    return R.ok(stock);
}

此時,我們再將斷點放在獲取庫存之後,並先用一個終端請求接口

終端1

然後,我們再從終端2發起請求,可以看到我們終端1沒有結束自己的原子操作時,終端2是無法進行庫存的扣除的

終端2

4.增加失效時間

在上一步中,我們仿佛已經完成瞭需求,同時進行扣除庫存的隻有一個線程,但是試想一下,當線程獲取到鎖之後,服務突然宕機瞭,這時候就算及時重啟機器,那麼鎖也一直得不到釋放,那麼扣除庫存接口始終無法獲取到鎖,這肯定不是我們想要的效果,那麼我們改進一下我們加鎖的方法,增加一下失效時間,即使服務宕機瞭,我們重啟機器之後,鎖也能正常釋放掉不會影響一下個線程獲取到鎖

/**
     * 加鎖
     *
     * @param key   redis主鍵
     * @param value 值
     * @param time  過期時間
     */
public static boolean lock(String key, String value, long time) {
    final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value, time, TimeUnit.SECONDS));
    if (result) {
        log.info("[redisTemplate redis]設置鎖緩存 緩存  url:{} ========緩存時間為{}秒", key, time);
    }
    return result;
}

5.增加線程唯一值

還有一種情況會導致我們可能誤刪除別人的鎖,比如當線程1執行完流程之後準備釋放鎖之時,這時候鎖正好失效瞭,線程2此時獲取到鎖,線程1釋放鎖時並不知道鎖失效瞭,那麼線程1執行釋放操作就會將線程2擁有的鎖釋放掉,這肯定是不對的,那麼我們再對unlock方法改進一下

/**
     * 解鎖
     *
     * @param key redis主鍵
     */
public static boolean unlock(String key, String value) {
    if (Objects.equals(value, redisTemplate.opsForValue().get(CacheConstant.LOCK_KEY))) {
        final boolean result = Boolean.TRUE.equals(redisTemplate.delete(CacheConstant.LOCK_KEY + key));
        if (result) {
            log.info("[redisTemplate redis]釋放鎖 緩存  url:{}", key);
        }
        return result;
    }
    return false;
}

@GetMapping(value = "order")
public R order() {
    boolean lock;
    int stock;
    String uuid = IdUtil.fastUUID();
    try {
        lock = RedisUtil.lock("stock", uuid, 60L);
        if (!lock) {
            return R.failed("服務繁忙,稍後再試");
        }
        stock = RedisUtil.getObject("stock", Integer.class);
        if (stock > 0) {
            RedisUtil.set("stock", --stock);
        }
    } finally {
        // 在此釋放鎖時,判斷鎖是為自己持有才進行釋放
        RedisUtil.unlock("stock", uuid);
    }
    return R.ok(stock);
}

6.Lua腳本

上面我們說瞭為瞭防止誤刪別人的鎖,我們需要在刪除鎖時判斷一下鎖是否為自己持有,那麼問題來瞭,我們這個查詢鎖值和刪除鎖的操作也並不是一個原子操作,也就是說可能你在獲取鎖值時鎖還為自己持有,但是執行刪除時鎖已經不為自己持有瞭,還是會可能誤刪別人的鎖,想要保證釋放鎖的原子性,我們可以通過redis原生支持的lua腳本來實現

/**
     * 解鎖
     *
     * @param key redis主鍵
     * @param value 值
     */
public static boolean unlock(String key, String value) {
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
    Long result = redisTemplate.execute(redisScript, Collections.singletonList(CacheConstant.LOCK_KEY + key), value);
    if (Objects.equals(1L, result)) {
        log.info("[redisTemplate redis]釋放鎖 緩存  url:{}", key);
        return true;
    }
    return false;
}

7.Lua是如何實現原子性的

可以看到Lua腳本的大致意思也是跟我們自己寫的代碼差不多,判斷是否為自己持有如果是才進行刪除,那為什麼Lua腳本可以保證原子性呢

Redis使用同一個Lua解釋器來執行所有命令,同時,Redis保證以一種原子性的方式來執行腳本:當lua腳本在執行的時候,不會有其他腳本和命令同時執行,這種語義類似於 MULTI/EXEC。從別的客戶端的視角來看,一個lua腳本要麼不可見,要麼已經執行完。

然而這也意味著,執行一個較慢的lua腳本是不建議的,由於腳本的開銷非常低,構造一個快速執行的腳本並非難事。但是你要註意到,當你正在執行一個比較慢的腳本時,所以其他的客戶端都無法執行命令。

8.代碼演示

代碼演示

/**
     * 加鎖
     *
     * @param key   redis主鍵
     * @param value 值
     * @param time  過期時間
     */
public static boolean lock(String key, String value, long time) {
    final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value, time, TimeUnit.SECONDS));
    if (result) {
        log.info("[redisTemplate redis]設置鎖緩存 緩存  url:{} ========緩存時間為{}秒", key, time);
    }
    return result;
}

/**
     * 解鎖
     *
     * @param key redis主鍵
     * @param value 值
     */
public static boolean unlock(String key, String value) {
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
    Long result = redisTemplate.execute(redisScript, Collections.singletonList(CacheConstant.LOCK_KEY + key), value);
    if (Objects.equals(1L, result)) {
        log.info("[redisTemplate redis]釋放鎖 緩存  url:{}", key);
        return true;
    }
    return false;
}
@GetMapping(value = "order")
public R order() {
    boolean lock;
    int stock;
    String uuid = IdUtil.fastUUID();
    try {
        lock = RedisUtil.lock("stock", uuid,6000L);
        if (!lock) {
            return R.failed("服務繁忙,稍後再試");
        }
        stock = RedisUtil.getObject("stock", Integer.class);
        if (stock > 0) {
            RedisUtil.set("stock", --stock);
        }
    } finally {
        RedisUtil.unlock("stock", uuid);
    }
    return R.ok(stock);
}

9. 總結

分佈式鎖在使用的過程中還是有挺多的講究的,主要看應用場景例如還需要保證上述流程中可能碰到的鎖失效時間小於代碼執行時間,鎖提前失效的問題,鎖如何保證重入性的問題,歡迎大傢討論

 到此這篇關於SpringBoot RedisTemplate分佈式鎖的項目實戰的文章就介紹到這瞭,更多相關SpringBoot RedisTemplate分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: