Quartz集群分析

阿凡达2018-07-09 09:56

一、问题背景

严选openapi系统是承接外部渠道和严选内部系统的桥梁,每天都有大量的信息需要推送给外部渠道。这些信息被要求正确地送达外部渠道,同时要求具备负载均衡特性。

问题分析

这一问题可以被细化成如下几方面:

  1. 如何推送给外部渠道?
  2. 如何保证正确送达?
  3. 需要做负载均衡,提高吞吐率。

如何推送可以使用http来作为信息交流手段;正确送达着眼于信息落地和错误重试;负载均衡更多地考虑减轻openapi系统的压力。

技术选型

针对上述问题,我们选用基于Quartz来实现整个推送系统。Quartz天然支持集群,任务落地,定时任务等,这些支持也是我们需要的。下面就Quartz集群实现的一些细节和坑分析一二。

本文基于Quartz2.2.1


二、Quartz集群分析

在作分析之前,首先介绍下Quartz的基本概念:

  • Job: 任务描述
  • Scheduler:任务调度
  • Trigger: 任务的触发时间描述
  • OperableTrigger: 任务的运行时数据

一个典型的Quartz集群结构图如下所示:


Quartz集群中每个节点都是一个独立的Quartz应用,通过共享DB来实现任务调度。
单个Quartz应用维护三种重要的线程来实现任务调度,集群检测和任务执行,分别对应QuartzSchedulerThread、ClusterManager、JobRunShell。

JobRunShell

任务执行线程,线程的执行由可配置的线程池(org.quartz.threadPool相关)来维护,由QuartzSchedulerThread线程来启动。

QuartzSchedulerThread

任务调度线程,由QuartzScheduler创建,启动,维护。它的任务就是轮询地找到fired的trigger,然后创建JobRunShell线程来执行相关任务。

整个Job执行的流程图如下所示:

 

整个执行流程看来比较简单,具体的代码分别在QuartzSchedulerThread和JobRunShell类中。下面就关键流程的代码做下分析。

Job执行流程说明

1、QuartzSchedulerThread检索待触发的trigger

/**
 * QuartzSchedulerThread线程执行, 从TRIGGERS表中将要触发的OperableTrigger。关键SQL如下: 
 *
 * SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM TRIGGERS WHERE SCHED_NAME = 'scheduler_name' AND TRIGGER_STATE = ? 
 * AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) 
 * ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
 *
 * SQL说明如下:
 *    1、OperableTrigger.TRIGGER_TYPE == WAITING
 *    2、NEXT_FIRE_TIME <= org.quartz.scheduler.idleWaitTime(默认30秒) + 当前时间 + org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow(默认0秒)
 *  3、非missfire状态的OperableTrigger必须满足: NEXT_FIRE_TIME >= 当前时间 - org.quartz.jobStore.misfireThreshold(默认60秒) 
 *     4、单次最多取Math.min(线程池剩余线程数, org.quartz.scheduler.batchTriggerAcquisitionMaxCount(默认1))个OperableTrigger
 */
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

如果将org.quartz.scheduler.idleWaitTime配置成无限大的数,那任一OperableTrigger都可以能被检索出来,造成OperableTrigger被提前执行的问题。针对这一情况,Quartz做了如下处理:

now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
    synchronized (sigLock) {
        if (halted.get()) {
            break;
        }
        if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
            try {
                // we could have blocked a long while
                // on 'synchronize', so we must recompute
                now = System.currentTimeMillis();
                timeUntilTrigger = triggerTime - now;
                if(timeUntilTrigger >= 1)
                    sigLock.wait(timeUntilTrigger);
            } catch (InterruptedException ignore) {

            }
        }
    }
    if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
        break;
    }
    now = System.currentTimeMillis();
    timeUntilTrigger = triggerTime - now;
}

上述代码会对检索出的第一个OperableTrigger进行触发时间检查。

特别注意:

org.quartz.scheduler.batchTriggerAcquisitionMaxCount和线程池配置可以共同决定检索出的OperableTrigger数量,

触发时间检查只查了第一个OperableTrigger, 对于其他OperableTrigger也会有提前执行的情况存在。

2、QuartzSchedulerThread执行触发trigger操作

再讲触发操作之前,引入一个重要接口JobStore。它是SQL操作和Quartz业务的桥梁,通过org.quartz.jobStore.class配置具体事例。

特别注意:

如果使用spring集成的SchedulerFactoryBean,且设置了datasource属性。Quartz会使用LocalDataSourceJobStore覆盖掉org.quartz.jobStore.class配置。

具体的参见SchedulerFactoryBean中的initSchedulerFactory方法

以JobStoreTX为例,触发操作代码如下:

// call triggered - to update the trigger's next-fire-time state...
trigger.triggered(cal);

String state = STATE_WAITING;
boolean force = true;

if (job.isConcurrentExectionDisallowed()) {
    state = STATE_BLOCKED;
    force = false;
    try {
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
            STATE_BLOCKED, STATE_WAITING);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
            STATE_BLOCKED, STATE_ACQUIRED);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
            STATE_PAUSED_BLOCKED, STATE_PAUSED);
    } catch (SQLException e) {
        throw new JobPersistenceException(
                "Couldn't update states of blocked triggers: "
                        + e.getMessage(), e);
    }
} 

if (trigger.getNextFireTime() == null) {
    state = STATE_COMPLETE;
    force = true;
}

storeTrigger(conn, trigger, job, true, state, force, false);

job.getJobDataMap().clearDirtyFlag();

触发操作主要做了2个事情,修改OperableTrigger.nextFireTime和state。

特别注意: 对于循环执行的任务,如果任务类没有添加DisallowConcurrentExecution注解,会把state修改成WAITING状态。 因此会出现如下情况:

对于集群单节点来说:一个执行时间大于循环间隔时间的任务,会出现上一次循环还没执行完毕,下一次就开始执行的情况。

对于集群来说:一个循环任务会同时跑在不同的节点上。

3、QuartzSchedulerThread创建JobRunShell和具体的Job

这步操作比较明确,初始化JobRunShell线程,初始化我们自定义的Job实例。
单列是想描述Job实例的初始化是发生在QuartzSchedulerThread线程,不存在多线程的安全性问题。

4、JobRunShell执行Job

从这里开始,对于QuartzSchedulerThread线程来说,就是异步操作了。它运行在JobRunShell线程中,主要也是执行Job.execute方法。

5、JobRunShell执行完成trigger操作

主要是一些状态的复原操作,核心代码如下:

do {
    ......

    /**
     * 执行Job任务
     * 
     * 收集JobExecutionException
     */
    try {
        job.execute(jec);
    } catch (JobExecutionException jee) {
        endTime = System.currentTimeMillis();
        jobExEx = jee;
        getLog().info("Job " + jobDetail.getKey() +
            " threw a JobExecutionException: ", jobExEx);
    } catch (Throwable e) {
        endTime = System.currentTimeMillis();
        getLog().error("Job " + jobDetail.getKey() +
            " threw an unhandled Exception: ", e);
        SchedulerException se = new SchedulerException(
            "Job threw an unhandled exception.", e);
        qs.notifySchedulerListenersError("Job ("
            + jec.getJobDetail().getKey()
            + " threw an exception.", se);
        jobExEx = new JobExecutionException(se, false);
    }

    ......

    /**
     * 将JobExecutionException转发成下一步操作执行指令CompletedExecutionInstruction
     */
    CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
    ......

    instCode = trigger.executionComplete(jec, jobExEx);

    ......

    // update job/trigger or re-execute job
    if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
        jec.incrementRefireCount();
        try {
            complete(false);
        } catch (SchedulerException se) {
            qs.notifySchedulerListenersError("Error executing Job ("
                + jec.getJobDetail().getKey()
                + ": couldn't finalize execution.", se);
        }
        continue;
    }

    ......

    /**
     * QuartzScheduler根据instCode执行相关操作清理工作。
     * 
     * 修改Job.state也在这里
     */
    qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
    break;
} while (true);

想象下这样一个需求,循环任务满足某个条件之后需要把任务删除。针对这样一个需求有2中处理办法:

1、Job.execute方法中直接删除任务
2、Job.execute抛出设置unscheduleTrigg为true的JobExecutionException异常
使用方法2,比较契合Job的完整执行流程。但是Job不是立即删除,只是把state置为STATE_COMPLETE。集群下做集群检查时才会删除;非集群下会在Quartz应用重新启动时会删除。

特别注意: Job可以通过抛出JobExecutionException来影响任务的调度,这个影响是通过OperableTrigger来转换成CompletedExecutionInstruction实现的。 

6、锁机制

从上述流程中可以知道怎么任务执行是跨线程的,那必然存在相关的锁机制来保证相关数据的线程安全。前文提到JobStore接口是对底层SQL的封装层,并对上层Quartz业务提供支持。在Quartz设计中,锁机制是实现在具体的JobStore实例中的。以JobStoreTX为例,锁机制的流程如下图所示。

 

具体锁的实现可以参考Semaphore接口实现。JobStoreTX根据是否支持集群选择不同的锁实现,分别是StdRowLockSemaphore和SimpleSemaphore。
StdRowLockSemaphore通过select * from table where ... for update来实现上锁。
SimpleSemaphore则是通过wait(), notifyAll()来实现上锁。

特别注意:

基于mysql的StdRowLockSemaphore实现依赖于DB的autocommit值,而autocommit在LocalDataSourceJobStore中没有做处理,依赖spring的事务实现。

前文提到: 如果使用spring集成的SchedulerFactoryBean,且设置了datasource属性。Quartz会使用LocalDataSourceJobStore覆盖掉org.quartz.jobStore.class配置。

根本原因在于datasource的管理权在Spring还是在Quartz,但是在Spring管理时对于没有上锁居然没有任务提示,也是万万没想到。

MisfireHandler

这个线程是专门用来处理那些超过执行时间但是未被执行的任务(misfire),随QuartzScheduler启动而启动。大致处理流程如下:

  1. 检索出超过指定时间未执行的OperableTrigger
  2. 修改trigger.MISFIRE_INSTR指定misfire的处理办法
  3. QuartzSchedulerThread根据前面的处理办法,执行对应操作

具体代码请参见MisfireHandler类的实现。

ClusterManager

Quartz集群节点的监控和管理是由ClusterManager线程来完成的,随QuartzScheduler启动而启动。大致流程如下:

针对上述流程做如下说明: 

 

  1. 检查间隔由org.quartz.jobStore.clusterCheckinInterval配置,默认7.5秒
  2. 失效节点的recover操作,会把节点正在执行的trigger状态都改成STATE_WAITING,最后把状态为STATE_COMPLETE的trigger都删除掉。具体参见ClusterManager.clusterRecover实现。

特别注意:

如果一个任务执行时间大于org.quartz.jobStore.clusterCheckinInterval,就算任务添加了DisallowConcurrentExecution注解,也会因为集群节点检查会导致OperableTrigger锁定失效,最后被其它节点执行。

本文来自网易实践者社区,经作者雷琨授权发布。