基於springcloud異步線程池、高並發請求feign的解決方案

ScenTaskTestApplication.java

package com.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* @author scen
* @version 2018年9月27日 上午11:51:04
*/
@EnableFeignClients
@SpringBootApplication
public class ScenTaskTestApplication {
 public static void main(String[] args) {
  SpringApplication.run(ScenTaskTestApplication.class, args);
 }
}

application.properties

spring.application.name=scen-task-test
server.port=9009
feign.hystrix.enabled=true
#熔斷器失敗的個數==進入熔斷器的請求達到1000時服務降級(之後的請求直接進入熔斷器)
hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000
#回退最大線程數
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests=50
#核心線程池數量
hystrix.threadpool.default.coreSize=130
#請求處理的超時時間
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=100000
ribbon.ReadTimeout=120000
#請求連接的超時時間
ribbon.ConnectTimeout=130000
eureka.instance.instance-id=${spring.application.name}:${spring.application.instance_id:${server.port}}
eureka.instance.preferIpAddress=true
eureka.client.service-url.defaultZone=http://127.0.0.1:9000/eureka
logging.level.com.test.user.service=debug
logging.level.org.springframework.boot=debug
logging.level.custom=info

AsyncConfig.java

package com.test;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
 * springboot異步線程池配置
 * @author Scen
 * @date 2018/11/7 18:28
 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
 
 
 @Override
 public Executor getAsyncExecutor() {
  //定義線程池
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  //核心線程數
  taskExecutor.setCorePoolSize(20);
  //線程池最大線程數
  taskExecutor.setMaxPoolSize(100);
  //線程隊列最大線程數
  taskExecutor.setQueueCapacity(10);
  //初始化
  taskExecutor.initialize();
  return taskExecutor;
 }
}

DoTaskClass.java

package com.test;
import com.test.pojo.User;
import com.test.pojo.UserEducation;
import com.test.user.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * 任務類 定義異步工作任務
 * @author Scen
 * @date 2018/11/7 18:40
 */
@Component
public class DoTaskClass { 
 /**
  * 一個feign的客戶端
  */
 private final UserService userService;
 
 @Autowired
 public DoTaskClass(UserService userService) {
  this.userService = userService;
 }
 
 /**
  * 核心任務
  *
  * @param uid
  */
 @Async
 public void dotask(String uid) {
  /**
   * 模擬復雜工作業務(109個線程同時通過feign請求微服務提供者)
   */
  {
   List<UserEducation> userEducationByUid = userService.findUserEducationByUid(uid);
   List<String> blackList = userService.getBlackList();
   String userSkilled = userService.getUserSkilled(uid);
   String userFollow = userService.getUserFollow(uid);
   User userById = userService.getUserById(uid);
   List<String> followList = userService.getFollowList(uid);
   int userActivityScore = userService.getUserActivityScore(uid);
  }
//  打印線程名稱分辨是否為多線程操作
  System.out.println(Thread.currentThread().getName() + "===任務" + uid + "執行完成===");
 }
}

TestController.java

package com.test;
import com.test.pojo.User;
import com.test.user.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * 測試案例
 * @author Scen
 * @date 2018/11/7 18:10
 */
@RestController
public class TestController {
 
 /**
  * 此處僅用此feign客戶端請求微服務獲取核心工作所需參數
  */
 private final UserService userService;
 
 /**
  * 核心工作異步算法
  */
 private final DoTaskClass doTaskClass;
 
 @Autowired
 public TestController(DoTaskClass doTaskClass, UserService userService) {
  this.doTaskClass = doTaskClass;
  this.userService = userService;
 } 
 
 /**
  * 手動觸發工作
  * @throws InterruptedException
  */
 @RequestMapping("/test")
 public void task() throws InterruptedException {
  /*
   取到1000個要執行任務的必備參數
   */
  List<User> userList = userService.findAllLite(1, 1000);
  for (int i = 0; i < userList.size(); i++) {
   try {
//    異步線程開始工作
    doTaskClass.dotask(userList.get(i).getId());
   } catch (Exception e) {
    /*
     若並發線程數達到MaxPoolSize+QueueCapacity=110(參考AsyncConfig配置)會進入catch代碼塊
     i--休眠3秒後重試(嘗試進入線程隊列:當且僅當109個線程有一個或多個線程完成異步任務時重試成功)
     */
    i--;
    Thread.sleep(3000*3);
   }
   System.out.println(i);
  }
 } 
}

相關線程池、超時時間等數量和大小按實際業務配置

補充:SpringCloud關於@FeignClient和Hystrix集成對http線程池監控問題

@FeignClient可以作為Http代理訪問其他微服務節點,可以用apache的httpclient替換@FeignClient原生的URLConnection請求方式,以達到讓http請求走Http線程池的目的。

而@FeignClient和hystrix集成之後,在hystrix dashboard上可以監控到 @FeignClient 中接口調用情況和 @FeignClient 中httpclient中線程池使用狀況。

下面是demo的示例:

1、@FeignClient的接口代碼如下:

@FeignClient(value="service-A", fallback=ServiceClientHystrix.class)
public interface ServiceClient { 
 @RequestMapping(method = RequestMethod.GET, value = "/add/{id}")
 String add(@PathVariable("id") Integer id);
}

2、ServiceClientHystrix.java

@Component
public class ServiceClientHystrix implements ServiceClient{
 @Override
 public String add(Integer id) {
  return "add value from ServiceClientHystrix";
 }
}

3、關於@FeignClient和hystrix

集成後,Http線程池配置如下:

hystrix.threadpool.服務實例ID.參數

例如設置httpclient的線程池最大線程數量

hystrix.threadpool.service-A.coreSize=20//默認是hystrix.threadpool.default.coreSize = 10
hystrix.threadpool.service-A.maximumSize=20//默認是hystrix.threadpool.default.maximumSize = 10

啟動服務後用測試用例連續調用接口測試,用hystrix dashboard

監控得到下圖監控效果:

去掉hystrix.threadpool.服務實例ID.參數配置後,再次用測試用例調用接口得到監控如下圖:

PoolSize的大小取決於hystrix.threadpool.服務實例ID.coreSize大小設置

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。

推薦閱讀: