java實現單機限流

何時使用限流:

比如你希望自己的應用程序 QPS不要超過1000,那麼RateLimiter設置1000的速率後,就會每秒往桶裡 扔1000個令牌,RateLimiter經常用於限制對一些物理資源或者邏輯資源的訪 問速率。

簡介:

對於單機版的限流,可以使用Google 開源的 Guava項目,這個項目提供瞭Google在Java項目中使用一些核心庫,包含集合(Collections),緩存(Caching),並發編程庫(Concurrency),常用註解(Common annotations),String操作,I/O操作方面的眾多非常實用的函數。

這個項目也包含瞭限流的功能,其原理是根據令牌桶算法來實現。

提供瞭兩種限流策略:

● 平滑突發限流(SmoothBursty)
● 平滑預熱限流(SmoothWarmingUp)實現。

依賴:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>29.0-jre</version>
</dependency>

方法描述:

模擬場景(示例):

場景一:

當我們希望某一個接口每秒的訪問量不超過10次

package org.xhs.test;

import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;


/**
 * @Author: hu.chen
 * @Description:
 **/
public class Test {

    /**
     * 存儲接口名和令牌生成器的對應關系
     */
   private static Map<String, RateLimiter> interfaces = new ConcurrentHashMap<>();
   
    /**
     * 線程池
     */
    private static ExecutorService threadPool = new ThreadPoolExecutor(10,15,3,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));

    public static void main(String[] args) throws InterruptedException {

        List<UserRequest> tasks = new ArrayList<UserRequest>();
        // 準備工作,先初始化 10個線程(用戶),這10個用戶同時訪問一個接口
        for (int i = 1; i <= 12; i++) {
            String ip = "127.0.0." + i;
            String userName="chenhu_"+i;
            String interfaceName="user/find_";
            tasks.add(new UserRequest(ip,userName,interfaceName));
        }


        // 先初始化好令牌生成器
        for (UserRequest request : tasks) {
            // 根據接口名限流
            RateLimiter rateLimiter = interfaces.get(request.getInterfaceName());

            if(rateLimiter==null){
                // 創建一個令牌生成器,每秒產生10個令牌
                synchronized (interfaces) {
                    if(rateLimiter==null) {
                        rateLimiter = RateLimiter.create(10);
                        // 將這個令牌生成器和具體的接口進行綁定
                        interfaces.put(request.getInterfaceName(),rateLimiter);
                    }
                }
            }
        }

        // 休眠一秒,讓令牌生成器先生成令牌
        Thread.sleep(1000);

        for (UserRequest request : tasks) {
            // 根據接口名限流
            RateLimiter rateLimiter = interfaces.get(request.getInterfaceName());


            // 獲取令牌桶中一個令牌,如果獲取不到,則等待 timeout 時間,如果還獲取不到,則返回false,反之則返回true
            // timeout設置為0,表示不等待
            if(rateLimiter.tryAcquire(1,0,TimeUnit.SECONDS)){

                // 得到令牌,處理請求
                threadPool.execute(()->{
                    System.err.println("接口:"+request.getInterfaceName()+" 訪問還未達到上限,"+request.getUserName()+"可以訪問");
                });
            }else {
                // 已經等待瞭10秒還獲取不到令牌,進行其他業務處理
                System.err.println("當前時間訪問失敗,"+request.getUserName()+"無法獲取令牌");
            }
        }
    }

    private static class UserRequest {
        /**
         * 請求用戶ip
         */
        private String ip;

        /**
         * 用戶名
         */
        private String userName;

        /**
         * 請求的接口名
         */
        private String interfaceName;
        public UserRequest(String ip, String userName, String interfaceName) {
            this.ip = ip;
            this.userName = userName;
            this.interfaceName = interfaceName;
        }
        public String getIp() {return ip;}

        public String getUserName() { return userName;}
        public String getInterfaceName() {return interfaceName;}
    }
}

場景二:

當我們希望某一個用戶或者ip,每秒的訪問量不超過10

package org.xhs.test;

import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;


/**
 * @Author: hu.chen
 * @Description:
 **/
public class Test {

    /**
     * 存儲用戶名和令牌生成器的對應關系
     */
   private static Map<String, RateLimiter> interfaces = new ConcurrentHashMap<>();

    /**
     * 線程池
     */
    private static ExecutorService threadPool = new ThreadPoolExecutor(10,15,3,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));

    public static void main(String[] args) throws InterruptedException {

        List<UserRequest> tasks = new ArrayList<UserRequest>();
        // 準備工作,先初始化 10個線程(用戶),這10個用戶同時訪問一個接口
        for (int i = 1; i <= 12; i++) {
            String ip = "127.0.0." + i;
            String userName="chenhu_";
            String interfaceName="user/find_"+i;
            tasks.add(new UserRequest(ip,userName,interfaceName));
        }


        // 先初始化好令牌生成器
        for (UserRequest request : tasks) {
            // 根據接口名限流
            RateLimiter rateLimiter = interfaces.get(request.getUserName());

            if(rateLimiter==null){
                // 創建一個令牌生成器,每秒產生5個令牌
                synchronized (interfaces) {
                    if(rateLimiter==null) {
                        rateLimiter = RateLimiter.create(10);
                        // 將這個令牌生成器和具體的接口進行綁定
                        interfaces.put(request.getUserName(),rateLimiter);
                    }
                }
            }
        }

        // 休眠一秒,讓令牌生成器先生成令牌
        Thread.sleep(1000);

        for (UserRequest request : tasks) {
            // 根據接口名限流
            RateLimiter rateLimiter = interfaces.get(request.getUserName());


            // 獲取令牌桶中一個令牌,如果獲取不到,則等待 timeout 時間,如果還獲取不到,則返回false,反之則返回true
            // timeout設置為0,表示不等待
            if(rateLimiter.tryAcquire(1,0,TimeUnit.SECONDS)){

                // 得到令牌,處理請求
                threadPool.execute(()->{
                    System.err.println("用戶:"+request.getUserName()+" 當前時間訪問次數還未達到上限,可以訪問");
                });
            }else {
                // 已經等待瞭10秒還獲取不到令牌,進行其他業務處理
                System.err.println("當前時間訪問失敗,"+request.getUserName()+"無法獲取令牌");
            }
        }
    }

    private static class UserRequest {
        /**
         * 請求用戶ip
         */
        private String ip;

        /**
         * 用戶名
         */
        private String userName;

        /**
         * 請求的接口名
         */
        private String interfaceName;
        public UserRequest(String ip, String userName, String interfaceName) {
            this.ip = ip;
            this.userName = userName;
            this.interfaceName = interfaceName;
        }
        public String getIp() {return ip;}

        public String getUserName() { return userName;}
        public String getInterfaceName() {return interfaceName;}
    }
}

以上就是本文的全部內容,希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。

推薦閱讀: