詳解Redis分佈式鎖的原理與實現
前言
在單體應用中,如果我們對共享數據不進行加鎖操作,會出現數據一致性問題,我們的解決辦法通常是加鎖。在分佈式架構中,我們同樣會遇到數據共享操作問題,此時,我們就需要分佈式鎖來解決問題,下面我們一起聊聊使用redis來實現分佈式鎖。
使用場景
- 庫存超賣 比如 5個筆記本 A 看 準備買3個 B 買2個 C 4個 一下單 3+2+4 =9
- 防止用戶重復下單
- MQ消息去重
- 訂單操作變更
為什麼要使用分佈式鎖
從業務場景來分析,有一個共性,共享資源的競爭,比如庫存商品,用戶,消息,訂單等,這些資源在同一時間點隻能有一個線程去操作,並且在操作期間,禁止其他線程操作。要達到這個效果,就要實現共享資源互斥,共享資源串行化。其實,就是對共享資源加鎖的問題。在單應用(單進程多線程)中使用鎖,我們可以使用synchronize、ReentrantLock等關鍵字,對共享資源進行加鎖。在分佈式應用(多進程多線程)中,分佈式鎖是控制分佈式系統之間同步訪問共享資源的一種方式。
如何使用分佈式鎖
流程圖
分佈式鎖的狀態
- 客戶端通過競爭獲取鎖才能對共享資源進行操作
- 當持有鎖的客戶端對共享資源進行操作時
- 其他客戶端都不可以對這個資源進行操作
- 直到持有鎖的客戶端完成操作
分佈式鎖的特點
互斥性
在任意時刻,隻有一個客戶端可以持有鎖(排他性)
高可用,具有容錯性
隻要鎖服務集群中的大部分節點正常運行,客戶端就可以進行加鎖解鎖操作
避免死鎖
具備鎖失效機制,鎖在一段時間之後一定會釋放。(正常釋放或超時釋放)
加鎖和解鎖為同一個客戶端
一個客戶端不能釋放其他客戶端加的鎖瞭
分佈式鎖的實現方式(以redis分佈式鎖實現為例)
簡單版本
/** * 簡單版本 * @author:liyajie * @createTime:2022/6/22 15:42 * @version:1.0 */ public class SimplyRedisLock { // Redis分佈式鎖的key public static final String REDIS_LOCK = "redis_lock"; @Autowired StringRedisTemplate template; public String index(){ // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成 String value = UUID.randomUUID().toString().replace("-",""); try{ // 加鎖 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); // 業務邏輯 String result = template.opsForValue().get("001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("001", String.valueOf(realTotal)); // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放, // 釋放鎖操作不能在此操作,要在finally處理 // template.delete(REDIS_LOCK); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件"); return "購買商品成功,庫存還剩:" + realTotal + "件"; } else { System.out.println("購買商品失敗"); } return "購買商品失敗"; }finally { // 釋放鎖 template.delete(REDIS_LOCK); } } }
該種實現方案比較簡單,但是有一些問題。假如服務運行期間掛掉瞭,代碼完成瞭加鎖的處理,但是沒用走的finally部分,即鎖沒有釋放,這樣的情況下,鎖是永遠沒法釋放的。於是就有瞭改進版本。
進階版本
/** * 進階版本 * @author:liyajie * @createTime:2022/6/22 15:42 * @version:1.0 */ public class SimplyRedisLock2 { // Redis分佈式鎖的key public static final String REDIS_LOCK = "redis_lock"; @Autowired StringRedisTemplate template; public String index(){ // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成 String value = UUID.randomUUID().toString().replace("-",""); try{ // 加鎖 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); // 業務邏輯 String result = template.opsForValue().get("001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("001", String.valueOf(realTotal)); // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放, // 釋放鎖操作不能在此操作,要在finally處理 // template.delete(REDIS_LOCK); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件"); return "購買商品成功,庫存還剩:" + realTotal + "件"; } else { System.out.println("購買商品失敗"); } return "購買商品失敗"; }finally { // 釋放鎖 template.delete(REDIS_LOCK); } } }
這種實現方案,對key增加瞭一個過期時間,這樣即使服務掛掉,到瞭過期時間之後,鎖會自動釋放。但是仔細想想,還是有問題。比如key值的過期時間為10s,但是業務處理邏輯需要15s的時間,這樣就會導致某一個線程處理完業務邏輯之後,在釋放鎖,即刪除key的時候,刪除的key不是自己set的,而是其他線程設置的,這樣就會造成數據的不一致性,引起數據的錯誤,從而影響業務。還需要改進。
進階版本2-誰設置的鎖,誰釋放
/** * 進階版本2-誰設置的鎖,誰釋放 * @author:liyajie * @createTime:2022/6/22 15:42 * @version:1.0 */ public class SimplyRedisLock3 { // Redis分佈式鎖的key public static final String REDIS_LOCK = "redis_lock"; @Autowired StringRedisTemplate template; public String index(){ // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成 String value = UUID.randomUUID().toString().replace("-",""); try{ // 加鎖 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); // 業務邏輯 String result = template.opsForValue().get("001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("001", String.valueOf(realTotal)); // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放, // 釋放鎖操作不能在此操作,要在finally處理 // template.delete(REDIS_LOCK); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件"); return "購買商品成功,庫存還剩:" + realTotal + "件"; } else { System.out.println("購買商品失敗"); } return "購買商品失敗"; }finally { // 誰加的鎖,誰才能刪除!!!! if(template.opsForValue().get(REDIS_LOCK).equals(value)){ template.delete(REDIS_LOCK); } } } }
這種方式解決瞭因業務復雜,處理時間太長,超過瞭過期時間,而釋放瞭別人鎖的問題。還會有其他問題嗎?其實還是有的,finally塊的判斷和del刪除操作不是原子操作,並發的時候也會出問題,並發就是要保證數據的一致性,保證數據的一致性,最好要保證對數據的操作具有原子性。於是還是要改進。
進階版本3-Lua版本
/** * 進階版本-Lua版本 * @author:liyajie * @createTime:2022/6/22 15:42 * @version:1.0 */ public class SimplyRedisLock3 { // Redis分佈式鎖的key public static final String REDIS_LOCK = "redis_lock"; @Autowired StringRedisTemplate template; public String index(){ // 每個人進來先要進行加鎖,key值為"redis_lock",value隨機生成 String value = UUID.randomUUID().toString().replace("-",""); try{ // 加鎖 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); // 業務邏輯 String result = template.opsForValue().get("001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("001", String.valueOf(realTotal)); // 如果在搶到所之後,刪除鎖之前,發生瞭異常,鎖就無法被釋放, // 釋放鎖操作不能在此操作,要在finally處理 // template.delete(REDIS_LOCK); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件"); return "購買商品成功,庫存還剩:" + realTotal + "件"; } else { System.out.println("購買商品失敗"); } return "購買商品失敗"; }finally { // 誰加的鎖,誰才能刪除,使用Lua腳本,進行鎖的刪除 Jedis jedis = null; try{ jedis = RedisUtils.getJedis(); String script = "if redis.call('get',KEYS[1]) == ARGV[1] " + "then " + "return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value)); if("1".equals(eval.toString())){ System.out.println("-----del redis lock ok...."); }else{ System.out.println("-----del redis lock error ...."); } }catch (Exception e){ }finally { if(null != jedis){ jedis.close(); } } } } }
這種方式,規定瞭誰上的鎖,誰才能刪除,並且解決瞭刪除操作沒有原子性問題。但還沒有考慮緩存,以及Redis集群部署下,異步復制造成的鎖丟失:主節點沒來得及把剛剛set進來這條數據給從節點,就掛瞭。所以還得改進。
終極進化版
/** * 終極進化版 * @author:liyajie * @createTime:2022/6/22 15:42 * @version:1.0 */ public class SimplyRedisLock5 { // Redis分佈式鎖的key public static final String REDIS_LOCK = "redis_lock"; @Autowired StringRedisTemplate template; @Autowired Redisson redisson; public String index(){ RLock lock = redisson.getLock(REDIS_LOCK); lock.lock(); // 每個人進來先要進行加鎖,key值為"redis_lock" String value = UUID.randomUUID().toString().replace("-",""); try { String result = template.opsForValue().get("001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此處需要調用其他微服務,處理時間較長。。。 int realTotal = total - 1; template.opsForValue().set("001", String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件"); return "購買商品成功,庫存還剩:" + realTotal + "件"; } else { System.out.println("購買商品失敗"); } return "購買商品失敗"; }finally { if(lock.isLocked() && lock.isHeldByCurrentThread()){ lock.unlock(); } } } }
這種實現方案,底層封裝瞭多節點redis實現的分佈式鎖算法,有效防止單點故障,感興趣的可以去研究一下。
總結
分析問題的過程,也是解決問題的過程,也能鍛煉自己編寫代碼時思考問題的方式和角度。
到此這篇關於詳解Redis分佈式鎖的原理與實現的文章就介紹到這瞭,更多相關Redis分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SpringBoot集成Redis並實現主從架構的實踐
- 帶你輕松掌握Redis分佈式鎖
- 詳解redis分佈式鎖(優化redis分佈式鎖的過程及Redisson使用)
- Redis鎖完美解決高並發秒殺問題
- Java經典面試題最全匯總208道(六)