Elastic-Job项目源码分析1--核心骨架JobScheduler

    xiaoxiao2021-03-25  53

    简介

    我觉得,还是原文写的好,大家可以去看看:

    https://github.com/dangdangdotcom/elastic-job/blob/master/README.md

    说下我喜欢的原因吧

    可以和现有的工程对接 支持任务分片,这个太重要了!!弹性,水平扩展很好;如果有机器宕机,由其他机器执行可视化的、web管理平台代码架构不错,虽然各种各样的service,看起来很令人头疼

    Elastic-job程序入口 –JobScheduler

    无论开发还是调试代码,都必须找到程序的入口,否则就是没有头的苍蝇,不知道到来龙去脉 elastic-job-example\elastic-job-example-lite-java\src\main\java\com\dangdang\ddframe\job\example\JavaMain.java

    public static void main(final String[] args) throws IOException { // CHECKSTYLE:ON EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); CoordinatorRegistryCenter regCenter = setUpRegistryCenter(); JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource()); setUpSimpleJob(regCenter, jobEventConfig); // setUpDataflowJob(regCenter, jobEventConfig); // setUpScriptJob(regCenter, jobEventConfig); }

    我们看到这里启动了三种Job,分别是SimpleJob、DataflowJob、ScriptJob,这三种job本质上是不同的,其内部实现采用了3中不同的执行器(稍后讲解)。

    我们先看下setUpSimpleJob

    private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) { JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0 5 * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName()); new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig, new JavaSimpleListener(), new JavaSimpleDistributeListener(1000L, 2000L)).init(); }

    就几行代码,重点来了,JobScheduler这是一个很重要的类!!!里面创建了很多服务 同时是任务的启动源头。在看代码时,大家可能看的有点懵逼,建议画个结构图,很简单,就是把核心的类、方法抽取出来,如下图

    几大服务创建好了,就需要启动它,就是JobScheduler.init 方法了

    这里是列举除了主要模块服务。 之前不是说elastic-job是基于quartz之上嘛,那怎么没看到quartz?客官稍等就来。 来,我们看看JobScheduler.init

    /** * 初始化作业. */ public void init() { jobExecutor.init(); JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig(); JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName); jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron()); jobRegistry.addJobScheduleController(jobName, jobScheduleController); }

    看到createScheduler 没

    private JobDetail createJobDetail(final String jobClass) { JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build(); result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade); Optional<ElasticJob> elasticJobInstance = createElasticJobInstance(); if (elasticJobInstance.isPresent()) { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get()); } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) { try { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance()); } catch (final ReflectiveOperationException ex) { throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass); } } return result; } protected Optional<ElasticJob> createElasticJobInstance() { return Optional.absent(); } private Scheduler createScheduler(final boolean isMisfire) { Scheduler result; try { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(getBaseQuartzProperties(isMisfire)); result = factory.getScheduler(); result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener()); } catch (final SchedulerException ex) { throw new JobSystemException(ex); } return result; } private Properties getBaseQuartzProperties(final boolean isMisfire) { Properties result = new Properties(); result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName()); result.put("org.quartz.threadPool.threadCount", "1"); result.put("org.quartz.scheduler.instanceName", jobName); if (!isMisfire) { result.put("org.quartz.jobStore.misfireThreshold", "1"); } result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName()); result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString()); return result; }

    看到result.put("org.quartz.threadPool.threadCount", "1"); 这到这个没,不得不赞亮哥的想法

    这里还有一个亮点就是createJobDetail ,用LiteJob代理了目标类,因为他门的cron是一样的。我们看下这里的LiteJob

    /** * Lite调度作业. * * @author zhangliang */ public static final class LiteJob implements Job { @Setter private ElasticJob elasticJob; @Setter private JobFacade jobFacade; @Override public void execute(final JobExecutionContext context) throws JobExecutionException { JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute(); } }

    简短的令人发指!!!

    看到JobExecutorFactory,就会知道上面三种不同的job,其实是对应了不同的Executor。

    好了今天就到此

    转载请注明原文地址: https://ju.6miu.com/read-36073.html

    最新回复(0)