资讯专栏INFORMATION COLUMN

spring-boot-starter-quartz集群实践

kycool / 3280人阅读

摘要:前情提要由于项目需要,需要一个定时任务集群,故此有了这个集群的实践。数据库表前缀在被认为失火之前,调度程序将容忍一个将其下一个启动时间通过的毫秒数。设置此实例检入与群集的其他实例的频率以毫秒为单位。影响检测失败实例的速度。

前情提要】由于项目需要,需要一个定时任务集群,故此有了这个spring-boot-starter-quartz集群的实践。springboot的版本为:2.1.6.RELEASE;quartz的版本为:2.3.1.假如这里一共有两个定时任务的节点,它们的代码完全一样。

壹.jar包依赖

</>复制代码

  1. 1.8
  2. org.springframework.boot
  3. spring-boot-starter
  4. org.springframework.boot
  5. spring-boot-starter-quartz
  6. mysql
  7. mysql-connector-java
  8. runtime
  9. org.springframework.boot
  10. spring-boot-starter-jdbc
  11. org.projectlombok
  12. lombok
  13. true
  14. org.springframework.boot
  15. spring-boot-starter-test
  16. test

这里选择将定时任务的数据入库,避免数据直接存在内存中,因应用重启造成的数据丢失和做集群控制。

贰、项目配置

</>复制代码

  1. spring:
  2. server:
  3. port: 8080
  4. servlet:
  5. context-path: /lovin
  6. datasource:
  7. url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
  8. username: root
  9. password: root
  10. driver-class-name: com.mysql.cj.jdbc.Driver
  11. quartz:
  12. job-store-type: jdbc #数据库方式
  13. jdbc:
  14. initialize-schema: never #不初始化表结构
  15. properties:
  16. org:
  17. quartz:
  18. scheduler:
  19. instanceId: AUTO #默认主机名和时间戳生成实例ID,可以是任何字符串,但对于所有调度程序来说,必须是唯一的 对应qrtz_scheduler_state INSTANCE_NAME字段
  20. #instanceName: clusteredScheduler #quartzScheduler
  21. jobStore:
  22. class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置
  23. driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我们仅为数据库制作了特定于数据库的代理
  24. useProperties: false #以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。
  25. tablePrefix: qrtz_ #数据库表前缀
  26. misfireThreshold: 60000 #在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)。
  27. clusterCheckinInterval: 5000 #设置此实例“检入”*与群集的其他实例的频率(以毫秒为单位)。影响检测失败实例的速度。
  28. isClustered: true #打开群集功能
  29. threadPool: #连接池
  30. class: org.quartz.simpl.SimpleThreadPool
  31. threadCount: 10
  32. threadPriority: 5
  33. threadsInheritContextClassLoaderOfInitializingThread: true

这里需要注意的是两个节点的端口号应该不一致,避免冲突

叁、实现一个Job

</>复制代码

  1. @Slf4j
  2. public class Job extends QuartzJobBean {
  3. @Override
  4. protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  5. // 获取参数
  6. JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
  7. // 业务逻辑 ...
  8. log.info("------springbootquartzonejob执行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger());
  9. }

其中的日志输出是为了便于观察任务执行情况

肆、封装定时任务操作

</>复制代码

  1. @Service
  2. public class QuartzService {
  3. @Autowired
  4. private Scheduler scheduler;
  5. @PostConstruct
  6. public void startScheduler() {
  7. try {
  8. scheduler.start();
  9. } catch (SchedulerException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. /**
  14. * 增加一个job
  15. *
  16. * @param jobClass
  17. * 任务实现类
  18. * @param jobName
  19. * 任务名称
  20. * @param jobGroupName
  21. * 任务组名
  22. * @param jobTime
  23. * 时间表达式 (这是每隔多少秒为一次任务)
  24. * @param jobTimes
  25. * 运行的次数 (<0:表示不限次数)
  26. * @param jobData
  27. * 参数
  28. */
  29. public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime,
  30. int jobTimes, Map jobData) {
  31. try {
  32. // 任务名称和组构成任务key
  33. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
  34. .build();
  35. // 设置job参数
  36. if(jobData!= null && jobData.size()>0){
  37. jobDetail.getJobDataMap().putAll(jobData);
  38. }
  39. // 使用simpleTrigger规则
  40. Trigger trigger = null;
  41. if (jobTimes < 0) {
  42. trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
  43. .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
  44. .startNow().build();
  45. } else {
  46. trigger = TriggerBuilder
  47. .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
  48. .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
  49. .startNow().build();
  50. }
  51. scheduler.scheduleJob(jobDetail, trigger);
  52. } catch (SchedulerException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. /**
  57. * 增加一个job
  58. *
  59. * @param jobClass
  60. * 任务实现类
  61. * @param jobName
  62. * 任务名称(建议唯一)
  63. * @param jobGroupName
  64. * 任务组名
  65. * @param jobTime
  66. * 时间表达式 (如:0/5 * * * * ? )
  67. * @param jobData
  68. * 参数
  69. */
  70. public void addJob(Class jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
  71. try {
  72. // 创建jobDetail实例,绑定Job实现类
  73. // 指明job的名称,所在组的名称,以及绑定job类
  74. // 任务名称和组构成任务key
  75. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
  76. .build();
  77. // 设置job参数
  78. if(jobData!= null && jobData.size()>0){
  79. jobDetail.getJobDataMap().putAll(jobData);
  80. }
  81. // 定义调度触发规则
  82. // 使用cornTrigger规则
  83. // 触发器key
  84. Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
  85. .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
  86. .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
  87. // 把作业和触发器注册到任务调度中
  88. scheduler.scheduleJob(jobDetail, trigger);
  89. } catch (Exception e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. /**
  94. * 修改 一个job的 时间表达式
  95. *
  96. * @param jobName
  97. * @param jobGroupName
  98. * @param jobTime
  99. */
  100. public void updateJob(String jobName, String jobGroupName, String jobTime) {
  101. try {
  102. TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
  103. CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
  104. trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
  105. .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
  106. // 重启触发器
  107. scheduler.rescheduleJob(triggerKey, trigger);
  108. } catch (SchedulerException e) {
  109. e.printStackTrace();
  110. }
  111. }
  112. /**
  113. * 删除任务一个job
  114. *
  115. * @param jobName
  116. * 任务名称
  117. * @param jobGroupName
  118. * 任务组名
  119. */
  120. public void deleteJob(String jobName, String jobGroupName) {
  121. try {
  122. scheduler.deleteJob(new JobKey(jobName, jobGroupName));
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. }
  126. }
  127. /**
  128. * 暂停一个job
  129. *
  130. * @param jobName
  131. * @param jobGroupName
  132. */
  133. public void pauseJob(String jobName, String jobGroupName) {
  134. try {
  135. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  136. scheduler.pauseJob(jobKey);
  137. } catch (SchedulerException e) {
  138. e.printStackTrace();
  139. }
  140. }
  141. /**
  142. * 恢复一个job
  143. *
  144. * @param jobName
  145. * @param jobGroupName
  146. */
  147. public void resumeJob(String jobName, String jobGroupName) {
  148. try {
  149. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  150. scheduler.resumeJob(jobKey);
  151. } catch (SchedulerException e) {
  152. e.printStackTrace();
  153. }
  154. }
  155. /**
  156. * 立即执行一个job
  157. *
  158. * @param jobName
  159. * @param jobGroupName
  160. */
  161. public void runAJobNow(String jobName, String jobGroupName) {
  162. try {
  163. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  164. scheduler.triggerJob(jobKey);
  165. } catch (SchedulerException e) {
  166. e.printStackTrace();
  167. }
  168. }
  169. /**
  170. * 获取所有计划中的任务列表
  171. *
  172. * @return
  173. */
  174. public List> queryAllJob() {
  175. List> jobList = null;
  176. try {
  177. GroupMatcher matcher = GroupMatcher.anyJobGroup();
  178. Set jobKeys = scheduler.getJobKeys(matcher);
  179. jobList = new ArrayList>();
  180. for (JobKey jobKey : jobKeys) {
  181. List triggers = scheduler.getTriggersOfJob(jobKey);
  182. for (Trigger trigger : triggers) {
  183. Map map = new HashMap<>();
  184. map.put("jobName", jobKey.getName());
  185. map.put("jobGroupName", jobKey.getGroup());
  186. map.put("description", "触发器:" + trigger.getKey());
  187. Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  188. map.put("jobStatus", triggerState.name());
  189. if (trigger instanceof CronTrigger) {
  190. CronTrigger cronTrigger = (CronTrigger) trigger;
  191. String cronExpression = cronTrigger.getCronExpression();
  192. map.put("jobTime", cronExpression);
  193. }
  194. jobList.add(map);
  195. }
  196. }
  197. } catch (SchedulerException e) {
  198. e.printStackTrace();
  199. }
  200. return jobList;
  201. }
  202. /**
  203. * 获取所有正在运行的job
  204. *
  205. * @return
  206. */
  207. public List> queryRunJob() {
  208. List> jobList = null;
  209. try {
  210. List executingJobs = scheduler.getCurrentlyExecutingJobs();
  211. jobList = new ArrayList>(executingJobs.size());
  212. for (JobExecutionContext executingJob : executingJobs) {
  213. Map map = new HashMap();
  214. JobDetail jobDetail = executingJob.getJobDetail();
  215. JobKey jobKey = jobDetail.getKey();
  216. Trigger trigger = executingJob.getTrigger();
  217. map.put("jobName", jobKey.getName());
  218. map.put("jobGroupName", jobKey.getGroup());
  219. map.put("description", "触发器:" + trigger.getKey());
  220. Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  221. map.put("jobStatus", triggerState.name());
  222. if (trigger instanceof CronTrigger) {
  223. CronTrigger cronTrigger = (CronTrigger) trigger;
  224. String cronExpression = cronTrigger.getCronExpression();
  225. map.put("jobTime", cronExpression);
  226. }
  227. jobList.add(map);
  228. }
  229. } catch (SchedulerException e) {
  230. e.printStackTrace();
  231. }
  232. return jobList;
  233. }
陆、初始化任务

这里不准备给用户用web界面来配置定时任务,故此采用CommandLineRunner来子啊应用初始化的时候来初始化任务。只需要实现CommandLineRunner的run()方法即可。

</>复制代码

  1. @Override
  2. public void run(String... args) throws Exception {
  3. HashMap map = new HashMap<>();
  4. map.put("name",1);
  5. quartzService.deleteJob("job", "test");
  6. quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
  7. map.put("name",2);
  8. quartzService.deleteJob("job2", "test");
  9. quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
  10. map.put("name",3);
  11. quartzService.deleteJob("job3", "test2");
  12. quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
  13. }
柒、测试验证

分别夏侯启动两个应用,然后观察任务执行,以及在运行过程中杀死某个服务,来观察定时任务的执行。

写在后面的话】下面给出的是所需要脚本的连接地址:脚本下载地址,另外这边又一个自己实现的demo

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76058.html

相关文章

  • Spring Boot 参考指南(Quartz Scheduler)

    摘要:下列类型的将自动被拾取并与关联定义一个特定的工作,实例可以使用构建。定义何时触发特定工作。可以使用配置属性定制配置和,它允许编程进行定制化。特别是,不与相关联,因为提供了一种通过配置的方法,如果需要自定义任务执行程序,请考虑实现。 39. Quartz Scheduler Spring Boot为使用Quartz Scheduler提供了一些方便,引入spring-boot-start...

    Baoyuan 评论0 收藏0
  • Docker Swarm的前世今生

    摘要:当然此时的局限性较大,比如没有副本和负载均衡的概念,这导致服务无法高可用当然也更不存在什么服务网络管理和跨节点数据存储这些东西没有服务模型集群中服务间关系和启动顺序编排也很复杂于是就有了下面的的诞生。 showImg(https://segmentfault.com/img/remote/1460000015317037?w=1885&h=1153); 概述 在我的《Docker S...

    lemon 评论0 收藏0
  • 京东云Kubernetes集群最佳实践

    摘要:京东云集群最佳实践容器是的基石,它们之间的关系不言而喻。因此我们今天的文章将会和大家分享关于京东云集群的部分最佳实践。京东云集群采用管理节点全托管的方式,为用户提供简单易用高可靠功能强大的容器管理服务。 京东云Kubernetes集群最佳实践 容器是Cloud Native的基石,它们之间的关系不言而喻。了解容器对于学习Cloud Native也是十分重要的。近期,京东云Cloud N...

    XGBCCC 评论0 收藏0
  • 京东云Kubernetes集群最佳实践

    摘要:京东云集群最佳实践容器是的基石,它们之间的关系不言而喻。因此我们今天的文章将会和大家分享关于京东云集群的部分最佳实践。京东云集群采用管理节点全托管的方式,为用户提供简单易用高可靠功能强大的容器管理服务。 京东云Kubernetes集群最佳实践 容器是Cloud Native的基石,它们之间的关系不言而喻。了解容器对于学习Cloud Native也是十分重要的。近期,京东云Cloud N...

    刘永祥 评论0 收藏0

发表评论

0条评论

kycool

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<