關於SpringBoot 使用 Redis 分佈式鎖解決並發問題
問題背景
現在的應用程序架構中,很多服務都是多副本運行,從而保證服務的穩定性。一個服務實例掛瞭,其他服務依舊可以接收請求。但是服務的多副本運行隨之也會引來一些分佈式問題,比如某個接口的處理邏輯是這樣的:接收到請求後,先查詢 DB 看是否有相關的數據,如果沒有則插入數據,如果有則更新數據。在這種場景下如果相同的 N 個請求並發發到後端服務實例,就會出現重復插入數據的情況:
解決方案
針對上面問題,一般的解決方案是使用分佈式鎖來解決。同一個進程內的話用本進程內的鎖即可解決,但是服務多實例部署的話是分佈式的,各自進程獨立,這種情況下可以設置一個全局獲取鎖的地方,各個進程都可以通過某種方式獲取這個全局鎖,獲得到鎖後就可以執行相關業務邏輯代碼,沒有拿到鎖則跳過不執行,這個全局鎖就是我們所說的分佈式鎖。分佈式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分佈式鎖;3. 基於ZooKeeper的分佈式鎖。
我們這裡介紹如何基於 Redis 的分佈式鎖來解決分佈式並發問題:Redis 充當獲取全局鎖的地方,每個實例在接收到請求的時候首先從 Redis 獲取鎖,獲取到鎖後執行業務邏輯代碼,沒爭搶到鎖則放棄執行。
主要實現原理:
Redis 鎖主要利用 Redis 的 setnx 命令:
加鎖命令:SETNX key value,當鍵不存在時,對鍵進行設置操作並返回成功,否則返回失敗。KEY 是鎖的唯一標識,一般按業務來決定命名。Value 一般用 UUID 標識,確保鎖不被誤解。
解鎖命令:DEL key,通過刪除鍵值對釋放鎖,以便其他線程可以通過 SETNX 命令來獲取鎖。
鎖超時:EXPIRE key timeout, 設置 key 的超時時間,以保證即使鎖沒有被顯式釋放,鎖也可以在一定時間後自動釋放,避免資源被永遠鎖住。
可靠性:
為瞭確保分佈式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:
- 互斥性。在任意時刻,保證隻有一臺機器的一個線程可以持有鎖;
- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖;
- 具備非阻塞性。一旦獲取不到鎖就立刻返回加鎖失敗;
- 加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解瞭;
SpringBoot 集成使用 Redis 分佈式鎖
寫瞭一個 RedisLock 工具類,用於業務邏輯執行前加鎖和業務邏輯執行完解鎖操作。這裡的加鎖操作可能實現的不是很完善,有加鎖和鎖過期兩個操作原子性問題,如果 SpringBoot 版本是2.x的話是可以用註釋中的代碼在加鎖的時候同時設置鎖過期時間,如果 SpringBoot 版本是2.x以下的話建議使用 Lua 腳本來確保操作的原子性,這裡為瞭簡單就先這樣寫:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * @description: Redis分佈式鎖實現工具類 * @author: qianghaohao * @time: 2021/7/19 */ @Component public class RedisLock { @Autowired StringRedisTemplate redisTemplate; /** * 獲取鎖 * * @param lockKey 鎖 * @param identity 身份標識(保證鎖不會被其他人釋放) * @param expireTime 鎖的過期時間(單位:秒) * @return */ public boolean lock(String lockKey, String identity, long expireTime) { // 由於我們目前 springboot 版本比較低,1.5.9,因此還不支持下面這種寫法 // return redisTemplate.opsForValue().setIfAbsent(lockKey, identity, expireTime, TimeUnit.SECONDS); if (redisTemplate.opsForValue().setIfAbsent(lockKey, identity)) { redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS); return true; } return false; } /** * 釋放鎖 * * @param lockKey 鎖 * @param identity 身份標識(保證鎖不會被其他人釋放) * @return */ public boolean releaseLock(String lockKey, String identity) { String luaScript = "if " + " redis.call('get', KEYS[1]) == ARGV[1] " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(); redisScript.setResultType(Boolean.class); redisScript.setScriptText(luaScript); List<String> keys = new ArrayList<>(); keys.add(lockKey); Object result = redisTemplate.execute(redisScript, keys, identity); return (boolean) result; } }
使用示例
這裡隻貼出關鍵的使用代碼,註意:鎖的 key 根據自己的業務邏輯命名,能唯一標示同一個請求即可。value 這裡設置為 UUID,為瞭確保釋放鎖的時候能正確釋放(隻釋放自己加的鎖)。
@Autowired private RedisLock redisLock; // redis 分佈式鎖
String redisLockKey = String.format("%s:docker-image:%s", REDIS_LOCK_PREFIX, imageVo.getImageRepository()); String redisLockValue = UUID.randomUUID().toString(); try { if (!redisLock.lock(redisLockKey, redisLockValue, REDIS_LOCK_TIMEOUT)) { logger.info("redisLockKey [" + redisLockKey + "] 已存在,不執行鏡像插入和更新"); result.setMessage("新建鏡像頻繁,稍後重試,鎖占用"); return result; } ... // 執行業務邏輯 catch (Execpion e) { ... // 異常處理 } finally { // 釋放鎖 if (!redisLock.releaseLock(redisLockKey, redisLockValue)) { logger.error("釋放redis鎖 [" + redisLockKey + "] 失敗); } else { logger.error("釋放redis鎖 [" + redisLockKey + "] 成功"); } }
參考文檔
https://www.jianshu.com/p/6c2f85e2c586
https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/
到此這篇關於SpringBoot 使用 Redis 分佈式鎖解決並發問題的文章就介紹到這瞭,更多相關SpringBoot Redis 分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 詳解RedisTemplate下Redis分佈式鎖引發的系列問題
- redis深入淺出分佈式鎖實現下篇
- SpringBoot RedisTemplate分佈式鎖的項目實戰
- 分佈式面試分佈式鎖實現及應用場景
- Spring Boot 防止接口惡意刷新和暴力請求的實現