SpringBoot整合Redisson實現分佈式鎖

Redisson是架設在redis基礎上的一個Java駐內存數據網格(In-Memory Data Grid)。充分的利用瞭Redis鍵值數據庫提供的一系列優勢,基於Java實用工具包中常用接口,為使用者提供瞭一系列具有分佈式特性的常用工具類。使得原本作為協調單機多線程並發程序的工具包獲得瞭協調分佈式多機多線程並發系統的能力,大大降低瞭設計和研發大規模分佈式系統的難度。同時結合各富特色的分佈式服務,更進一步簡化瞭分佈式環境中程序相互之間的協作。

Github地址:https://github.com/redisson/redisson

一、添加依賴

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <!-- springboot整合redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
 
        <!-- springboot整合redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.13.6</version>
        </dependency>
    </dependencies>

二、redis配置文件

server:
  port: 8000
 
spring:
  redis:
    host: localhost
    port: 6379
    password: null
    database: 1
    timeout: 30000

三、新建配置類

@Configuration
public class MyRedissonConfig {
 
    @Value("${spring.redis.host}")
    String redisHost;
 
    @Value("${spring.redis.port}")
    String redisPort;
 
    @Value("${spring.redis.password}")
    String redisPassword;
 
    @Value("${spring.redis.timeout}")
    Integer redisTimeout;
 
    /**
     * Redisson配置
     * @return
     */
    @Bean
    RedissonClient redissonClient() {
      //1、創建配置
        Config config = new Config();
        
        redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(redisHost + ":" + redisPort)
                .setTimeout(redisTimeout);
        
        if (StringUtils.isNotBlank(redisPassword)) {
            serverConfig.setPassword(redisPassword);
        }
        
        return Redisson.create(config);
    }
    
}
//單機
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
 
 
//主從
 
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//集群
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

四、使用分佈式鎖

可重入鎖

基於Redis的Redisson分佈式可重入鎖RLock對象實現瞭java.util.concurrent.locks.Lock接口。

    @RequestMapping("/redisson")
    public String testRedisson(){
        //獲取分佈式鎖,隻要鎖的名字一樣,就是同一把鎖
        RLock lock = redissonClient.getLock("lock");
 
        //加鎖(阻塞等待),默認過期時間是無限期
        lock.lock();
        try{
            //如果業務執行過長,Redisson會自動給鎖續期
            Thread.sleep(1000);
            System.out.println("加鎖成功,執行業務邏輯");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //解鎖,如果業務執行完成,就不會繼續續期
            lock.unlock();
        }
 
        return "Hello Redisson!";
    }

如果拿到分佈式鎖的節點宕機,且這個鎖正好處於鎖住的狀態時,會出現鎖死的狀態,為瞭避免這種情況的發生,鎖都會設置一個過期時間。這樣也存在一個問題,一個線程拿到瞭鎖設置瞭30s超時,在30s後這個線程還沒有執行完畢,鎖超時釋放瞭,就會導致問題,Redisson給出瞭自己的答案,就是 watch dog 自動延期機制。
Redisson提供瞭一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期,也就是說,如果一個拿到鎖的線程一直沒有完成邏輯,那麼看門狗會幫助線程不斷的延長鎖超時時間,鎖不會因為超時而被釋放。
默認情況下,看門狗的續期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定。
另外Redisson 還提供瞭可以指定leaseTime參數的加鎖方法來指定加鎖的時間。超過這個時間後鎖便自動解開瞭,不會延長鎖的有效期。

在RedissonLock類的renewExpiration()方法中,會啟動一個定時任務每隔30/3=10秒給鎖續期。如果業務執行期間,應用掛瞭,那麼不會自動續期,到過期時間之後,鎖會自動釋放。

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
        ee.setTimeout(task);
    }

另外Redisson還提供瞭leaseTime的參數來指定加鎖的時間。超過這個時間後鎖便自動解開瞭。

// 加鎖以後10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
 
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

如果指定瞭鎖的超時時間,底層直接調用lua腳本,進行占鎖。如果超過leaseTime,業務邏輯還沒有執行完成,則直接釋放鎖,所以在指定leaseTime時,要讓leaseTime大於業務執行時間。RedissonLock類的tryLockInnerAsync()方法

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
 
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

讀寫鎖

分佈式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處於加鎖狀態。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

讀寫鎖測試類,當訪問write接口時,read接口會被阻塞住。

@RestController
public class TestController {
 
    @Autowired
    RedissonClient redissonClient;
 
    @Autowired
    StringRedisTemplate redisTemplate;
 
    @RequestMapping("/write")
    public String write(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock writeLock = readWriteLock.writeLock();
        String s = UUID.randomUUID().toString();
        writeLock.lock();
        try {
            redisTemplate.opsForValue().set("wr-lock-key", s);
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            writeLock.unlock();
        }
        return s;
    }
 
    @RequestMapping("/read")
    public String read(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock readLock = readWriteLock.readLock();
        String s = "";
        readLock.lock();
        try {
            s = redisTemplate.opsForValue().get("wr-lock-key");
        } finally {
            readLock.unlock();
        }
        return s;
    }
}

信號量(Semaphore)

基於Redis的Redisson的分佈式信號量(Semaphore)Java對象RSemaphore采用瞭與java.util.concurrent.Semaphore類似的接口和用法

關於信號量的使用你們能夠想象一下這個場景,有三個停車位,當三個停車位滿瞭後,其餘車就不停瞭。能夠把車位比做信號,如今有三個信號,停一次車,用掉一個信號,車離開就是釋放一個信號。

咱們用 Redisson 來演示上述停車位的場景。

先定義一個占用停車位的方法:

/**
* 停車,占用停車位
* 總共 3 個車位
*/
@ResponseBody
@RequestMapping("park")
public String park() throws InterruptedException {
  // 獲取信號量(停車場)
  RSemaphore park = redisson.getSemaphore("park");
  // 獲取一個信號(停車位)
  park.acquire();
 
  return "OK";
}

再定義一個離開車位的方法:

/**
 * 釋放車位
 * 總共 3 個車位
 */
@ResponseBody
@RequestMapping("leave")
public String leave() throws InterruptedException {
    // 獲取信號量(停車場)
    RSemaphore park = redisson.getSemaphore("park");
    // 釋放一個信號(停車位)
    park.release();
 
    return "OK";
}

為瞭簡便,我用 Redis 客戶端添加瞭一個 key:“park”,值等於 3,表明信號量為 park,總共有三個值。

 而後用 postman 發送 park 請求占用一個停車位。

而後在 redis 客戶端查看 park 的值,發現已經改成 2 瞭。繼續調用兩次,發現 park 的等於 0,當調用第四次的時候,會發現請求一直處於等待中,說明車位不夠瞭。若是想要不阻塞,能夠用 tryAcquire 或 tryAcquireAsync。

咱們再調用離開車位的方法,park 的值變為瞭 1,表明車位剩餘 1 個。

註意:屢次執行釋放信號量操做,剩餘信號量會一直增長,而不是到 3 後就封頂瞭。

閉鎖(CountDownLatch)

CountDownLatch作用:某一線程,等待其他線程執行完畢之後,自己再繼續執行。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
 
// 在其他線程或其他JVM裡
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

在TestController中添加測試方法,訪問close接口時,調用await()方法進入阻塞狀態,直到有三次訪問release接口時,close接口才會返回。

    @RequestMapping("/close")
    public String close() throws InterruptedException {
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.trySetCount(3);
        close.await();
        return "close";
    }
 
    @RequestMapping("/release")
    public String release(){
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.countDown();
        return "release";
    }

到此這篇關於SpringBoot整合Redisson實現分佈式鎖的文章就介紹到這瞭,更多相關SpringBoot Redisson分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: