资讯专栏INFORMATION COLUMN

Quartz 2 定时任务(二):多线程并发执行与数据共享

OpenDigg / 2027人阅读

摘要:注意当使用注解时,为了避免并发时,存储数据造成混乱,强烈建议把注解也加上。示例假设定时任务的时间间隔为秒,但执行时间是秒。当设置以后程序会等任务执行完毕后再去执行,否则会在秒时再启动新的线程执行。

版权声明:本文由吴仙杰创作整理,转载请注明出处:https://segmentfault.com/a/1190000009128328

1. 禁止同一个 JobDetail 中的多个实例并发执行

Quartz 定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行,如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。

禁止并发执行的意思并不是不能同时执行多个 Job,而是不能并发执行同一个 Job Definition(由 JobDetail 定义),但是可以同时执行多个不同的 JobDetail,举例说明,我们有一个 Job 类,叫做 SayHelloJob,并在这个 Job 上加了 @DisallowConcurrentExecution 注解,然后在这个 Job上 定义了很多个 JobDetail,如 sayHelloToJoeJobDetail,sayHelloToMikeJobDetail,那么当 scheduler 启动时,不会并发执行多个 sayHelloToJoeJobDetail 或者 sayHelloToMikeJobDetail,但可以同时执行 sayHelloToJoeJobDetail 跟 sayHelloToMikeJobDetail。

1.1 使用 Spring 整合 Quartz 时

Spring 配置文件中加入:


1.2 Quartz 原生使用时

当不使用 Spring 的时候就需要在 Job 的实现类上加 @DisallowConcurrentExecution 的注解。

2. 同一个 JobDetail 中多个实例的数据共享

@PersistJobDataAfterExecution 是用在 Job 实现类上,表示一个有状态的任务,意思是当正常执行完 Job 后,JobDataMap 中的数据应该被改动,以被下一次调用时用。

注意:当使用 @PersistJobDataAfterExecution 注解时,为了避免并发时,存储数据造成混乱,强烈建议@DisallowConcurrentExecution 注解也加上。

3. 示例

假设定时任务的时间间隔为 3 秒,但 job 执行时间是 10 秒。当设置 @DisallowConcurrentExecution 以后程序会等任务执行完毕后再去执行,否则会在 3 秒时再启动新的线程执行。

当设置 @PersistJobDataAfterExecution 时,在执行完 Job 的 execution 方法后保存 JobDataMap 当中固定数据,以便任务在重复执行的时候具有相同的 JobDataMap;在默认情况下也就是没有设置 @PersistJobDataAfterExecution 的时候每个 job 都拥有独立 JobDataMap。

任务类:

package org.quartz.examples;

import org.quartz.*;

import java.util.Date;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class TaskJob implements Job {

    public static final String NUM_EXECUTIONS = "NumExecutions";
    public static final String EXECUTION_DELAY = "ExecutionDelay";

    /**
     * 静态变量可以保持工作状态,但无法达到预期效果
     */
    private static int _staticCounter = 0;

    /**
     * Quartz 每次执行作业时都会重新实例化,非静态变量无法保持工作状态
     */
    private int _counter = 0;

    /**
     * 需要一个公共的空构造方法,以便 scheduler 随时实例化 job
     */
    public TaskJob() {
    }

    /**
     * 该方法实现需要执行的任务
     */
    public void execute(JobExecutionContext context) throws JobExecutionException {
        System.err.println("---> " + context.getJobDetail().getKey() + " 运行中[" + new Date() + "]");

        JobDataMap map = context.getJobDetail().getJobDataMap();

        int executeCount = 0;
        if (map.containsKey(NUM_EXECUTIONS)) {
            executeCount = map.getInt(NUM_EXECUTIONS);
        }

        // 增量计数并将其存储回 JobDataMap,这样可以适当保持工作状态
        executeCount++;
        map.put(NUM_EXECUTIONS, executeCount);

        // 只要有任务执行都会递增,无法达到预期效果
        _staticCounter++;

        // 本地变量递增加,但实际上无法保持工作状态
        _counter++;

        long delay = 5000L;
        if (map.containsKey(EXECUTION_DELAY)) {
            delay = map.getLong(EXECUTION_DELAY);
        }

        try {
            // 模拟一个耗时的 job
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.err.println(context.getJobDetail().getKey() + " 的静态变量 _staticCounter 为:" + _staticCounter + ",非静态变量 scheduler 为:" + _counter);
        System.err.println(context.getJobDetail().getKey() + " 完成了(" + executeCount + ")次 <---");
    }
}

任务调度类:

package org.quartz.examples;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class Executer {

    public void run() throws Exception {
        // 通过 schedulerFactory 获取一个调度器
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler sched = sf.getScheduler();

        // 创建 jobDetail 实例,绑定 Job 实现类
        // 指明 job 的名称,所在组的名称,以及绑定 job 类
        JobDetail job1 = JobBuilder.newJob(TaskJob.class)
                .withIdentity("statefulJob1", "group1")
                // 给定的键-值对添加到 JobDetail 的 JobDataMap 中
                .usingJobData(TaskJob.EXECUTION_DELAY, 10000L).build();

        // 定义调度触发规则,先立即执行一次,然后每隔 3 秒执行一次
        SimpleTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(3)
                        .repeatForever())
                .build();

        // 把作业和触发器注册到任务调度中
        Date firstRunTime = sched.scheduleJob(job1, trigger);
        System.out.println(job1.getKey() + " 开始运行于:" + firstRunTime + ",重复:" + trigger.getRepeatCount() + " 次,每次间隔 "
                + trigger.getRepeatInterval() / 1000 + " 秒");

        // 任务 job1 方法中拿到的 JobDataMap 的数据是共享的
        // 这里要注意一个情况: 就是 JobDataMap 的数据共享只针对一个 job1 任务
        // 如果在下面在新增加一个任务 那么他们之间是不共享的,比如下面的 job2
        // 创建第二个 JobDetail 实例
        JobDetail job2 = JobBuilder.newJob(TaskJob.class)
                .withIdentity("statefulJob2", "group1")
                // 给定的键-值对添加到 JobDetail 的 JobDataMap 中
                .usingJobData(TaskJob.EXECUTION_DELAY, 10000L)
                .build();

        // 定义调度触发规则,先立即执行一次,然后每隔 3 秒执行一次
        trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger2", "group1")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().
                        withIntervalInSeconds(3)
                        .repeatForever()
                        // 指定失效时的策略
                        .withMisfireHandlingInstructionNowWithExistingCount())
                .build();

        // 这个 job2 与 job1 执行的 JobDataMap 不共享
        // 把作业和触发器注册到任务调度中
        firstRunTime = sched.scheduleJob(job2, trigger);
        System.out.println(job2.getKey() + " 开始运行于:" + firstRunTime + ",重复:" + trigger.getRepeatCount() + " 次,每次间隔 "
                + trigger.getRepeatInterval() / 1000 + " 秒");

        // 启动计划程序(实际上直到调度器已经启动才会开始运行)
        sched.start();

        // 等待 60 秒,使我们的 job 有机会执行
        Thread.sleep(60000);

        // 等待作业执行完成时才关闭调度器
        sched.shutdown(true);

        SchedulerMetaData metaData = sched.getMetaData();
        System.out.println("一共运行了:" + metaData.getNumberOfJobsExecuted() + " 个任务");
    }

    public static void main(String[] args) throws Exception {
        Executer example = new Executer();
        example.run();
    }
}

运行结果:

group1.statefulJob1 开始运行于:Wed Apr 19 17:04:22 CST 2017,重复:-1 次,每次间隔 3 秒
group1.statefulJob2 开始运行于:Wed Apr 19 17:04:22 CST 2017,重复:-1 次,每次间隔 3 秒

---> group1.statefulJob2 运行中[Wed Apr 19 17:04:22 CST 2017]
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:22 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:2,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:2,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(1)次 <---
group1.statefulJob1 完成了(1)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:32 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:32 CST 2017]
group1.statefulJob1 的静态变量 _staticCounter 为:4,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(2)次 <---
group1.statefulJob2 的静态变量 _staticCounter 为:4,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(2)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:42 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:42 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:6,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:6,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(3)次 <---
group1.statefulJob2 完成了(3)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:52 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:52 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:8,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(4)次 <---
group1.statefulJob1 的静态变量 _staticCounter 为:8,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(4)次 <---
---> group1.statefulJob2 运行中[Wed Apr 19 17:05:02 CST 2017]
---> group1.statefulJob1 运行中[Wed Apr 19 17:05:02 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:10,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:10,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(5)次 <---
group1.statefulJob1 完成了(5)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:05:12 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:05:12 CST 2017]

group1.statefulJob2 的静态变量 _staticCounter 为:12,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(6)次 <---
group1.statefulJob1 的静态变量 _staticCounter 为:12,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(6)次 <---

一共运行了:12 个任务
4. 参考

Quartz 线程处理

Quartz API

PS:本文针对的 Quartz 版本为 Quartz 2.2.3。官方下载地址:Quartz 2.2.3 .tar.gz

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67057.html

相关文章

  • Quartz 2 定时任务(三):异常中断处理

    摘要:注意为了共享在同一个中的,我们需要在上面这个实现类上加入和注解,详见定时任务二多线程并发执行与数据共享。捕获异常,取消所有触发器在我们捕获异常时,可以调用取消所有与这个作业有关的触发器。 版权声明:本文由吴仙杰创作整理,转载请注明出处:https://segmentfault.com/a/1190000009141079 1. 作业异常 org.quartz.JobExecut...

    Hydrogen 评论0 收藏0
  • Quartz 2 定时任务(一):基本使用指南

    摘要:调度器就相当于一个容器,装载着任务和触发器。用于指定额外的值。然而,如果指定并且第一号是星期六,那么触发器的触发在第三号周一,因为它不会过一个月的日子的边界。注意如果只是指定,则触发器在月份中不会触发。 版权声明:本文由吴仙杰创作整理,转载请注明出处:https://segmentfault.com/a/1190000009128277 1. Quartz 体系结构 Quartz 设计...

    Freelander 评论0 收藏0
  • SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)

    摘要:也是自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。 原创不易,如需转载,请注明出处https://www.cnblogs.com/baixianlong/p/10659045.html,否则将追究法律责任!!! 一、在JAVA开发领域,目前可以通过以下几种方式进行定时任务 1、单机部署模式 Timer:jdk中...

    BWrong 评论0 收藏0
  • Quartz实现工作流

    摘要:也有,触发点和相关,和我们的需求关系不大,暂忽略。实现为每个算法任务创建一个,任务失败不能启动后续任务,所以在运行失败的情况下,需要把启动的删除掉。需要自己在中实现多个依赖是否完成的检查。后续主线程的任务就是检查工作流是否已经完成。 Quartz简介 作为一个优秀的开源调度框架,Quartz 具有以下特点:强大的调度功能,支持立即调度、定时调度、周期调度、并发调度; 灵活的应用方式,支...

    Apollo 评论0 收藏0
  • 微服务架构中,次浅封装实践

    摘要:三实践案例案例简介分布式系统中,微服务基础组件等,系统中间件,等,对常用功能配置等,进行二次浅封装并统一集成管理,以满足日常开发中基础环境搭建与临时工具的快速实现。 一、背景简介 分布式系统中存在很多拆分的服务,在不断迭代升级的过程中,会出现如下常见的棘手情况: 某个技术组件版本升级,依赖包升级导致部分语法或者API过期,或者组件修复紧急的问题,从而会导致分布式系统下各个服...

    Hujiawei 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<