Redis限流的幾種實現

一、簡單的限流

基本原理

當系統處理能力有限,如何組織計劃外的請求對系統施壓。首先我們先看下一些簡單的限流策略,防止暴力攻擊。比如要對IP訪問,沒5s隻能訪問10次,超過進行攔截。

如上圖,一般使用滑動窗口來統計區間時間內的訪問次數。

使用 zset 記錄 IP 訪問次數,每個 IP 通過 key 保存下來,score 保存當前時間戳,value 唯一用時間戳或者UUID來實現

代碼實現

public class RedisLimiterTest {
    private Jedis jedis;

    public RedisLimiterTest(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * @param ipAddress Ip地址
     * @param period    特定的時間內,單位秒
     * @param maxCount  最大允許的次數
     * @return
     */
    public boolean isIpLimit(String ipAddress, int period, int maxCount) {
        String key = String.format("ip:%s", ipAddress);
        // 毫秒時間戳
        long currentTimeMillis = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        // redis事務,保證原子性
        pipe.multi();
        // 存放數據,value 和 score 都使用毫秒時間戳
        pipe.zadd(key, currentTimeMillis, "" + UUID.randomUUID());
        // 移除窗口區間所有的元素
        pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000);
        // 獲取時間窗口內的行為數量
        Response<Long> count = pipe.zcard(key);
        // 設置 zset 過期時間,避免冷用戶持續占用內存,這裡寬限1s
        pipe.expire(key, period + 1);
        // 提交事務
        pipe.exec();
        pipe.close();
        // 比較數量是否超標
        return count.get() > maxCount;
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        RedisLimiterTest limiter = new RedisLimiterTest(jedis);
        for (int i = 1; i <= 20; i++) {
            // 驗證IP  10秒鐘之內隻能訪問5次
            boolean isLimit = limiter.isIpLimit("222.73.55.22", 10, 5);
            System.out.println("訪問第" + i + "次, 結果:" + (isLimit ? "限制訪問" : "允許訪問"));
        }
    }
}

執行結果
訪問第1次, 結果:允許訪問
訪問第2次, 結果:允許訪問
訪問第3次, 結果:允許訪問
訪問第4次, 結果:允許訪問
訪問第5次, 結果:允許訪問
訪問第6次, 結果:限制訪問
訪問第7次, 結果:限制訪問
… …

缺點:要記錄時間窗口所有的行為記錄,量很大,比如,限定60s內不能超過100萬次這種場景,不太適合這樣限流,因為會消耗大量的儲存空間。

二、漏鬥限流

基本原理

  • 漏鬥的容量是限定的,如果滿瞭,就裝不進去瞭。
  • 如果將漏嘴放開,水就會往下流,流走一部分之後,就又可以繼續往裡面灌水。
  • 如果漏嘴流水的速率大於灌水的速率,那麼漏鬥永遠都裝不滿。
  • 如果漏嘴流水速率小於灌水的速率,那麼一旦漏鬥滿瞭,灌水就需要暫停並等待漏鬥騰空。

示例代碼

public class FunnelLimiterTest {

    static class Funnel {
        int capacity; // 漏鬥容量
        float leakingRate; // 漏嘴流水速率
        int leftQuota; // 漏鬥剩餘空間
        long leakingTs; // 上一次漏水時間

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs; // 距離上一次漏水過去瞭多久
            int deltaQuota = (int) (deltaTs * leakingRate); // 騰出的空間 = 時間*漏水速率
            if (deltaQuota < 0) { // 間隔時間太長,整數數字過大溢出
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // 騰出空間太小 就等下次,最小單位是1
                return;
            }
            this.leftQuota += deltaQuota; // 漏鬥剩餘空間 = 漏鬥剩餘空間 + 騰出的空間
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) { // 剩餘空間不得高於容量
                this.leftQuota = this.capacity;
            }
        }

        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) { // 判斷剩餘空間是否足夠
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }

    // 所有的漏鬥
    private Map<String, Funnel> funnels = new HashMap<>();

    /**
     * @param capacity    漏鬥容量
     * @param leakingRate 漏嘴流水速率 quota/s
     */
    public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) {
        String key = String.format("ip:%s", ipAddress);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return !funnel.watering(1); // 需要1個quota
    }

    public static void main(String[] args) throws Exception{
        FunnelLimiterTest limiter = new FunnelLimiterTest();
        for (int i = 1; i <= 50; i++) {
            // 每1s執行一次
            Thread.sleep(1000);
            // 漏鬥容量是2 ,漏嘴流水速率是0.5每秒,
            boolean isLimit = limiter.isIpLimit("222.73.55.22", 2, (float)0.5/1000);
            System.out.println("訪問第" + i + "次, 結果:" + (isLimit ? "限制訪問" : "允許訪問"));
        }
    }
}

執行結果
訪問第1次, 結果:允許訪問    # 第1次,容量剩餘2,執行後1
訪問第2次, 結果:允許訪問    # 第2次,容量剩餘1,執行後0
訪問第3次, 結果:允許訪問    # 第3次,由於過瞭2s, 漏鬥流水剩餘1個空間,所以容量剩餘1,執行後0
訪問第4次, 結果:限制訪問    # 第4次,過瞭1s, 剩餘空間小於1, 容量剩餘0
訪問第5次, 結果:允許訪問    # 第5次,由於過瞭2s, 漏鬥流水剩餘1個空間,所以容量剩餘1,執行後0
訪問第6次, 結果:限制訪問    # 以此類推…
訪問第7次, 結果:允許訪問
訪問第8次, 結果:限制訪問
訪問第9次, 結果:允許訪問
訪問第10次, 結果:限制訪問

我們觀察 Funnel 對象的幾個字段,我們發現可以將 Funnel 對象的內容按字段存儲到一個 hash 結構中,灌水的時候將 hash 結構的字段取出來進行邏輯運算後,再將新值回填到 hash 結構中就完成瞭一次行為頻度的檢測。

但是有個問題,我們無法保證整個過程的原子性。從 hash 結構中取值,然後在內存裡運算,再回填到 hash 結構,這三個過程無法原子化,意味著需要進行適當的加鎖控制。而一旦加鎖,就意味著會有加鎖失敗,加鎖失敗就需要選擇重試或者放棄。
如果重試的話,就會導致性能下降。如果放棄的話,就會影響用戶體驗。同時,代碼的復雜度也跟著升高很多。這真是個艱難的選擇,我們該如何解決這個問題呢?Redis-Cell 救星來瞭!

Redis-Cell

Redis 4.0 提供瞭一個限流 Redis 模塊,它叫 redis-cell。該模塊也使用瞭漏鬥算法,並提供瞭原子的限流指令。
該模塊隻有1條指令cl.throttle,它的參數和返回值都略顯復雜,接下來讓我們來看看這個指令具體該如何使用。

> cl.throttle key:xxx 15 30 60 1

15 : 15 capacity 這是漏鬥容量
30 60 : 30 operations / 60 seconds 這是漏水速率
1 : need 1 quota (可選參數,默認值也是1)

> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 表示允許,1表示拒絕
2) (integer) 15  # 漏鬥容量capacity
3) (integer) 14  # 漏鬥剩餘空間left_quota
4) (integer) -1  # 如果拒絕瞭,需要多長時間後再試(漏鬥有空間瞭,單位秒)
5) (integer) 2   # 多長時間後,漏鬥完全空出來(left_quota==capacity,單位秒)

在執行限流指令時,如果被拒絕瞭,就需要丟棄或重試。cl.throttle 指令考慮的非常周到,連重試時間都幫你算好瞭,直接取返回結果數組的第四個值進行 sleep 即可,如果不想阻塞線程,也可以異步定時任務來重試。

參考來源

《Redis深度歷險 核心原理與應用實踐》_錢文品

到此這篇關於Redis限流的幾種實現的文章就介紹到這瞭,更多相關Redis限流內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet! 

推薦閱讀: