Redisson分佈式信號量RSemaphore的使用超詳細講解

本篇文章基於redisson-3.17.6版本源碼進行分析

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 設置5個許可,模擬五個停車位
    rSemaphore.trySetPermits(5);
    // 創建10個線程,模擬10輛車過來停車
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "進入停車場...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "離開停車場...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }
    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore設置許可數量

初始化RSemaphore,需要調用trySetPermits()設置許可數量:

/**
 * 嘗試設置許可數量,設置成功,返回true,否則返回false
 */
boolean trySetPermits(int permits);

trySetPermits()內部調用瞭trySetPermitsAsync():

// 異步設置許可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判斷分佈式信號量的key是否存在,如果不存在,才設置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String數據結構設置信號量的許可數
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 發佈一條消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 設置成功,返回1
                    + "return 1;"
                    + "end;"
                    // 否則返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到,設置許可數量底層使用LUA腳本,實際上就是使用redis的String數據結構,保存瞭我們指定的許可數量。如下圖:

參數說明:

  • KEYS[1]: 我們指定的分佈式信號量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 釋放鎖的channel名稱,redisson_sc:{分佈式信號量key},在本例中,就是redisson_sc:{semaphore}
  • ARGV[1]: 設置的許可數量

總結設置許可執行流程為:

  • get semaphore,獲取到semaphore信號量的當前的值
  • 第一次數據為0, 然後使用set semaphore 3,將這個信號量同時能夠允許獲取鎖的客戶端的數量設置為3。(註意到,如果之前設置過瞭信號量,將無法再次設置,直接返回0。想要更改信號量總數可以使用addPermits方法)
  • 然後redis發佈一些消息,返回1

三、RSemaphore的加鎖流程

許可數量設置好之後,我們就可以調用acquire()方法獲取瞭,如果未傳入許可數量,默認獲取一個許可。

public void acquire() throws InterruptedException {
    acquire(1);
}
public void acquire(int permits) throws InterruptedException {
    // 嘗試獲取鎖成功,直接返回
    if (tryAcquire(permits)) {
        return;
    }
    // 對於沒有獲取鎖的那些線程,訂閱redisson_sc:{分佈式信號量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不斷循環嘗試獲取許可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            entry.getLatch().acquire();
        }
    } finally {
        // 取消訂閱
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到,獲取許可的核心邏輯在tryAcquire()方法中,如果tryAcquire()返回true說明獲取許可成功,直接返回;如果返回false,說明當前沒有許可可以使用,則對於沒有獲取鎖的那些線程,訂閱redisson_sc:{分佈式信號量key}通道的消息,並通過死循環不斷嘗試獲取鎖。

我們看一下tryAcquire()方法的邏輯,內部調用瞭tryAcquireAsync()方法:

// 異步獲取許可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 獲取當前剩餘的許可數量
              "local value = redis.call('get', KEYS[1]); " +
              // 許可不為空,並且許可數量 大於等於 當前線程申請的許可數量        
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通過decrby減少剩餘可用許可    
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1    
                  "return 1; " +
              "end; " +
              // 其它情況,返回0        
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

從源碼可以看到,獲取許可就是操作redis中的數據,首先獲取到redis中剩餘的許可數量,隻有當剩餘的許可數量大於線程申請的許可數量時,才獲取成功,返回1;否則獲取失敗,返回0;

總結加鎖執行流程為:

  • get semaphore,獲取到一個當前的值,比如說是3,3 > 1
  • decrby semaphore 1,將信號量允許獲取鎖的客戶端的數量遞減1,變成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 執行3次加鎖後,semaphore值為0
  • 此時如果再來進行加鎖則直接返回0,然後進入死循環去獲取鎖

四、RSemaphore的解鎖流程

通過前面對RSemaphore獲取鎖的分析,我們很容易能猜到,釋放鎖,無非就是歸還許可數量到redis中。我們查看具體的源碼:

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通過incrby增加許可數量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 發佈一條消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

到此這篇關於Redisson分佈式信號量RSemaphore的使用超詳細講解的文章就介紹到這瞭,更多相關Redisson RSemaphore內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: