xxl-job如何濫用netty導致的問題及解決方案

netty作為一種高性能的網絡編程框架,在很多開源項目中大放異彩,十分亮眼,但是在有些項目中卻被濫用,導致使用者使用起來非常的難受。

筆者使用的是2.3.0版本的xxl-job,也是當前的最新版本;下面所有的代碼修改全部基於2.3.0版本的xxl-job源代碼

https://github.com/xuxueli/xxl-job/tree/2.3.0

其中,xxl-job-admin對應著項目:https://github.com/xuxueli/xxl-job/tree/2.3.0/xxl-job-admin

spring-boot項目對應著示例項目:https://github.com/xuxueli/xxl-job/tree/master/xxl-job-executor-samples/xxl-job-executor-sample-springboot

一、xxl-job存在的多端口問題

關於xxl-job如何使用的問題,可以參考我的另外一篇文章:分佈式任務調度系統:xxl-job

現在java開發基本上已經離不開spring boot瞭吧,我在spring boot中集成瞭xxl-job-core組件並且已經能夠正常使用,但是一旦部署到測試環境就不行瞭,這是因為測試環境使用瞭docker,spring boot集成xxl-job-core組件之後會額外開啟9999端口號給xxl-job-admin調用使用,如果docker不開啟宿主機到docker的端口映射,xxl-job-admin自然就會調用失敗。這導致瞭以下問題:

  • 每個spring boot程序都要開兩個端口號,意味著同時運行著兩個服務進行端口監聽,浪費計算和內存資源
  • 如果使用docker部署,需要再額外做宿主機和容器的9999端口號的映射,否則外部的xxl-job-admin將無法訪問。

那如果兩個不同的服務都集成瞭xxl-job,但是部署在同一臺機器上,又會發生什麼呢?答案是如果不指定特定端口號,兩個服務肯定都要使用9999端口號,勢必會端口沖突,但是xxl-job已經想到瞭9999端口號被占用的情況,如果9999端口號被占用,則會端口號加一再重試。

xxl-job-core組件額外開啟9999端口號到底合不合理?

舉個例子:spring boot程序集成swagger-ui是很常見的操作吧,也沒見swagger-ui再額外開啟端口號啊,我認為是不合理的。但是,我認為作者這樣做也有他的考慮—並非所有程序都是spring-boot的程序,也有使用其它框架的程序,使用獨立的netty server作為客戶端能夠保證在使用java的任意xxl-job客戶端都能穩定的向xxl-job-admin提供服務。然而java開發者們絕大多數情況下都是使用spirng-boot構建程序,在這種情況下,作者偷懶沒有構建專門在spirng boot框架下使用的xxl-job-core,而是想瞭個類似萬金油的蠢招解決問題,讓所有在spring-boot框架下的開發者都一起難受,實在是令人費解。

二、源碼追蹤

一切的起點要從spring-boot程序集成xxl-job-core說起,集成方式很簡單,隻需要成功創建一個XxlJobSpringExecutor Bean對象即可。

@Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

XxlJobSpringExecutor對象創建完成之後會做一些xxl-job初始化的操作,包含連接xxl-job-admin以及啟動netty server。

展開XxlJobSpringExecutor源碼,可以看到它實現瞭SmartInitializingSingleton接口,這就意味著Bean對象創建完成之後會回調afterSingletonsInstantiated接口

// start
    @Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

super.start();這行代碼中,會調用父類XxlJobExecutor的start方法做初始化

public void start() throws Exception {

        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }

initEmbedServer(address, ip, port, appname, accessToken);這行代碼做開啟netty-server的操作

 private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // accessToken
        if (accessToken==null || accessToken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

可以看到這裡會創建EmbedServer對象,並且使用start方法開啟netty-server,在這裡就能看到熟悉的一大坨瞭

除瞭開啟讀寫空閑檢測之外,就隻做瞭一件事:開啟http服務,也就是說,xxl-job-admin是通過http請求調用客戶端的接口觸發客戶端的任務調度的。最終處理方法在EmbedHttpServerHandler類中,順著EmbedHttpServerHandler類的方法找,可以最終找到處理的方法com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken!=null
        && accessToken.trim().length()>0
        && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

從這段代碼的邏輯可以看到

  • 隻接受POST請求
  • 如果有token,則會校驗token
  • 隻提供/beat、/idelBeat、/run、/kill、/log 五個接口,所有請求的處理都會委托給executorBiz處理。

最後,netty將executorBiz處理結果寫回xxl-job-admin,然後請求就結束瞭。這裡netty扮演的角色非常簡單,我認為可以使用spring-mvc非常容易的替換掉它的功能。

三、使用spring-mvc替換netty的功能

1.新增spring-mvc代碼

這裡要修改xxl-job-core的源代碼,首先,加入spring-mvc的依賴

		<!-- spring-web -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>${spring.version}</version>
			<scope>provided</scope>
		</dependency>

然後新增Controller文件

package com.xxl.job.core.controller;

import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kdyzm
 * @date 2021/5/7
 */
@RestController
public class XxlJobController {

    @PostMapping("/beat")
    public ReturnT<String> beat() {
        return new ExecutorBizImpl().beat();
    }

    @PostMapping("/idleBeat")
    public ReturnT<String> idleBeat(@RequestBody IdleBeatParam param) {
        return new ExecutorBizImpl().idleBeat(param);
    }

    @PostMapping("/run")
    public ReturnT<String> run(@RequestBody TriggerParam param) {
        return new ExecutorBizImpl().run(param);
    }

    @PostMapping("/kill")
    public ReturnT<String> kill(@RequestBody KillParam param) {
        return new ExecutorBizImpl().kill(param);
    }

    @PostMapping("/log")
    public ReturnT<LogResult> log(@RequestBody LogParam param) {
        return new ExecutorBizImpl().log(param);
    }
}

2.刪除老代碼&移除netty依賴

之後,就要刪除老的代碼瞭,修改com.xxl.job.core.server.EmbedServer#start方法,清空所有代碼,新增

// start registry
startRegistry(appname, address);

然後刪除EmbedServer類中的以下兩個變量及相關的引用

 private ExecutorBiz executorBiz;
    private Thread thread;

之後刪除netty的依賴

		<!-- ********************** embed server: netty + gson ********************** -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>${netty-all.version}</version>
		</dependency>

將報錯的代碼全部刪除,之後就可以編譯成功瞭,當然這還不行。

3.修改註冊到xxl-job-admin的端口號

註冊的ip地址可以不用改,但是端口號要取spring-boot程序的端口號。

因為要復用springk-boot容器的端口號,所以這裡註冊的端口號要和它保持一致,修改com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer方法,註釋掉

port = port > 0 ? port : NetUtil.findAvailablePort(9999);

然後修改spring-boot的配置文件,xxl-job的端口號配置改成server.port

server.port=8081
xxl.job.executor.port=${server.port}

在創建XxlJobSpringExecutor Bean對象的時候將改值傳遞給它。

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    return xxlJobSpringExecutor;
}

4.將xxl-job-core改造成spring-boot-starter

上面改造完瞭之後已經將邏輯變更為使用spring-mvc,但是spring-boot程序還沒有辦法掃描到xxl-job-core中的controller,可以手動掃描包,這裡推薦使用spring-boot-starter,這樣隻需要將xxl-job-core加入classpath,就可以自動生效。

在 com.xxl.job.core.config包下新建Config類

package com.xxl.job.core.config;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * @author kdyzm
 * @date 2021/5/7
 */
@Configuration
@ComponentScan(basePackages = {"com.xxl.job.core.controller"})
public class Config {
}

src/main/resources/META-INF文件夾下新建spring.factories文件,文件內容如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xxl.job.core.config.Config

5.增加特殊前綴匹配

上面修改之後將使用spring mvc接口替代原netty功能提供的http接口,但是暴露出的接口是/run、/beat、/kill這種有可能和宿主服務路徑沖突的接口,為瞭防止出現路徑沖突,做出以下修改

修改com.xxl.job.core.controller.XxlJobController類,添加@RequestMapping("/xxl-job")

@RestController
@RequestMapping("/xxl-job")
public class XxlJobController {
	...
}

修改com.xxl.job.core.biz.client.ExecutorBizClient類,為每個請求添加/xxl-job前綴

package com.xxl.job.core.biz.client;

import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.util.XxlJobRemotingUtil;

/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class ExecutorBizClient implements ExecutorBiz {

    public ExecutorBizClient() {
    }
    public ExecutorBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/beat", accessToken, timeout, "", String.class);
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
        return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/idleBeat", accessToken, timeout, idleBeatParam, String.class);
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/run", accessToken, timeout, triggerParam, String.class);
    }

    @Override
    public ReturnT<String> kill(KillParam killParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/kill", accessToken, timeout, killParam, String.class);
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/log", accessToken, timeout, logParam, LogResult.class);
    }

}

這樣,就全部修改完瞭。

四、測試

重啟xxl-job-executor-sample-springboot項目,查看註冊到xxl-job-admin上的信息

可以看到端口號已經不是默認的9999,而是和spring-boot程序保持一致的端口號,然後執行默認的job

可以看到已經執行成功,在查看日志詳情

日志也一切正常,表示一切都改造成功瞭。

完整的代碼修改:https://github.com/kdyzm/xxl-job/commit/449ee5c7bbb659356af25b164c251f960b9a6891

五、實際使用

由於原作者基本上不理睬人,我克隆瞭項目2.3.0版本並且新增瞭2.4.1版本:https://github.com/kdyzm/xxl-job/releases/tag/2.4.1

有需要的可以下載源代碼自己打包xxl-job-core項目上傳私服後就可以使用瞭

以上就是xxl-job如何濫用netty導致的問題及解決方案的詳細內容,更多關於xxl-job濫用netty的資料請關註WalkonNet其它相關文章!

推薦閱讀: