一、QuartzSchedulerThread的run方法大致阐述 先说一下run方法的执行时机: 当Quartzscheduler执行start方法时,方法体中有一句 schedThread。togglePause(false);,接着就会调用QuartzSchedulerThread下的togglePause方法,将paused置为false,在此之后,QuartzSchedulerThread下的run方法开始真正运行通知主处理循环在下一个可能的点暂停voidtogglePause(booleanpause){synchronized(sigLock){if(paused){signalSchedulingChange(0);}else{sigLock。notifyAll();}}}复制代码publicvoidrun(){intacquiresFailed0;这里就是判断调度器是否该停止,如果没有收到信号的话,这个调度器是一直处于循环之中的while(!halted。get()){try{这里是检查我们是否应该暂停synchronized(sigLock){我们在初始化的时候,paused是置为true的,因此在上文中,我们才说当Quartzscheduler执行start方法时调用togglePause,将paused置为false,run方法才开始运行也是因为此处的判断while(paused!halted。get()){try{sigLock。wait(1000L);}catch(InterruptedExceptionignore){}acquiresFailed0;}if(halted。get()){}}如果从作业存储中读取一直失败(例如数据库关闭或重新启动)就会等待一段时间if(acquiresFailed1){try{这里就是计算延迟时间longdelaycomputeDelayForRepeatedErrors(qsRsrcs。getJobStore(),acquiresFailed);Thread。sleep(delay);}catch(Exceptionignore){}}从线程池拿出空闲可利用的线程数量这里多谈一嘴blockForAvailableThreads()方法它是一个阻塞式方法,直到至少有一个可用线程。intavailThreadCountqsRsrcs。getThreadPool()。blockForAvailableThreads();if(availThreadCount0){ListOperableTlongnowSystem。currentTimeMillis();清除信号调度变更clearSignaledSchedulingChange();try{如果可用线程数量足够那么就是30后再次扫描,acquireNextTriggers方法的三个参数的意思分别是:idleWaitTime:为如果没有的再次扫描的时间,默认是privatestaticlongDEFAULTIDLEWAITTIME30L1000L;30秒Math。min(availThreadCount,qsRsrcs。getMaxBatchSize()):这里的意思就是一次最多能取几个出来batchTimeWindow:默认是0,同样是一个时间范围,如果有两个任务只差一两秒,而执行线程数量满足及batchTimeWindow时间也满足的情况下就会两个都取出来具体的方法的执行,后文再看triggersqsRsrcs。getJobStore()。acquireNextTriggers(nowidleWaitTime,Math。min(availThreadCount,qsRsrcs。getMaxBatchSize()),qsRsrcs。getBatchTimeWindow());acquiresFailed0;if(log。isDebugEnabled()){。。。}在获取到triggers触发器不为空后,trigger列表是以下次执行时间排序查出来的if(triggers!null!triggers。isEmpty()){nowSystem。currentTimeMillis();取出集合中最早执行的触发器获取它的下一个触发时间longtriggerTimetriggers。get(0)。getNextFireTime()。getTime();longtimeUntilTriggertriggerT判断距离执行时间是否大于2毫秒while(timeUntilTrigger2){synchronized(sigLock){if(halted。get()){}判断是不是距离触发事件最近的,if(!isCandidateNewTimeEarlierWithinReason(triggerTime,false)){try{没有的话,就进行阻塞,稍后进行执行nowSystem。currentTimeMillis();timeUntilTriggertriggerTif(timeUntilTrigger1)sigLock。wait(timeUntilTrigger);}catch(InterruptedExceptionignore){}}}if(releaseIfScheduleChangedSignificantly(triggers,triggerTime)){}nowSystem。currentTimeMillis();timeUntilTriggertriggerT}thishappensifreleaseIfScheduleChangedSignificantlydecidedtoreleasetriggersif(triggers。isEmpty())settriggerstoexecutingListTriggerFiredResultbndlesnewArrayListTriggerFiredResult();booleangoAsynchronized(sigLock){goAhead!halted。get();}if(goAhead){try{开始根据需要执行的trigger从数据库中获取相应的JobDetail同时这一步也更新了triggers的状态,稍后会讲到ListTriggerFiredResultresqsRsrcs。getJobStore()。triggersFired(triggers);if(res!null)}catch(SchedulerExceptionse){qs。notifySchedulerListenersError(Anerroroccurredwhilefiringtriggerstriggers,se);for(inti0;itriggers。size();i){qsRsrcs。getJobStore()。releaseAcquiredTrigger(triggers。get(i));}}}将查询到的结果封装成为TriggerFiredResultfor(inti0;ibndles。size();i){TriggerFiredResultresultbndles。get(i);TriggerFiredBundle用于将执行时数据从JobStore返回到QuartzSchedulerThread。TriggerFiredBundlebndleresult。getTriggerFiredBundle();Exceptionexceptionresult。getException();if(exceptioninstanceofRuntimeException){getLog()。error(RuntimeExceptionwhilefiringtriggertriggers。get(i),exception);qsRsrcs。getJobStore()。releaseAcquiredTrigger(triggers。get(i));}if(bndlenull){qsRsrcs。getJobStore()。releaseAcquiredTrigger(triggers。get(i));}JobRunStry{把任务封装成JobRunShell线程任务,JobRunShellextendsSchedulerListenerSupportimplementsRunnable是实现了Runnable接口的然后放到线程池中跑动。shellqsRsrcs。getJobRunShellFactory()。createJobRunShell(bndle);shell。initialize(qs);}catch(SchedulerExceptionse){qsRsrcs。getJobStore()。triggeredJobComplete(triggers。get(i),bndle。getJobDetail(),CompletedExecutionInstruction。SETALLJOBTRIGGERSERROR);}别看这里是个if判断但是这里就是将obshell放进线程池执行的地方利用的就是booleanrunInThread(Runnablerunnable);方法这个方法的作用就是在下一个可用的Thread中执行给定Runnableif(qsRsrcs。getThreadPool()。runInThread(shell)false){getLog()。error(ThreadPool。runInThread()returnfalse!);qsRsrcs。getJobStore()。triggeredJobComplete(triggers。get(i),bndle。getJobDetail(),CompletedExecutionInstruction。SETALLJOBTRIGGERSERROR);}}while(!halted)}}else{if(availThreadCount0)shouldneverhappen,ifthreadPool。blockForAvailableThreads()while(!halted)}longnowSystem。currentTimeMillis();longwaitTimenowgetRandomizedIdleWaitTime();longtimeUntilContinuewaitTsynchronized(sigLock){。。。。}}。。。。}while(!halted)。。。。}复制代码二、一些细节2。1、先获取线程池中的可用线程数量 (若没有可用的会阻塞,直到有可用的);intavailThreadCountqsRsrcs。getThreadPool()。blockForAvailableThreads();复制代码2。2、获取30m内要执行的trigger (即acquireNextTriggers) 我们来看一看acquireNextTriggers方法 首先说acquireNextTriggers具体实现是在JobStoreSupport中,同时quartz与数据库关联的实现大都在JobStoreSupport中,当然更具体的SQL执行还是在DriverDelegate接口下的。 acquireNextTriggers做了哪些事情呢? 我们看看这两个方法: 首先看第一个acquireNextTrigger(conn,noLaterThan,maxCount,timeWindow); 主要就是获取下一个30m内可执行的triggers的触发器,在里面JobStoreSupport从数据库取出triggers时是按照nextFireTime排序的 更具体的就需要大家点进方法去看啦另外里面还包含triggers状态的变更,属于是更加细节化的东西。 第二个就是获取到触发的触发记录 然后在执行executeInNonManagedTXLock时,是需要先获得锁,之后再在提交时释放锁的。 待直到获取的trigger中最先执行的trigger在2ms内;if(triggers!null!triggers。isEmpty()){nowSystem。currentTimeMillis();longtriggerTimetriggers。get(0)。getNextFireTime()。getTime();longtimeUntilTriggertriggerTwhile(timeUntilTrigger2){。。。}}复制代码2。3、triggersFired(triggers) ListresqsRsrcs。getJobStore()。triggersFired(triggers); 这一步看着只是获取了List对象,实际上在triggersFired(triggers)方法中隐藏了很多东西 首先查询,确保触发器没有被删除、暂停或完成。。。,就更新firedTrigger的statusSTATEEXECUTING;代码的注释上还说,如果没有这些就会将状态该为deleted 另外就是更新触发触发器:更新trigger下一次触发的时间;更新trigger的状态: 如果下一次的执行时间为空,状态则改为STATECOMPLETE 在执行executeInNonManagedTXLock方法时,提交前先获得锁,transOwnergetLockHandler()。obtainLock(conn,lockName); 最后是释放锁:commitConnection(conn);2。4、创建JobRunShell,放进线程池执行 针对每个要执行的trigger,创建JobRunShell,并放入线程池执行: 然后由execute:执行job 更详细的看不下去啦 来源:https:juejin。cnpost7131671438901116935