这篇文章记录了我基于Spring Boot 2.0.4 RELEASE版本整合Elastic Job的过程,以及对Elastic Job的基本介绍

Java开发中定时任务我们一般使用Timer,quartz或者Spring的Spring Task等等,当服务单节点部署时用哪个定时器都是没有什么问题的。但是我们生产环境肯定不可能是单节点的,那么多节点部署就有如下几个问题:

每个应用服务都会执行一样的定时任务,重复多次执行浪费资源不说还可能会引发系统逻辑错误。

如果控制了每次只需要其中一个节点去执行任务,解决了可能多次执行引发的系统逻辑错误,但是又出现了两个新的问题,一是如果执行任务的这个节点宕机怎么办?二是需要处理的任务过多时,处理任务的节点可能会不堪重负,而其他节点又只能在那里空闲“喝茶”。

Elastic Job解决了上述的问题,Elastic Job将一个任务进行分片,分配到各个应用服务去执行,如果其中某个节点宕机,Elastic Job将在下次把宕掉的节点删除,对任务重新分片,保证任务的执行成功。

Elastic Job是当当开源的一个分布式调度框架,它由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化,使用时在工程中导入jar包即可,外部仅依赖Zookeeper,本文也只是讲集成Elastic-Job-Lite。下面开始整合。

Spring Boot 2.0.4

Zookeeper 3.4.13

JDK安装不讲

Spring Boot框架搭建不讲,通常在上面,选择自己需要的组件下载下来,导入到eclipse或者IintelliJI IDEA就能跑起来,如果很不幸你搭建不顺利的话那就自行解决吧哈哈。

pom.xml中增加

<properties>

<elastic-job.version>2.1.5</elastic-job.version>

<curator.version>2.10.0</curator.version>

</properties>

<dependency>

<groupId>com.dangdang</groupId>

<artifactId>elastic-job-common-core</artifactId>

<version>${elastic-job.version}</version>

</dependency>

<dependency>

<groupId>com.dangdang</groupId>

<artifactId>elastic-job-lite-core</artifactId>

<version>${elastic-job.version}</version>

</dependency>

<dependency>

<groupId>com.dangdang</groupId>

<artifactId>elastic-job-lite-spring</artifactId>

<version>${elastic-job.version}</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-test</artifactId>

<version>${curator.version}</version>

</dependency>

4.application.yml配置

#zookeeper配置

serverList: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

namespace: elastic-job

#作业任务配置

dataFlowJob:

cron: 0/5 * * * * ?

shardingTotalCout: 3

shardingItemParameters: 0=0,1=1,2=2

5.zookeeper初始化

* @author Chonpan

* zookeeper初始化,本地调试时需要先启动zookeeper

@Configuration

@ConditionalOnExpression("'${regCenter.serverList}'.length()>0")

public class RegistryCenterConfig{

@Bean(initMethod="init")

public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {

return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));

Elastic-Job提供Simple,Dataflow和Script3种作业类型,以Dataflow作业类型为例。 初始化Dataflow作业

@Configuration

public class DataflowJobConfig{

private ZookeeperRegistryCenter regCenter;

@Bean(initMethod="init")

public JobScheduler dataFlowJobScheduler(@Value("${dataFlow.jobName}") final String jobName,@Value("${dataFlow.cron}") final String cron, @Value("${dataFlow.shardingTotalCount}") final int shardingTotalCount, @Value("${dataFlow.shardingItemParameters}") final String shardingItemParameters){

return new JobScheduler(regCenter, createJobConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters));

private static LiteJobConfiguration createJobConfiguration(String jobName, String cron, int shardingTotalCount, String shardingItemParameters){

JobCoreConfiguration coreConfig= JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();

DataflowConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, MyDataflowJob.class.getCanonicalName(), true);

return LiteJobConfiguration.newBuilder(dataflowConfig).build();

实现自己的DataflowJob,DataflowJob接口提供了两个方法可供实现,一个是fetchData用于抓取数据,一个是processData用于处理抓取的数据。

public class MyElasticJob implements DataflowJob<Foo> {

public List<Foo> fetchData(ShardingContext context) {

switch (context.getShardingItem()) {

List<Foo> data = // get data from database by sharding item 0

return data;

List<Foo> data = // get data from database by sharding item 1

return data;

List<Foo> data = // get data from database by sharding item 2

return data;

// case n: ...

public void processData(ShardingContext shardingContext, List<Foo> data) {

// process data

我是Win7上面搭建的单机伪分布式集群,Windows和Linux上搭建是一样的,单机伪分布式集群和分布式集群也是一样的,只不过前者将zk装在一台机器上起多个进程,后者是多台机器上分别安装zk,很简单请自行安装。

先启动zookeeper集群,然后启动服务。因为我本地在eclipse里面测试不方便起多个服务,所以下面展示下服务部署在Linux测试环境两台机器下的效果,任务分片数为3,

可以看出第一台机器处理2和0的任务,第二台机器处理1的任务。