Xxl-Job 2.3.0源码分析


Xxl-Job源码分析

导读

​ 在日常开发的过程中,我们总会遇到需要使用定时功能的业务,Spring虽然提供了定时器,但是其本身具有很大的局限性。

Spring自带的定时任务框架的缺点:

  1. 不支持集群:为避免重复执行的问题
  2. 不支持生命周期统一管理:不重启服务情况下关闭,启动任务
  3. 不支持分片任务:处理有序数据时,多机器分片执行任务处理不同数据
  4. 不支持动态调整:不重启服务的情况下修改任务参数
  5. 无报警机制:任务失败之后没有报警机制
  6. 不支持失败重试:出现异常后任务中介,不能根据执行状态控制任务重新执行
  7. 任务数据统计难以统计:任务数据量大时,对于任务执行情况无法高效的统计执行情况

​ 因为Spring本身的局限性问题,所以很多时候它并不能满足我们的业务需求,因此就有了任务调度框架来专门解决这些问题。而目前主流的任务调度框架有quartzxxl-job 。quartz 和 xxl-job 都是任务调度框架,任务调度相关功能都可以借助这两个框架实现。

​ 这两个框架都通过自己的方式实现了这个功能,区别在于对任务的创建、修改、删除、触发以及监控的操作成本上,quartz 对于这些操作直接提供了 api,这意味着开发人员拥有最大的操作权,也带来了更高的灵活性,然而对于不需要很高灵活性的系统,或调度任务的操控由非开发人员负责的系统,需要额外对 api 调用做一层封装,隔离 api 操作;而 xxl-job 提供这些控制的方式是提供了一个单独的可视化调度中心,这意味着任务的状态控制可以和系统分离,通过更易操作的网页界面的形式,降低了对操控者的门槛。

两者的详细对比可参展以下图片:

本文重点分析xxl-job框架的源码。

1、系统概括

​ XXL-JOB项目是一个轻量级、开箱即用、完全开源的分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。项目采用Spring boot + Mybatis的框架组合来组织代码,代码整体风格比较好,模块化清晰,代码逻辑遵行Web的MVC架构。

xxl-job版本:2.3.0

xxl-job的项目地址:xxl-job项目地址 xxl-job文档地址

xxl-job的项目架构图:

分为执行器和调度中心两大组件:

执行器运行在业务进程中,负责向调度中心进行注册并接受调度中心的调度指令。

调度中心依赖mysql作为数据源(数据存储和分布式锁),可集群部署,各节点无状态,集群部署时需要部署负载均衡器(如nginx)。一方面向用户提供可视化的控制台服务,另一方面处理执行器的注册请求并向执行器发送指令进行调度。调度中心是无状态的。调度中心和执行器之间的相互交互都是通过HTTP。

xxl-job项目代码总体分为三部分:

  • xxl-job-admin:调度中心 , 负责调度分布式多节点任务的执行工作
  • xxl-job-core:公共依赖,核心代码,调度中心以及任务客户端都依赖核心 jar。从业务角度去分析这个模块是没有意义的,很容易一脑雾水,因为这个模块不是独立的服务,它只是为xxl-job-admin和xxl-job-executors-sample提供了功能模块。
  • xxl-job-executor-samples:执行器 Sample 示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)xxl-job-executor-sample-springboot:Springboot 版本,通过 Springboot 管理执行器,推荐这种方式xxl-job-executor-sample-frameless:无框架版本

具体安装使用以及配置说明,仔细阅读: xxl-job文档

2、Admin(调度中心)解析

2.1 调度中心的启动

启动时首先先加载XxlJobAdminConfig文件:com.xxl.job.admin.core.conf.XxlJobAdminConfig:主要的配置类,启动从此开始加载;实现了InitializingBean,因此加载完配置文件后直接从afterPropertiesSet() 方法开始:

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 1. 加载 XxlJobAdminConfig
        adminConfig = this;

        // 2. 启动任务扫描 
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
	....
}

2.2 初始化xxlJobScheduler:

public class XxlJobScheduler  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    public void init() throws Exception {
        //2.1 init i18n  初始化新增任务页面中的阻塞处理策略相关的国际化信息。国际化提供中文、英文两种可选语言,默认为中文
        initI18n();

        //2.2 admin trigger pool start  初始化快慢线程池(核心线程10)
        JobTriggerPoolHelper.toStart();

        //2.3 admin registry monitor run  初始化一个上下线线程池;启动守护线程:清理下线的执行器(3次心跳),增加上线的执行器
        JobRegistryHelper.getInstance().start();

        //2.4 admin fail-monitor run 读取mysql失败的实例,根据配置进行重试/告警等操作(守护线程运行)
        JobFailMonitorHelper.getInstance().start();

        //2.5 admin lose-monitor run ( depend on JobTriggerPoolHelper )  初始化一个回调线程池(2,20,30L,TimeUnit.SECONDS),然后一个守护线程:定期扫描10min没有完成且执行器失去心跳的实例,设置为失败(守护线程运行)
        JobCompleteHelper.getInstance().start();

        //2.6 admin log report start  启动一个日志处理线程
        JobLogReportHelper.getInstance().start();

        //2.7 start-schedule  ( depend on JobTriggerPoolHelper )  启动两个守护线程进行任务调度(执行任务,缓存任务)
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
	......
}

初始化处启动了一系列线程;线程作用见代码注释;

回调线程池主要是处理代码的回调请求进行实例的更新;

2.3 定时扫描线程

​ com.xxl.job.admin.core.thread.JobScheduleHelper –> start() 主要的定时器扫描线程

public void start(){

    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {

            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

            // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

            while (!scheduleThreadToStop) {

                // Scan Job
                long start = System.currentTimeMillis();

                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;

                boolean preReadSuc = true;
                try {

                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);

                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();

                    // tx start

                    // 1、pre read
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        // 2、push time-ring
                        for (XxlJobInfo jobInfo: scheduleList) {

                            // time-ring jump
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                // 1、misfire match
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW 》 trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }

                                // 2、fresh next
                                refreshNextValidTime(jobInfo, new Date());

                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                // 1、trigger
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                // 2、fresh next
                                refreshNextValidTime(jobInfo, new Date());

                                // next-trigger-time in 5s, pre-read again
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            } else {
                                // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                // 1、make ring second
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                // 2、push time ring
                                pushTimeRing(ringSecond, jobInfo.getId());

                                // 3、fresh next
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                            }

                        }

                        // 3、update trigger info
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }

                    } else {
                        preReadSuc = false;
                    }

                    // tx stop


                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {

                    // commit
                    if (conn != null) {
                        try {
                            conn.commit();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.setAutoCommit(connAutoCommit);
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                    // close PreparedStatement
                    if (null != preparedStatement) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
                long cost = System.currentTimeMillis()-start;


                // Wait seconds, align second
                if (cost < 1000) {  // scan-overtime, not wait
                    try {
                        // pre-read period: success > scan each second; fail > skip this period;
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();


    // ring thread
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {

            while (!ringThreadToStop) {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                try {
                    // second data
                    List<Integer> ringItemData = new ArrayList<>();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }

                    // ring trigger
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // do trigger
                        for (int jobId: ringItemData) {
                            // do trigger
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // clear
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}

1.守护线程-1scheduleThread:每5s一次根据线程池配置取出待执行的JobList,此处使用了forupdate作为悲观锁防止了多线程风险;

使用三种策略判断Jobinfo;

触发事件 < now -5s 的,根据配置策略:忽略,立即执行等等,然后触发时间加一个周期;触发时间第一次通过配置的Cron表达式解析得出每个任务的触发时间

触发事件 <= now的,立即执行并且出发时间加一个周期;

触发事件 > now的,根据秒数放入一个时间轮map中;触发事件加一个周期;

如果任务太多的话,会第一批取完休眠1s再取5s,知道没有任务为止;休眠5s;

2.守护线程-2ringThread

sleep到整秒;——设计巧妙,避免了ms级处理后重复触发;

处理事件轮,每一秒取出当前秒数和上一秒的所有需要触发的job,直接触发;

2.4 执行作业触发

com.xxl.job.admin.core.thread.JobTriggerPoolHelper –> addTrigger()

/**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }

1.判断该job超时次数(大于500ms的次数),根据此放入快慢线程池

2.执行Job

2.5 启动任务

public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

        // cover addressList
        if (addressList!=null && addressList.trim().length()>0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param
        int[] shardingParam = null;
        if (executorShardingParam!=null){
            String[] shardingArr = executorShardingParam.split("/");
            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        }
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }

    }

1.进行一系列参数的check;

2.分片参数处理:这个自动调用情况下分片参数为空的?需要定制化一下

3.任务分发:分片就分片分发;普通任务就找第一个地址触发;

2.6 触发任务

com.xxl.job.admin.core.trigger.XxlJobTrigger –> processTrigger() 触发任务

/**
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding index
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1、save log-id
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3、init address
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4、trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5、collection trigger info
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("("+shardingParam+")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6、save log trigger-info
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

1.初始化一系列参数

2.调用rpc客户端向执行器发送请求;客户端放在一个map内复用;

3.保存一系列参数;

2.7 封装的http请求

public class AdminBizClient implements AdminBiz {

    public AdminBizClient() {
    }
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

}

3、执行器源码解析

3.1 基本说明

XXL-Job由调度中心与执行器组成;调度中心把任务派发给到每个执行器,执行器作为一个服务端接收调度中心的请求并进行执行;

xxl-job-core:执行器的核心代码!执行器必须要导入此依赖并且将依赖一起打包部署到服务器

3.2 Excutor

com.xxl.job.executor.sample.frameless.FramelessApplication —-执行器的入口程序:包括初始化与启动

public static void main(String[] args) {

        try {
            // start
            // 初始化XxlJobSimpleExecutor;并启动:
            FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

            // Blocks until interrupted
            while (true) {
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                    break;
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // destory 调用XxlJobSimpleExecutor的Destory方法
            FrameLessXxlJobConfig.getInstance().destoryXxlJobExecutor();
        }
    }
/**
 * init
 */
public void initXxlJobExecutor() {

    // load executor prop
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

    // init executor
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    //设置验证Token
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    //设置appname,调度器通过此进行调度器的分组;
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    //设置网卡的ip与port;不设置随机选择网卡
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

    //注册执行方法------handler
    // registry job bean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

    // start executor
    try {
        //启动执行器
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

1.这部分是用户需要实现的部分;可以使用自带的实例实现简单的功能;也可以只改方法的执行部分,其他不变更;

2.appname 很重要,XxlJob使用此进行执行器的分组;通过每一个job指定执行器,实现一个job的分组执行的效果;

3.4 启动Executor

com.xxl.job.core.executor.impl.XxlJobSimpleExecutor –> start() 启动Executor

初始化bean
public void start() {

       // init JobHandler Repository (for method)   List的信息为每个handler方法的object;将每个方法放入
       initJobHandlerMethodRepository(xxlJobBeanList);

       // super start
       try {
           //启动调度器
           super.start();
       } catch (Exception e) {
           throw new RuntimeException(e);
       }
   }
private void initJobHandlerMethodRepository(List<Object> xxlJobBeanList) {
        if (xxlJobBeanList==null || xxlJobBeanList.size()==0) {
            return;
        }

        // init job handler from method:
        for (Object bean: xxlJobBeanList) {
            // method 取出方法----handler  使用@XxlJob注释的;
            Method[] methods = bean.getClass().getDeclaredMethods();
            if (methods==null || methods.length==0) {
                continue;
            }
            for (Method executeMethod : methods) {

                // anno
                XxlJob xxlJob = executeMethod.getAnnotation(XxlJob.class);
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                //检查是否已经注册在map jobHandlerRepository 中了;单线程操作
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // execute method
                /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }*/
                //关闭执行的java语法检查
                executeMethod.setAccessible(true);

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // registry jobhandler ===== put in jobHandlerRepository
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

            }
        }
    }

1 将每个方法放入XxlJobExecutor的jobHandlerRepository;通过反射获取所有使用了XxlJob注解的方法;

2 调用父类的启动方法;

这部分很灵活,可以自行继承XxlJobExecutor进行实现,满足一些复杂的业务场景

启动

com.xxl.job.core.executor.XxlJobExecutor –> start() 启动

// ---------------------- start + stop ----------------------
   public void start() throws Exception {

       // init logpath 如果不存在:创建日志路径与/gluesource
       XxlJobFileAppender.initLogPath(logPath);

       // init invoker, admin-client :把配置的调度器对象放入List
       initAdminBizList(adminAddresses, accessToken);


       // init JobLogFileCleanThread :启动一个守护线程清除日志;1天一次清除----需要改代码变更;
       JobLogFileCleanThread.getInstance().start(logRetentionDays);

       // init TriggerCallbackThread : 启动两个回调守护线程;包括回调与回调重试线程
       TriggerCallbackThread.getInstance().start();

       // init executor-server:初始化server服务
       initEmbedServer(address, ip, port, appname, accessToken);
   }

1.这里需要注意:清楚日志的线程按照日志文件夹的名称来进行时间判断的:yyyy-MM-dd

2.回调失败重试使用文件判断,如果一台机起多个进程共享日志文件可能由异常风险

// callback
        triggerCallbackThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // normal callback
                while(!toStop){
                    try {
                        //阻塞的取值:这是一个阻塞队列
                        HandleCallbackParam callback = getInstance().callBackQueue.take();
                        if (callback != null) {

                            // callback list param
                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                            //当List操作时进行此操作会出问题;
                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                            callbackParamList.add(callback);

                            // callback, will retry if error
                            if (callbackParamList!=null && callbackParamList.size()>0) {
                                doCallback(callbackParamList);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

                // last callback
                try {
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    //todo 不懂为什么用size,直接用写入的数量表示不好吗?
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        doCallback(callbackParamList);
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

            }
        });
        triggerCallbackThread.setDaemon(true);
        triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
        triggerCallbackThread.start();


        // retry callback what failed; 根据失败文件来进行判断是否失败
        triggerRetryCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(!toStop){
                    try {
                        //通过检测回调失败日志检测,调用回调方法回调
                        retryFailCallbackFile();
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
            }
        });
        triggerRetryCallbackThread.setDaemon(true);
        triggerRetryCallbackThread.start();

com.xxl.job.core.thread.TriggerCallbackThread#doCallback

/**
     * do callback, will retry if error
     * @param callbackParamList
     */
    private void doCallback(List<HandleCallbackParam> callbackParamList){
        boolean callbackRet = false;
        // callback, will retry if error
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                //adminBiz进行回调
                ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                    logger.info("com.xxl.job.core.thread.TriggerCallbackThread.doCallback:===="+callbackParamList.get(0));
                    callbackRet = true;
                    break;
                } else {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                }
            } catch (Exception e) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
            }
        }
        if (!callbackRet) {
            //回调失败,写回调失败日志,写入回调失败的参数
            appendFailCallbackFile(callbackParamList);
        }
    }

3.5注册到调度器并启动服务

com.xxl.job.core.executor.XxlJobExecutor –> initEmbedServer ()

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // accessToken
        if (accessToken==null || accessToken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // start 前面全是准备address和token
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
注册执行器并启动注册器

com.xxl.job.core.server.EmbedServer –> start(),创建一个Serverchannel进行请求的处理;

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind todo 暂时看不懂
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.zhi(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
  1. 创建一个容量为2000的线程池;
  2. 以线程池为基准进行一个Channel的创建,绑定指定的端口
  3. 启动注册与心跳线程—-注册多个调度器;
  4. 同步等待closeFuture,达到线程的阻塞;
注册与心跳线程

com.xxl.job.core.thread.ExecutorRegistryThread –> start()

public void start(final String appname, final String address){

    // valid
    if (appname==null || appname.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
  1. 参数check
  2. 运行情况下进行注册/心跳
  3. tostop情况下撤掉心跳

3.6 调度任务的处理

com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler 静态内部类

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

            // valid
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
            }
            if (uri==null || uri.trim().length()==0) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
            }
            if (accessToken!=null
                    && accessToken.trim().length()>0
                    && !accessToken.equals(accessTokenReq)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
            }

            // services mapping;调用指定的方法
            try {
                if ("/beat".equals(uri)) {
                    return executorBiz.beat();
                } else if ("/idleBeat".equals(uri)) {
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                } else if ("/run".equals(uri)) {
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                } else if ("/kill".equals(uri)) {
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                } else if ("/log".equals(uri)) {
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                } else {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }
  1. 进行规则check;
  2. 进行方法mapping:此处分析run请求的流程

3.7 运行

com.xxl.job.core.biz.impl.ExecutorBizImpl –> run() 真正的run方法

@Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobId + jobThread  从jobThreadRepository获取
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
       // 获取到当前线程中执行的handler;
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        //整个判断主要干一件事:获取执行的handler;
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler 更新handler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler通过目录加载新的一个handler:调度器会把代码放在json里面传过来。。。
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            //注册一个jobId的执行线程 并启动
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
  1. 获取jobId对应的执行线程
  2. 获取到执行的handler:GLUE模式是把代码直接放入json传到执行器;注定只能调试使用无法承担麻烦的逻辑
  3. 更新JobHandler,如果没有的话创建jobHandler
  4. 把data放入线程的队列:一个阻塞式队列

总结:一共两个线程池:ServerChannel有一个2000的线程池;每一个jobId在执行器会起一个线程执行job;

3.8 封装http请求

Xxl自写的RPC:com.xxl.job.core.biz.client.AdminBizClient—封装了http请求:

/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class AdminBizClient implements AdminBiz {

    public AdminBizClient() {
    }
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

}

通过XxlJobRemotingUtil封装了post的请求;通过http请求向调度中心发送请求;

com.xxl.job.core.thread.JobThread#run 执行方法
@Override
public void run() {

   	// init
   	try {
		handler.init();
	} catch (Throwable e) {
   		logger.error(e.getMessage(), e);
	}

	// execute
	while(!toStop){
		running = false;
		idleTimes++;

           TriggerParam triggerParam = null;
           try {
			// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
			triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
			if (triggerParam!=null) {
				running = true;
				idleTimes = 0;
				triggerLogIdSet.remove(triggerParam.getLogId());

				// log filename, like "logPath/yyyy-MM-dd/9999.log"
				String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
				XxlJobContext xxlJobContext = new XxlJobContext(
						triggerParam.getJobId(),
						triggerParam.getExecutorParams(),
						logFileName,
						triggerParam.getBroadcastIndex(),
						triggerParam.getBroadcastTotal());

				// init job context
				XxlJobContext.setXxlJobContext(xxlJobContext);

				// execute
				XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

				if (triggerParam.getExecutorTimeout() > 0) {
					// limit timeout
					Thread futureThread = null;
					try {
						FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
							@Override
							public Boolean call() throws Exception {

								// init job context
								XxlJobContext.setXxlJobContext(xxlJobContext);

								handler.execute();
								return true;
							}
						});
						futureThread = new Thread(futureTask);
						futureThread.start();

						Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
					} catch (TimeoutException e) {

						XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
						XxlJobHelper.log(e);

						// handle result
						XxlJobHelper.handleTimeout("job execute timeout ");
					} finally {
						futureThread.interrupt();
					}
				} else {
					// just execute
					handler.execute();
				}

				// valid execute handle data
				if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
					XxlJobHelper.handleFail("job handle result lost.");
				} else {
					String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
					tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
							?tempHandleMsg.substring(0, 50000).concat("...")
							:tempHandleMsg;
					XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
				}
				XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
						+ XxlJobContext.getXxlJobContext().getHandleCode()
						+ ", handleMsg = "
						+ XxlJobContext.getXxlJobContext().getHandleMsg()
				);

			} else {
				if (idleTimes > 30) {
					if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			}
		} catch (Throwable e) {
			if (toStop) {
				XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
			}

			// handle result
			StringWriter stringWriter = new StringWriter();
			e.printStackTrace(new PrintWriter(stringWriter));
			String errorMsg = stringWriter.toString();

			XxlJobHelper.handleFail(errorMsg);

			XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
		} finally {
               if(triggerParam != null) {
                   // callback handler info
                   if (!toStop) {
                       // commonm
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                       		triggerParam.getLogId(),
							triggerParam.getLogDateTime(),
							XxlJobContext.getXxlJobContext().getHandleCode(),
							XxlJobContext.getXxlJobContext().getHandleMsg() )
					);
                   } else {
                       // is killed
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                       		triggerParam.getLogId(),
							triggerParam.getLogDateTime(),
							XxlJobContext.HANDLE_COCE_FAIL,
							stopReason + " [job running, killed]" )
					);
                   }
               }
           }
       }

	// callback trigger request in queue
	while(triggerQueue !=null && triggerQueue.size()>0){
		TriggerParam triggerParam = triggerQueue.poll();
		if (triggerParam!=null) {
			// is killed
			TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
					triggerParam.getLogId(),
					triggerParam.getLogDateTime(),
					XxlJobContext.HANDLE_COCE_FAIL,
					stopReason + " [job not executed, in the job queue, killed.]")
			);
		}
	}

	// destroy
	try {
		handler.destroy();
	} catch (Throwable e) {
		logger.error(e.getMessage(), e);
	}

	logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
  1. 取出待执行队列中的任务参数
  2. 调用jobhanduler的init execute 方法来执行具体的任务逻辑;如果带延时的话,用一个回调的FutureTask来做回调确认;
  3. 执行完成后把参数放入回调队列;通过回调线程调度中心获取到任务执行状态;
  4. 如果线程队列没有任务的话就会移除当前线程。

此处直接执行execute;没有取出参数什么的,但是调用回调给定制化留下了大量操作空间

3.9 回调分析

当执行器回调到调度中心后:com.xxl.job.admin.controller.JobApiController –> api

@RequestMapping("/{uri}")
   @ResponseBody
   @PermissionLimit(limit=false)
   public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

       // valid
       if (!"POST".equalsIgnoreCase(request.getMethod())) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
       }
       if (uri==null || uri.trim().length()==0) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
       }
       if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
               && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
               && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
       }

       // services mapping
       if ("callback".equals(uri)) {
           List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
           return adminBiz.callback(callbackParamList);
       } else if ("registry".equals(uri)) {
           RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
           return adminBiz.registry(registryParam);
       } else if ("registryRemove".equals(uri)) {
           RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
           return adminBiz.registryRemove(registryParam);
       } else {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
       }
   }

调用到 com.xxl.job.admin.core.thread.JobCompleteHelper#callback(java.util.List<com.xxl.job.core.biz.model.HandleCallbackParam>)

// ---------------------- helper ----------------------

	public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {

		callbackThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				for (HandleCallbackParam handleCallbackParam: callbackParamList) {
					ReturnT<String> callbackResult = callback(handleCallbackParam);
					logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
							(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
				}
			}
		});

		return ReturnT.SUCCESS;
	}

	private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
		// valid log item
		XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
		if (log == null) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
		}
		if (log.getHandleCode() > 0) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
		}

		// handle msg
		StringBuffer handleMsg = new StringBuffer();
		if (log.getHandleMsg()!=null) {
			handleMsg.append(log.getHandleMsg()).append("<br>");
		}
		if (handleCallbackParam.getHandleMsg() != null) {
			handleMsg.append(handleCallbackParam.getHandleMsg());
		}

		// success, save log
		log.setHandleTime(new Date());
		log.setHandleCode(handleCallbackParam.getHandleCode());
		log.setHandleMsg(handleMsg.toString());
		XxlJobCompleter.updateHandleInfoAndFinish(log);

		return ReturnT.SUCCESS;
	}

进行完成处理:

/**
 * common fresh handle entrance (limit only once)
 *
 * @param xxlJobLog
 * @return
 */
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {

    // finish 此处会进行依赖任务的触发
    finishJob(xxlJobLog);

    // text最大64kb 避免长度过长
    if (xxlJobLog.getHandleMsg().length() > 15000) {
        xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
    }

    // fresh handle
    return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

文章作者: wmg
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wmg !
  目录