资讯专栏INFORMATION COLUMN

分布式定时任务组件

Mertens / 839人阅读

摘要:基于的分布式任务调度组件,非常小巧,使用简单,只需要引入包。单个任务节点故障时自动转移到其他任务节点继续执行。和都是一整套的定时任务框架,没有必要强行将集成进来,专注做的分布式以及动态任务的封装。是之后自主开发的定时任务工具。

基于Spring Task + Zookeeper的分布式任务调度组件,非常小巧,使用简单,只需要引入jar包。不需要多带带部署服务端。确保所有任务在集群中不重复,不遗漏的执行。支持动态添加和删除任务。

GitHub地址

功能概述(包括优化的部分)

基于zookeeper+spring task的分布任务调度系统。

确保每个任务在集群中不同节点上不重复的执行。

单个任务节点故障时自动转移到其他任务节点继续执行。

任务节点启动时必须保证zookeeper可用,任务节点运行期zookeeper集群不可用时任务节点保持可用前状态运行,zookeeper集群恢复正常运行。

支持动态添加、修改和删除任务,支持任务暂停和重新启动。

添加ip黑名单,过滤不需要执行任务的节点。

后台管理和任务执行监控。

支持spring-boot,支持单个任务运行多个实例(使用扩展后缀)。

主要改动

删除了quartz的集成。Spring Task和quartz都是一整套的定时任务框架,没有必要强行将quartz集成进来,专注做Spring Task的分布式以及动态任务的封装。删除quartz后,组件更加轻便。且所有功能依旧保留。

对于Spring Boot的支持更加智能化。通过spring.factories的方式自动加载配置类UncodeScheduleAutoConfiguration。只需要引入jar包依赖,无须显示的添加配置类扫描。

参照Alibaba代码规范对代码进行了大量重构优化,更具有可读性。

删除了默认1s的心跳机制(主要作用:刷新server、重新分配任务、检查当前serve可执行的任务),采用watcher的方式,对server节点和task节点进行动态监听,进一步提升性能。

对于非动态添加的任务,也就是注解或配置文件配置的任务会在容器启动通过组件定义的方式启动。但是在删除此类任务时,没有真正的删除,taskWrapper任然会定时的执行。 解决了这个bug。

关于UncodeScheduleAutoConfiguration中SchedulerTaskManager的定义。将SchedulerTaskManager的Bean名称定义为taskScheduler,这样可以阻止Spring Task初始化名为taskScheduler的bean,以免重复加载。当然你也可以不这么做,因为SchedulerTaskManager继承了ThreadPoolTaskScheduler,我们动态添加的任务都是通过SchedulerTaskManager添加的。

说明:

单节点故障时需要业务保障数据完整性或幂等性。

Spring Task是Spring 3.0之后自主开发的定时任务工具。

Spring Task默认不是并行执行,需要添加一个名为taskScheduler的Bean,采用ThreadPoolTaskScheduler或其他线程池的Scheduler实现。Spring Task默认采用ThreadPoolTaskScheduler

所有的任务都是基于Spring Bean的方式。可以通过定义一个或多个任务模板(Bean 的方式),通过使用任务后缀可以动态的添加多个该模板的任务实例,你只需要传递不同的参数即可。

模块架构


代码实战 定义非动态的定时任务
@Component    
public class SimpleTask {

    private static int i = 0;
    
    @Scheduled(fixedDelay = 5000)
    public void print() {
        System.out.println("===========start!=========");
        System.out.println("I:"+i);i++;
        System.out.println("=========== end !=========");
    }
    
    @Scheduled(cron = "0/5 * * * * ?")
    public void print1() {
        System.out.println("===========start!=========");
        System.out.println("I:"+i);i++;
        System.out.println("=========== end !=========");
    }
    
    @Scheduled(fixedRate = 3000)
    public void print3() {
        System.out.println("===========start!=========");
        System.out.println("I:"+i);i++;
        System.out.println("=========== end !=========");
    }
}
    
定义动态的定时任务
// 定义任务实体
TaskDefine task = new TaskDefine();
task.setTargetBean(SchedulerTaskForward.BEAN_NAME);
task.setTargetMethod(SchedulerTaskForward.METHOD);
task.setExtKeySuffix(SUFFIX + model.getId());
task.setCronExpression(model.getCronExpression());
task.setParams(gson.toJson(model));
if (ConsoleManager.isExistsTask(task)) {
    // 更新
    ConsoleManager.updateScheduleTask(task);
} else {
    // 新增
    task.setStartTime(new Date());
    ConsoleManager.addScheduleTask(task);
}

SchedulerTaskForward是我预先定义好的任务模板,下面是代码片段

@SuppressWarnings("unchecked")
@Component("schedulerTaskForward")
public class SchedulerTaskForward {
    public static final String BEAN_NAME = "schedulerTaskForward";
    public static final String METHOD = "forward";
    private Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerTaskForward.class);

    @Autowired
    private EpmManagerTerminalFeign terminalFeign;

    @Autowired
    private AmqpClientService mqService;

    @Value("${collectd.batch}")
    protected int batch;

    public void forward(String json) {
        SchedulerTaskModel model = gson.fromJson(json, SchedulerTaskModel.class);
        if (ExecuteType.report.getCode().equals(Integer.valueOf(model.getExecuteType()))) {
            // 上报的在定时任务中不执行
            throw new SchedulerTaskExecuteException("任务: " + model.getTaskName() + ", 属于上报任务, 禁止在定时任务中执行");
        }
        executeCollect(TaskType.forCode(model.getTaskType()), model.getAfns(), model.getOrgId(), model.getTermType());
        LOGGER.debug("任务: " + model.getTaskName() + ", 执行完成");
    }
基于Spring Boot的配置

application.yml

uncode:
  schedule:
    zkConnect: ${spring.cloud.zookeeper.connectString}
    rootPath: /uncode/schedule
    zkSessionTimeout: 60000
    zkUsername: admin
    zkPassword: admin
    poolSize: 10
#    ipBlackList[0]: 127.0.0.2 #server黑名单可选
#    ipBlackList[1]: 127.0.0.3 #server黑名单可选

2 启动类

@SpringBootApplication
@EnableScheduling
// 这个也是可选的,如果你不需要默认的任务管理界面的话
// 强烈建议自己去实现这个任务管理功能
@ServletComponentScan("cn.uncode.schedule")
public class UncodeScheduleApplication {
    public static void main(String[] agrs){
        SpringApplication.run(UncodeScheduleApplication.class,agrs);
    }
}
基于Spring项目配置


    
        
            
            
            
            
            
            
            
        
    





    
使用API或后台添加任务(静态方法的方式)

1 动态添加任务

ConsoleManager.addScheduleTask(TaskDefine taskDefine);

2 动态删除任务

ConsoleManager.delScheduleTask(TaskDefine taskDefine);

3 动态更新任务

ConsoleManager.updateScheduleTask(TaskDefine taskDefine);

4 查询任务列表

ConsoleManager.queryScheduleTask();

使用API或后台添加任务(Spring Bean的方式)

通过获得我们定义的SchedulerTaskManager这个bean,依然可以动态的添加任务。这里就不展示了。

关于

作者:刘惠涛
转载请注明出处
2017-10-23

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67869.html

相关文章

  • 微服务架构中,二次浅封装实践

    摘要:三实践案例案例简介分布式系统中,微服务基础组件等,系统中间件,等,对常用功能配置等,进行二次浅封装并统一集成管理,以满足日常开发中基础环境搭建与临时工具的快速实现。 一、背景简介 分布式系统中存在很多拆分的服务,在不断迭代升级的过程中,会出现如下常见的棘手情况: 某个技术组件版本升级,依赖包升级导致部分语法或者API过期,或者组件修复紧急的问题,从而会导致分布式系统下各个服...

    Hujiawei 评论0 收藏0
  • 跨云迁移过程中的数据同步及一致性校验实践(二)

    摘要:另外对于需要尽量减少应用重启的系统也可以优先考虑这种方式来保障数据一致性。只需要保证这三类程序都是停止的,那么就可以保证没有同步服务以外的程序对数据进行修改,从而保障数据一致性。在《跨云迁移过程中的数据同步及一致性校验实践(一)》中我们主要介绍了跨云迁移中数据同步阶段的存储组件MySQL、文件存储和对象存储的数据迁移过程,本文将重点围绕跨云迁移的数据规整阶段(清理测试时产生的脏数据)和数据割...

    Tecode 评论0 收藏0
  • 用Quartz实现工作流

    摘要:也有,触发点和相关,和我们的需求关系不大,暂忽略。实现为每个算法任务创建一个,任务失败不能启动后续任务,所以在运行失败的情况下,需要把启动的删除掉。需要自己在中实现多个依赖是否完成的检查。后续主线程的任务就是检查工作流是否已经完成。 Quartz简介 作为一个优秀的开源调度框架,Quartz 具有以下特点:强大的调度功能,支持立即调度、定时调度、周期调度、并发调度; 灵活的应用方式,支...

    Apollo 评论0 收藏0
  • 基于通用jar、动态配置、组件编排的会员任务中心系统设计

    摘要:基于的动态配置推送。对于任务中心这种多任务平台型的配置,有一定影响。基于回调和配置的扩展点流程共建在建中通过扩展点共建方式,将流程编排的能力,暴露给内外部的开发者,完成任务中心的共建。 一、聊聊本文想说什么:   为更好帮助商家的会员快速成长,保持用户活性,完善用户的成长体系,有赞用户中心-会员成长团队基于现有的业务场景,设计了一套较完备任务中心系统。同时也有很多通用技术组件能够落地。...

    null1145 评论0 收藏0
  • 有赞业务对账平台的探索与实践

    摘要:业务对账平台的核心目的,就是及时发现类似问题,并及时修复。这对对账平台的吞吐量造成了挑战。五健康度对账中心可以拿到业务系统及其所在整个链路的数据一致性信息。在分布式环境下,没有人能回避数据一致性问题,我们对此充满着敬畏。 一、引子 根据CAP原理,分布式系统无法在保证了可用性(Availability)和分区容忍性(Partition)之后,继续保证一致性(Consistency)。我...

    wangjuntytl 评论0 收藏0

发表评论

0条评论

Mertens

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<