詳解Redis分佈式鎖的原理與實現

前言

在單體應用中,如果我們對共享數據不進行加鎖操作,會出現數據一致性問題,我們的解決辦法通常是加鎖。在分佈式架構中,我們同樣會遇到數據共享操作問題,此時,我們就需要分佈式鎖來解決問題,下面我們一起聊聊使用redis來實現分佈式鎖。

使用場景

  • 庫存超賣 比如 5個筆記本 A 看 準備買3個 B 買2個 C 4個 一下單 3+2+4 =9
  • 防止用戶重復下單
  • MQ消息去重
  • 訂單操作變更

為什麼要使用分佈式鎖

從業務場景來分析,有一個共性,共享資源的競爭,比如庫存商品,用戶,消息,訂單等,這些資源在同一時間點隻能有一個線程去操作,並且在操作期間,禁止其他線程操作。要達到這個效果,就要實現共享資源互斥,共享資源串行化。其實,就是對共享資源加鎖的問題。在單應用(單進程多線程)中使用鎖,我們可以使用synchronize、ReentrantLock等關鍵字,對共享資源進行加鎖。在分佈式應用(多進程多線程)中,分佈式鎖是控制分佈式系統之間同步訪問共享資源的一種方式。

如何使用分佈式鎖

流程圖

分佈式鎖的狀態

  • 客戶端通過競爭獲取鎖才能對共享資源進行操作
  • 當持有鎖的客戶端對共享資源進行操作時
  • 其他客戶端都不可以對這個資源進行操作
  • 直到持有鎖的客戶端完成操作

分佈式鎖的特點

互斥性

在任意時刻,隻有一個客戶端可以持有鎖(排他性)

高可用,具有容錯性

隻要鎖服務集群中的大部分節點正常運行,客戶端就可以進行加鎖解鎖操作

避免死鎖

具備鎖失效機制,鎖在一段時間之後一定會釋放。(正常釋放或超時釋放)

加鎖和解鎖為同一個客戶端

一個客戶端不能釋放其他客戶端加的鎖瞭

分佈式鎖的實現方式(以redis分佈式鎖實現為例)

簡單版本

/**
 * 簡單版本
 * @author:liyajie
 * @createTime:2022/6/22 15:42
 * @version:1.0
 */
public class SimplyRedisLock {
    // Redis分佈式鎖的key
    public static final String REDIS_LOCK = "redis_lock";

    @Autowired
    StringRedisTemplate template;

    public String index(){

        // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成
        String value = UUID.randomUUID().toString().replace("-","");
        try{
            // 加鎖
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value);
            // 加鎖失敗
            if(!flag){
                return "搶鎖失敗!";
            }
            System.out.println( value+ " 搶鎖成功");
            // 業務邏輯
            String result = template.opsForValue().get("001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                int realTotal = total - 1;
                template.opsForValue().set("001", String.valueOf(realTotal));
                // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放,
                // 釋放鎖操作不能在此操作,要在finally處理
                // template.delete(REDIS_LOCK);
                System.out.println("購買商品成功,庫存還剩:" + realTotal + "件");
                return "購買商品成功,庫存還剩:" + realTotal + "件";
            } else {
                System.out.println("購買商品失敗");
            }
            return "購買商品失敗";
        }finally {
            // 釋放鎖
            template.delete(REDIS_LOCK);
        }
    }   
}

該種實現方案比較簡單,但是有一些問題。假如服務運行期間掛掉瞭,代碼完成瞭加鎖的處理,但是沒用走的finally部分,即鎖沒有釋放,這樣的情況下,鎖是永遠沒法釋放的。於是就有瞭改進版本。

進階版本

/**
 * 進階版本
 * @author:liyajie
 * @createTime:2022/6/22 15:42
 * @version:1.0
 */
public class SimplyRedisLock2 {
    // Redis分佈式鎖的key
    public static final String REDIS_LOCK = "redis_lock";

    @Autowired
    StringRedisTemplate template;

    public String index(){

        // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成
        String value = UUID.randomUUID().toString().replace("-","");
        try{
            // 加鎖
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
            // 加鎖失敗
            if(!flag){
                return "搶鎖失敗!";
            }
            System.out.println( value+ " 搶鎖成功");
            // 業務邏輯
            String result = template.opsForValue().get("001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                int realTotal = total - 1;
                template.opsForValue().set("001", String.valueOf(realTotal));
                // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放,
                // 釋放鎖操作不能在此操作,要在finally處理
                // template.delete(REDIS_LOCK);
                System.out.println("購買商品成功,庫存還剩:" + realTotal + "件");
                return "購買商品成功,庫存還剩:" + realTotal + "件";
            } else {
                System.out.println("購買商品失敗");
            }
            return "購買商品失敗";
        }finally {
            // 釋放鎖
            template.delete(REDIS_LOCK);
        }
    }   
}

這種實現方案,對key增加瞭一個過期時間,這樣即使服務掛掉,到瞭過期時間之後,鎖會自動釋放。但是仔細想想,還是有問題。比如key值的過期時間為10s,但是業務處理邏輯需要15s的時間,這樣就會導致某一個線程處理完業務邏輯之後,在釋放鎖,即刪除key的時候,刪除的key不是自己set的,而是其他線程設置的,這樣就會造成數據的不一致性,引起數據的錯誤,從而影響業務。還需要改進。

進階版本2-誰設置的鎖,誰釋放

/**
 * 進階版本2-誰設置的鎖,誰釋放
 * @author:liyajie
 * @createTime:2022/6/22 15:42
 * @version:1.0
 */
public class SimplyRedisLock3 {
    // Redis分佈式鎖的key
    public static final String REDIS_LOCK = "redis_lock";

    @Autowired
    StringRedisTemplate template;

    public String index(){

        // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成
        String value = UUID.randomUUID().toString().replace("-","");
        try{
            // 加鎖
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
            // 加鎖失敗
            if(!flag){
                return "搶鎖失敗!";
            }
            System.out.println( value+ " 搶鎖成功");
            // 業務邏輯
            String result = template.opsForValue().get("001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                int realTotal = total - 1;
                template.opsForValue().set("001", String.valueOf(realTotal));
                // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放,
                // 釋放鎖操作不能在此操作,要在finally處理
                // template.delete(REDIS_LOCK);
                System.out.println("購買商品成功,庫存還剩:" + realTotal + "件");
                return "購買商品成功,庫存還剩:" + realTotal + "件";
            } else {
                System.out.println("購買商品失敗");
            }
            return "購買商品失敗";
        }finally {
            // 誰加的鎖,誰才能刪除!!!!
            if(template.opsForValue().get(REDIS_LOCK).equals(value)){
                template.delete(REDIS_LOCK);
            }
        }
    }   
}

這種方式解決瞭因業務復雜,處理時間太長,超過瞭過期時間,而釋放瞭別人鎖的問題。還會有其他問題嗎?其實還是有的,finally塊的判斷和del刪除操作不是原子操作,並發的時候也會出問題,並發就是要保證數據的一致性,保證數據的一致性,最好要保證對數據的操作具有原子性。於是還是要改進。

進階版本3-Lua版本

/**
 * 進階版本-Lua版本
 * @author:liyajie
 * @createTime:2022/6/22 15:42
 * @version:1.0
 */
public class SimplyRedisLock3 {
    // Redis分佈式鎖的key
    public static final String REDIS_LOCK = "redis_lock";

    @Autowired
    StringRedisTemplate template;

    public String index(){

        // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成
        String value = UUID.randomUUID().toString().replace("-","");
        try{
            // 加鎖
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
            // 加鎖失敗
            if(!flag){
                return "搶鎖失敗!";
            }
            System.out.println( value+ " 搶鎖成功");
            // 業務邏輯
            String result = template.opsForValue().get("001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                int realTotal = total - 1;
                template.opsForValue().set("001", String.valueOf(realTotal));
                // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放,
                // 釋放鎖操作不能在此操作,要在finally處理
                // template.delete(REDIS_LOCK);
                System.out.println("購買商品成功,庫存還剩:" + realTotal + "件");
                return "購買商品成功,庫存還剩:" + realTotal + "件";
            } else {
                System.out.println("購買商品失敗");
            }
            return "購買商品失敗";
        }finally {
            // 誰加的鎖,誰才能刪除,使用Lua腳本,進行鎖的刪除
            Jedis jedis = null;
            try{
                jedis = RedisUtils.getJedis();
                String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
                        "then " +
                        "return redis.call('del',KEYS[1]) " +
                        "else " +
                        "   return 0 " +
                        "end";

                Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
                if("1".equals(eval.toString())){
                    System.out.println("-----del redis lock ok....");
                }else{
                    System.out.println("-----del redis lock error ....");
                }
            }catch (Exception e){

            }finally {
                if(null != jedis){
                    jedis.close();
                }
            }
        }
    }   
}

這種方式,規定瞭誰上的鎖,誰才能刪除,並且解決瞭刪除操作沒有原子性問題。但還沒有考慮緩存,以及Redis集群部署下,異步復制造成的鎖丟失:主節點沒來得及把剛剛set進來這條數據給從節點,就掛瞭。所以還得改進。

終極進化版

/**
 * 終極進化版
 * @author:liyajie
 * @createTime:2022/6/22 15:42
 * @version:1.0
 */
public class SimplyRedisLock5 {
    // Redis分佈式鎖的key
    public static final String REDIS_LOCK = "redis_lock";

    @Autowired
    StringRedisTemplate template;

    @Autowired
    Redisson redisson;

    public String index(){

        RLock lock = redisson.getLock(REDIS_LOCK);
        lock.lock();
        // 每個人進來先要進行加鎖,key值為"redis_lock"
        String value = UUID.randomUUID().toString().replace("-","");
        try {
            String result = template.opsForValue().get("001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                // 如果在此處需要調用其他微服務,處理時間較長。。。
                int realTotal = total - 1;
                template.opsForValue().set("001", String.valueOf(realTotal));
                System.out.println("購買商品成功,庫存還剩:" + realTotal + "件");
                return "購買商品成功,庫存還剩:" + realTotal + "件";
            } else {
                System.out.println("購買商品失敗");
            }
            return "購買商品失敗";
        }finally {
            if(lock.isLocked() && lock.isHeldByCurrentThread()){
                lock.unlock();
            }
        }
    }   
}

這種實現方案,底層封裝瞭多節點redis實現的分佈式鎖算法,有效防止單點故障,感興趣的可以去研究一下。

總結

分析問題的過程,也是解決問題的過程,也能鍛煉自己編寫代碼時思考問題的方式和角度。

到此這篇關於詳解Redis分佈式鎖的原理與實現的文章就介紹到這瞭,更多相關Redis分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: