Redis分佈式鎖如何實現續期

Redis分佈式鎖如何續期

Redis分佈式鎖的正確姿勢

據肥朝瞭解,很多同學在用分佈式鎖時,都是直接百度搜索找一個Redis分佈式鎖工具類就直接用瞭.關鍵是該工具類中還充斥著很多System.out.println();等語句.其實Redis分佈式鎖比較正確的姿勢是采用redisson這個客戶端工具.具體介紹可以搜索最大的同性交友網站github.

如何回答

首先如果你之前用Redis的分佈式鎖的姿勢正確,並且看過相應的官方文檔的話,這個問題So easy.我們來看

在這裡插入圖片描述

坦白說,如果你英文棒棒噠那麼看英文文檔可能更好理解

By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout setting.

但是你如果看的是中文文檔

看門狗檢查鎖的超時時間默認是30秒

這句話肥朝從語文角度分析就是一個歧義句,他有兩個意思

1.看門狗默認30秒去檢查一次鎖的超時時間

2.看們狗會去檢查鎖的超時時間,鎖的時間時間默認是30秒

看到這裡,我希望大傢不要黑我的小學體育老師,雖然他和語文老師是同個人.語文不行,我們可以源碼來湊!

源碼分析

我們根據官方文檔給出的例子,寫瞭一個最簡單的demo,例子根據上面截圖中Ctr+C和Ctr+V一波操作,如下

public class DemoMain {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        RLock lock = redisson.getLock("anyLock");
        lock.lock();
        //lock.unlock();
    }
}

create

在這裡插入圖片描述

從這裡我們知道,internalLockLeaseTime 和 lockWatchdogTimeout這兩個參數是相等的.

lockWatchdogTimeout默認值如下

public class Config {	
	private long lockWatchdogTimeout = 30 * 1000;		
	public long getLockWatchdogTimeout() {
		return lockWatchdogTimeout;
	}	
	//省略無關代碼
}

從internalLockLeaseTime這個單詞也可以看出,這個加的分佈式鎖的超時時間默認是30秒.但是還有一個問題,那就是這個看門狗,多久來延長一次有效期呢?我們往下看

lock

在這裡插入圖片描述

從我圖中框起來的地方我們就知道瞭,獲取鎖成功就會開啟一個定時任務,也就是watchdog,定時任務會定期檢查去續期renewExpirationAsync(threadId).
這裡定時用的是netty-common包中的HashedWheelTimer,肥朝公眾號已經和各大搜索引擎建立瞭密切的合作關系,你隻需要把這個類在任何搜索引擎一搜,都能知道相關API參數的意義.
從圖中我們明白,該定時調度每次調用的時間差是internalLockLeaseTime / 3.也就10秒.

真相大白

通過源碼分析我們知道,默認情況下,加鎖的時間是30秒.如果加鎖的業務沒有執行完,那麼到 30-10 = 20秒的時候,就會進行一次續期,把鎖重置成30秒.那這個時候可能又有同學問瞭,那業務的機器萬一宕機瞭呢?宕機瞭定時任務跑不瞭,就續不瞭期,那自然30秒之後鎖就解開瞭唄.

Redis分佈式鎖的5個坑

一、鎖未被釋放

這種情況是一種低級錯誤,就是我上邊犯的錯,由於當前線程 獲取到redis 鎖,處理完業務後未及時釋放鎖,導致其它線程會一直嘗試獲取鎖阻塞,例如:用Jedis客戶端會報如下的錯誤信息

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

redis線程池已經沒有空閑線程來處理客戶端命令。

解決的方法也很簡單,隻要我們細心一點,拿到鎖的線程處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,線程可以釋放當前連接並且sleep一段時間。

public void lock() {
    while (true) {
        boolean flag = this.getLock(key);
        if (flag) {
              TODO .........
        } else {
              // 釋放當前redis連接
              redis.close();
              // 休眠1000毫秒
             sleep(1000);
       }
     }
 }

二、B的鎖被A給釋放瞭

我們知道Redis實現鎖的原理在於 SETNX命令。當 key不存在時將 key的值設為 value ,返回值為 1;若給定的 key已經存在,則 SETNX不做任何動作,返回值為 0 。

SETNX key value

我們來設想一下這個場景:A、B兩個線程來嘗試給key myLock加鎖,A線程先拿到鎖(假如鎖3秒後過期),B線程就在等待嘗試獲取鎖,到這一點毛病沒有。

那如果此時業務邏輯比較耗時,執行時間已經超過redis鎖過期時間,這時A線程的鎖自動釋放(刪除key),B線程檢測到myLock這個key不存在,執行 SETNX命令也拿到瞭鎖。

但是,此時A線程執行完業務邏輯之後,還是會去釋放鎖(刪除key),這就導致B線程的鎖被A線程給釋放瞭。

為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的value值來標識,隻釋放指定value的key,否則就會出現釋放鎖混亂的場景。

三、數據庫事務超時

emm~ 聊redis鎖咋還扯到數據庫事務上來瞭?別著急往下看,看下邊這段代碼:

 @Transaction
 public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
              insert();
          }
      }
 }

給這個方法添加一個@Transaction註解開啟事務,如代碼中拋出異常進行回滾,要知道數據庫事務可是有超時時間限制的,並不會無條件的一直等一個耗時的數據庫操作。

比如:我們解析一個大文件,再將數據存入到數據庫,如果執行時間太長,就會導致事務超時自動回滾。

一旦你的key長時間獲取不到鎖,獲取鎖等待的時間遠超過數據庫事務超時時間,程序就會報異常。

一般為解決這種問題,我們就需要將數據庫事務改為手動提交、回滾事務。

  @Autowired
  DataSourceTransactionManager dataSourceTransactionManager;
  @Transaction
  public void lock() {
      //手動開啟事務
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      try {
          while (true) {
             boolean flag = this.getLock(key);
             if (flag) {
                 insert();
                 //手動提交事務
                 dataSourceTransactionManager.commit(transactionStatus);
             }
         }
     } catch (Exception e) {
         //手動回滾事務
         dataSourceTransactionManager.rollback(transactionStatus);
     }
 }

四、鎖過期瞭,業務還沒執行完

這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。

同樣是redis分佈式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把redis鎖的過期時間再弄長點不就解決瞭嗎?

那還是有問題,我們可以在加鎖的時候,手動調長redis鎖的過期時間,可這個時間多長合適?業務邏輯的執行時間是不可控的,調的過長又會影響操作性能。

要是redis鎖的過期時間能夠自動續期就好瞭。

為瞭解決這個問題我們使用redis客戶端redisson,redisson很好的解決瞭redis在分佈式環境下的一些棘手問題,它的宗旨就是讓使用者減少對Redis的關註,將更多精力用在處理業務邏輯上。

redisson對分佈式鎖做瞭很好封裝,隻需調用API即可。

RLock lock = redissonClient.getLock("stockLock");

redisson在加鎖成功後,會註冊一個定時任務監聽這個鎖,每隔10秒就去查看這個鎖,如果還持有鎖,就對過期時間進行續期。默認過期時間30秒。這個機制也被叫做:“看門狗”,這名字。。。

舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。

通過分析下邊redisson的源碼實現可以發現,不管是加鎖、解鎖、續約都是客戶端把一些復雜的業務邏輯,通過封裝在Lua腳本中發送給redis,保證這段復雜業務邏輯執行的原子性。

@Slf4j
@Service
public class RedisDistributionLockPlus {
   /**
    * 加鎖超時時間,單位毫秒, 即:加鎖時間內執行完操作,如果未完成會有並發現象
    */
   private static final long DEFAULT_LOCK_TIMEOUT = 30;
  private static final long TIME_SECONDS_FIVE = 5 ;
  /**
   * 每個key的過期時間 {@link LockContent}
   */
  private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
  /**
   * redis執行成功的返回
   */
  private static final Long EXEC_SUCCESS = 1L;
  /**
   * 獲取鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:超時時間
   */
  private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
          "if redis.call('exists', KEYS[1]) == 0 then " +
             "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
             "for k, v in pairs(t) do " +
               "if v == 'OK' then return tonumber(ARGV[2]) end " +
             "end " +
          "return 0 end";
  /**
   * 釋放鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:業務耗時 arg3: 業務開始設置的timeout
   */
  private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
          "local ctime = tonumber(ARGV[2]) " +
          "local biz_timeout = tonumber(ARGV[3]) " +
          "if ctime > 0 then  " +
             "if redis.call('exists', KEYS[2]) == 1 then " +
                 "local avg_time = redis.call('get', KEYS[2]) " +
                 "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                 "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                 "else redis.call('del', KEYS[2]) end " +
             "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
          "end " +
          "return redis.call('del', KEYS[1]) " +
          "else return 0 end";
  /**
   * 續約lua腳本
   */
  private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
  private final StringRedisTemplate redisTemplate;
  public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
      ScheduleTask task = new ScheduleTask(this, lockContentMap);
      // 啟動定時任務
      ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
  }
  /**
   * 加鎖
   * 取到鎖加鎖,取不到鎖一直等待知道獲得鎖
   *
   * @param lockKey
   * @param requestId 全局唯一
   * @param expire   鎖過期時間, 單位秒
   * @return
   */
  public boolean lock(String lockKey, String requestId, long expire) {
      log.info("開始執行加鎖, lockKey ={}, requestId={}", lockKey, requestId);
      for (; ; ) {
          // 判斷是否已經有線程持有鎖,減少redis的壓力
          LockContent lockContentOld = lockContentMap.get(lockKey);
          boolean unLocked = null == lockContentOld;
          // 如果沒有被鎖,就獲取鎖
          if (unLocked) {
              long startTime = System.currentTimeMillis();
              // 計算超時時間
              long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
              String lockKeyRenew = lockKey + "_renew";
              RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
              List<String> keys = new ArrayList<>();
              keys.add(lockKey);
              keys.add(lockKeyRenew);
              Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
              if (null != lockExpire && lockExpire > 0) {
                  // 將鎖放入map
                  LockContent lockContent = new LockContent();
                  lockContent.setStartTime(startTime);
                  lockContent.setLockExpire(lockExpire);
                  lockContent.setExpireTime(startTime + lockExpire * 1000);
                  lockContent.setRequestId(requestId);
                  lockContent.setThread(Thread.currentThread());
                  lockContent.setBizExpire(bizExpire);
                 lockContent.setLockCount(1);
                 lockContentMap.put(lockKey, lockContent);
                 log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId);
                 return true;
             }
         }
         // 重復獲取鎖,在線程池中由於線程復用,線程相等並不能確定是該線程的鎖
         if (Thread.currentThread() == lockContentOld.getThread()
                   && requestId.equals(lockContentOld.getRequestId())){
             // 計數 +1
             lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
             return true;
         }
         // 如果被鎖或獲取鎖失敗,則等待100毫秒
         try {
             TimeUnit.MILLISECONDS.sleep(100);
         } catch (InterruptedException e) {
             // 這裡用lombok 有問題
             log.error("獲取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e);
             return false;
         }
     }
 }
 /**
  * 解鎖
  *
  * @param lockKey
  * @param lockValue
  */
 public boolean unlock(String lockKey, String lockValue) {
     String lockKeyRenew = lockKey + "_renew";
     LockContent lockContent = lockContentMap.get(lockKey);
     long consumeTime;
     if (null == lockContent) {
         consumeTime = 0L;
     } else if (lockValue.equals(lockContent.getRequestId())) {
         int lockCount = lockContent.getLockCount();
         // 每次釋放鎖, 計數 -1,減到0時刪除redis上的key
         if (--lockCount > 0) {
             lockContent.setLockCount(lockCount);
             return false;
         }
         consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
     } else {
         log.info("釋放鎖失敗,不是自己的鎖。");
         return false;
     }
     // 刪除已完成key,先刪除本地緩存,減少redis壓力, 分佈式鎖,隻有一個,所以這裡不加鎖
     lockContentMap.remove(lockKey);
     RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
     List<String> keys = new ArrayList<>();
     keys.add(lockKey);
     keys.add(lockKeyRenew);
     Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
             Long.toString(lockContent.getBizExpire()));
     return EXEC_SUCCESS.equals(result);
 }
 /**
  * 續約
  *
  * @param lockKey
  * @param lockContent
  * @return true:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放 2、不是自己的鎖,3、續約期間鎖過期瞭(未解決))
  */
 public boolean renew(String lockKey, LockContent lockContent) {
     // 檢測執行業務線程的狀態
     Thread.State state = lockContent.getThread().getState();
     if (Thread.State.TERMINATED == state) {
         log.info("執行業務的線程已終止,不再續約 lockKey ={}, lockContent={}", lockKey, lockContent);
         return false;
     }
     String requestId = lockContent.getRequestId();
     long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
     RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
     List<String> keys = new ArrayList<>();
     keys.add(lockKey);
     Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
     log.info("續約結果,True成功,False失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
     return EXEC_SUCCESS.equals(result);
 }
 static class ScheduleExecutor {
     public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
         long delay = unit.toMillis(initialDelay);
         long period_ = unit.toMillis(period);
         // 定時執行
         new Timer("Lock-Renew-Task").schedule(task, delay, period_);
     }
 }
 static class ScheduleTask extends TimerTask {
     private final RedisDistributionLockPlus redisDistributionLock;
     private final Map<String, LockContent> lockContentMap;
     public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
         this.redisDistributionLock = redisDistributionLock;
         this.lockContentMap = lockContentMap;
     }
     @Override
     public void run() {
         if (lockContentMap.isEmpty()) {
             return;
         }
         Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
         for (Map.Entry<String, LockContent> entry : entries) {
             String lockKey = entry.getKey();
             LockContent lockContent = entry.getValue();
             long expireTime = lockContent.getExpireTime();
             // 減少線程池中任務數量
             if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                 //線程池異步續約
                 ThreadPool.submit(() -> {
                     boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                     if (renew) {
                         long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                         lockContent.setExpireTime(expireTimeNew);
                     } else {
                         // 續約失敗,說明已經執行完 OR redis 出現問題
                         lockContentMap.remove(lockKey);
         
           }
                 });
             }
         }
     }
 }
}

五、redis主從復制的坑

redis高可用最常見的方案就是主從復制(master-slave),這種模式也給redis分佈式鎖挖瞭一坑。

redis cluster集群環境下,假如現在A客戶端想要加鎖,它會根據路由規則選擇一臺master節點寫入key mylock,在加鎖成功後,master節點會把key異步復制給對應的slave節點。

如果此時redis master節點宕機,為保證集群可用性,會進行主備切換,slave變為瞭redis master。B客戶端在新的master節點上加鎖成功,而A客戶端也以為自己還是成功加瞭鎖的。

此時就會導致同一時間內多個客戶端對一個分佈式鎖完成瞭加鎖,導致各種臟數據的產生。

至於解決辦法嘛,目前看還沒有什麼根治的方法,隻能盡量保證機器的穩定性,減少發生此事件的概率。

小結一下:上面就是我在使用Redis 分佈式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來瞭,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,隻不過是在權衡利弊後,選一個在接受范圍內的折中方案而已。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: