Redis分佈式鎖升級版RedLock及SpringBoot實現方法
分佈式鎖概覽
在多線程的環境下,為瞭保證一個代碼塊在同一時間隻能由一個線程訪問,Java中我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。但是現在公司都是流行分佈式架構,在分佈式環境下,如何保證不同節點的線程同步執行呢?因此就引出瞭分佈式鎖,它是控制分佈式系統之間互斥訪問共享資源的一種方式。
在一個分佈式系統中,多臺機器上部署瞭多個服務,當客戶端一個用戶發起一個數據插入請求時,如果沒有分佈式鎖機制保證,那麼那多臺機器上的多個服務可能進行並發插入操作,導致數據重復插入,對於某些不允許有多餘數據的業務來說,這就會造成問題。而分佈式鎖機制就是為瞭解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占瞭分佈式鎖,其他服務沒獲取到鎖,就不進行後續操作。大致意思如下圖所示:
分佈式鎖的特點
分佈式鎖一般有如下的特點:
- 互斥性: 同一時刻隻能有一個線程持有鎖
- 可重入性: 同一節點上的同一個線程如果獲取瞭鎖之後能夠再次獲取鎖
- 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
- 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分佈式鎖失效
- 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒
分佈式鎖的實現方式
我們一般實現分佈式鎖有以下幾種方式:
- 基於數據庫
- 基於Redis
- 基於zookeeper
Redis普通分佈式鎖存在的問題
說到Redis分佈式鎖,大部分人都會想到:setnx+lua
(redis保證執行lua腳本時不執行其他操作,保證操作的原子性),或者知道set key value px milliseconds nx
。後一種方式的核心實現命令如下:
- 獲取鎖(unique_value可以是UUID等) SET resource_name unique_value NX PX 30000 - 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
這種實現方式有3大要點(也是面試概率非常高的地方):
- set命令要用
set key value px milliseconds nx
; - value要具有唯一性;
- 釋放鎖時要驗證value值,不能誤解鎖;
事實上這類鎖最大的缺點就是它加鎖時隻作用在一個Redis節點上,即使Redis通過sentinel保證高可用,如果這個master節點由於某些原因發生瞭主從切換,那麼就會出現鎖丟失的情況:
- 在Redis的master節點上拿到瞭鎖;
- 但是這個加鎖的key還沒有同步到slave節點;
- master故障,發生故障轉移,slave節點升級為master節點;
- 導致鎖丟失。
為瞭避免單點故障問題,Redis作者antirez基於分佈式環境下提出瞭一種更高級的分佈式鎖的實現方式:Redlock。Redlock也是Redis所有分佈式鎖實現方式中唯一能讓面試官高潮的方式。
Redis高級分佈式鎖:Redlock
antirez提出的redlock算法大概是這樣的:
在Redis的分佈式環境中,我們假設有N個Redis master。這些節點完全互相獨立,不存在主從復制或者其他集群協調機制。我們確保將在N個實例上使用與在Redis單實例下相同方法獲取和釋放鎖。現在我們假設有5個Redis master節點,同時我們需要在5臺服務器上面運行這些Redis實例,這樣保證他們不會同時都宕掉。
為瞭取到鎖,客戶端應該執行以下操作:
- 獲取當前Unix時間,以毫秒為單位。
- 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間TTL為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖。
- 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(N/2+1,這裡是3個節點)的Redis節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功。
- 如果取到瞭鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
- 如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過瞭有效時間),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。
- 此處不討論時鐘漂移
Redlock源碼
redisson已經有對redlock算法封裝,接下來對其用法進行簡單介紹,並對核心源碼進行分析(假設5個redis實例)。
1. Redlock依賴
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
2. Redlock用法
首先,我們來看一下redission封裝的redlock算法實現的分佈式鎖用法,非常簡單,跟重入鎖(ReentrantLock)有點類似:
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 還可以getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到鎖, 就認為獲取鎖失敗。10000ms即10s是鎖失效時間。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 無論如何, 最後都要解鎖 redLock.unlock(); }
3. Redlock唯一ID
實現分佈式鎖的一個非常重要的點就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源碼在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
4. Redlock獲取鎖
獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,隻不過前者獲取鎖的默認租約時間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 獲取鎖時向5個redis實例發送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分佈式鎖的KEY不能存在,如果確實不存在,那麼執行hset命令(hset REDLOCK_KEY uuid+threadId 1),並通過pexpire設置失效時間(也是鎖的租約時間) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分佈式鎖的KEY已經存在,並且value也匹配,表示是當前線程持有的鎖,那麼重入次數加1,並且設置失效時間 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 獲取分佈式鎖的KEY的失效時間毫秒數 "return redis.call('pttl', KEYS[1]);", // 這三個參數分別對應KEYS[1],ARGV[1]和ARGV[2] Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
獲取鎖的命令中,
- KEYS[1]就是Collections.singletonList(getName()),表示分佈式鎖的key,即REDLOCK_KEY;
- ARGV[1]就是internalLockLeaseTime,即鎖的租約時間,默認30s;
- ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值,即UUID+threadId:
5. Redlock釋放鎖
釋放鎖的代碼為redLock.unlock(),核心源碼如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 向5個redis實例都執行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分佈式鎖KEY不存在,那麼向channel發佈一條消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分佈式鎖存在,但是value不匹配,表示鎖已經被占用,那麼直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是當前線程占有分佈式鎖,那麼將重入次數減1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次數減1後的值如果大於0,表示分佈式鎖有重入過,那麼隻設置失效時間,還不能刪除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次數減1後的值如果為0,表示分佈式鎖隻獲取過1次,那麼刪除這個KEY,並發佈解鎖消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 這5個參數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
Redis實現的分佈式鎖輪子
下面利用SpringBoot + Jedis + AOP的組合來實現一個簡易的分佈式鎖。
1. 自定義註解
自定義一個註解,被註解的方法會執行獲取分佈式鎖的邏輯
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RedisLock { /** * 業務鍵 * * @return */ String key(); /** * 鎖的過期秒數,默認是5秒 * * @return */ int expire() default 5; /** * 嘗試加鎖,最多等待時間 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 鎖的超時時間單位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
2. AOP攔截器實現
在AOP中我們去執行獲取分佈式鎖和釋放分佈式鎖的邏輯,代碼如下:
@Aspect @Component public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper; @Autowired private JedisUtil jedisUtil; private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class); @Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("獲取鎖失敗"); throw new RuntimeException("獲取鎖失敗"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系統異常"); } } finally { logger.info("釋放鎖"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } } }
3. Redis實現分佈式鎖核心類
@Component public class RedisLockHelper { private long sleepTime = 100; /** * 直接使用setnx + expire方式獲取分佈式鎖 * 非原子性 * * @param key * @param value * @param timeout * @return */ public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) { Long result = jedis.setnx(key, value); // result = 1時,設置成功,否則設置失敗 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } } /** * 使用Lua腳本,腳本中使用setnex+expire命令進行加鎖操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判斷是否成功 return result.equals(1L); } /** * 在Redis的2.6.12及以後中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); } /** * 自定義獲取鎖的超時時間 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 錯誤的解鎖方法—直接刪除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); } /** * 使用Lua腳本進行解鎖操縱,解鎖的時候驗證value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); } }
4. Controller層控制
定義一個TestController來測試我們實現的分佈式鎖
@RestController public class TestController { @RedisLock(key = "redis_lock") @GetMapping("/index") public String index() { return "index"; } }
站在巨人的肩膀上
1.Redlock:Redis分佈式鎖最牛逼的實現
2.基於Redis的分佈式鎖實現
到此這篇關於Redis分佈式鎖升級版RedLock及SpringBoot實現的文章就介紹到這瞭,更多相關Redis分佈式鎖RedLock內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Redis分佈式鎖Redlock的實現
- 用Go+Redis實現分佈式鎖的示例代碼
- redis分佈式鎖的8大坑總結梳理
- 關於SpringBoot 使用 Redis 分佈式鎖解決並發問題
- 帶你輕松掌握Redis分佈式鎖