资讯专栏INFORMATION COLUMN

基于redis的分布式任务管理

shiguibiao / 2861人阅读

摘要:虽然服务器时间同步实现起来并不困难,但针对贵公司云主机环境近年来的表现,只能说不信任,只能做保底方案。

业务背景

系统中有各种定时任务,需要满足以下要求:

定时任务需要能够动态增删改查

需要能够设置任务的有效时间范围(只在此范围内执行)

任务执行周期需要能够灵活配置

需要能够轻松接入各种任务实现

需要能够灵活配置任务实现的运行参数

系统为分布式集群,需要保证同一时刻同一任务只能被一个节点执行,同时增删改动作需要同步到每个节点

定时任务实现方式 1. Timer

Java API中的Timer提供了多种定时实现,对照需求:

[√] 可以通过动态创建、取消实现

[x] Timer本身不支持

[x] 目前Timer实现的执行周期还很简单

[√] 需要实现不同Job

[√] 需要在创建Job时指定

[x] Timer目前不支持

2. ScheduledExecutorService

ScheduledExecutorService与Timer类似,对照需求:

[√] 可以通过动态创建、取消实现

[x] ScheduledExecutorService本身不支持

[x] 目前ScheduledExecutorService实现的执行周期同样很简单

[√] 需要实现不同Job

[√] 需要在创建Job时指定

[x] ScheduledExecutorService目前同样不支持

3. Spring Schedule

spring提供了开箱即用的轻量版定时任务schedule,以注解的形式使用,支持cron表达式,用起来可谓十分方便顺手
但针对这里的需求:

[x] 由于以注解的形式使用,本身并不支持动态增删改

[x] 目前不支持

[√] spring schedule提供了cron、fixedRate两种方式,基本满足日常需求

[√] 需要实现不同job

[?] 由于不能动态增删改,job均是事先编码的,不直接支持(可以通过其他方式实现)

[x] 目前不支持

4. Quartz

Quartz的强大已经深得人心,使用Quartz实现这里的需求也是绰绰有余,针对第6条分布式集群Quartz也提出了解决方案

[√] 支持

[√] 支持

[√] 支持

[√] 支持

[√] 支持

[√] 支持

但在Quartz官方文档中出现了以下两段提示

Never run clustering on separate machines, unless their clocks are synchronized using some form of time-sync service (daemon) that runs very regularly (the clocks must be within a second of each other). See http://www.boulder.nist.gov/t... if you are unfamiliar with how to do this.
Never start (scheduler.start()) a non-clustered instance against the same set of database tables that any other instance is running (start()ed) against. You may get serious data corruption, and will definitely experience erratic behavior.

其中需要关注的一点是,各节点机器之间服务器时间误差不能超过1秒。虽然服务器时间同步实现起来并不困难,但针对贵公司云主机环境近1年来的表现,只能说不信任,Quartz只能做保底方案。

5. jesque

jesque是resque的java版实现,jesque的定位主要是延迟任务及简单的定时任务,不支持cron表达式,对照需求:

[√] 支持

[x] jesque本身不支持

[x] 延迟任务及简单的定时任务,不支持cron表达式

[√] 需要实现不同Job

[√] 可以配置Job类的构造函数参数及属性参数值

[√] 基于redis实现


综上

需求编号 Timer ScheduledExecutorService Spring Schedule Quartz jesque
1 x
2 x x x x
3 x x x
4
5 ?
6 x x x

尽管Quartz秒杀当前需求,但鉴于贵公司云主机环境的表现,同时考虑到没有SQL数据库环境,故重新撸一套,奉上源码 https://github.com/manerfan/m...

基于Redis实现分布式任务管理 实现思路

基本思路为,将任务数据存放到redis,定时获取redis中的任务数据,利用反射创建任务实例并执行。这里分别用到了redis中的有序集合及哈希表

创建/更新任务流程:

以任务执行时间为score,任务ID为member存入有序集合(ZADD)

以任务ID为field,任务实体为value存入哈希表(HSET)

获取/执行任务流程:

从有序集合中获取score小于当前时间的任务ID(ZRANGEBYSCORE)

根据任务ID从哈希表中取出任务实体(HMGET)

使用反射创建任务实例并执行

编码实现 任务实体Job
@JsonInclude(JsonInclude.Include.NON_EMPTY)
class JobEntity(
        var uuid: String = UUID.randomUUID().toString(),
        var name: String? = null,

        var className: String,
        var args: Array? = null,
        @JsonInclude(JsonInclude.Include.NON_EMPTY) 
        var vars: Map? = null,

        @JsonDeserialize(using = DateTimeDeserializer::class)
        @JsonSerialize(using = DateTimeSerializer::class)
        var startedAt: DateTime? = null,

        @JsonDeserialize(using = DateTimeDeserializer::class)
        @JsonSerialize(using = DateTimeSerializer::class)
        var endedAt: DateTime? = null,

        cron: String? = null, //  优先使用此参数
        var fixedRate: Long? = null
) {
    /**
     * cron表达式解析器
     */
    @JsonIgnore
    private var cronGenerator: CronSequenceGenerator? = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) }

    /**
     * cron表达式
     */
    var cron = cron
        set(cron) {
            field = cron
            cronGenerator = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) }
        }

    /**
     * 记录下次需要执行的时间
     */
    var nextScheduledAt: Long = -1 private set

    /**
     * 计算并更新下次执行时间
     * 若指定endedAt且下次执行时间晚于endedAt,则说明任务已结束,并返回false
     *
     * @return 是否需要更新 | 是否已失效
     */
    fun updateNextScheduledAt(timestamp: Long = System.currentTimeMillis()): Boolean {
        val limit = startedAt?.let { max(it.millis, timestamp) } ?: timestamp

        nextScheduledAt = when {
            null != cronGenerator -> cronGenerator!!.next(Date(limit)).time
            null != fixedRate -> limit + fixedRate!!
            else -> nextScheduledAt
        }

        return endedAt?.let { it.millis > nextScheduledAt } ?: true
    }
}

其中

className 真正任务实例的类路径
args 对应className类构造函数参数
vars 对应className类中的属性-值

startedAt 任务的开始时间,不指定则立即开始
endedAt 任务的结束时间,不指定则永久执行

cron 任务执行cron表达式
fixedRate 以该速率(毫秒)循环执行(若指定了cron,则该参数失效)

updateNextScheduledAt函数用于计算任务下次执行时间,若下次执行时间晚于endedAt则说明任务已结束

任务管理
/**
 * 添加任务Job
 *
 * 计算并更新job.nextScheduledAt
 * 若指定endedAt且nextScheduledAt晚于endedAt,则说明任务已结束,直接返回
 * 反之,将更新后的job存入redis
 *
 * @param job 任务
 *
 * @return job
 */
fun add(job: JobEntity): JobEntity {
    if (!job.updateNextScheduledAt(now)) {
        logger.warn("Job is Arrived! {}", job.toString())
        // Update to DB
        return job
    }

    val connection = jobTemplate.connectionFactory.connection
    try {
        connection.multi()
        connection.hSet(hKey, job.uuid.toByteArray(), objectMapper.writeValueAsBytes(job))
        connection.zAdd(zKey, job.nextScheduledAt.toDouble(), job.uuid.toByteArray())
        connection.exec()
    } finally {
        connection.close()
    }

    return job
}

/**
 * 更新任务
 *
 * 1. 删除任务
 * 2. 计算并更新job.nextScheduledAt
 * 若指定endedAt且nextScheduledAt晚于endedAt,则说明任务已结束,直接返回
 * 反之,将更新后的job存入redis
 *
 * @param job 任务
 *
 * @return job
 */
fun update(job: JobEntity): JobEntity {
    delete(job.uuid)
    return add(job)
}

/**
 * 删除任务
 *
 * 1. 从Hash中删除
 * 2. 从SortedSet中删除
 *
 * @param uuid 任务uuid
 */
fun delete(uuid: String) {
    val connection = jobTemplate.connectionFactory.connection
    try {
        connection.multi()
        connection.hDel(hKey, uuid.toByteArray())
        connection.zRem(zKey, uuid.toByteArray())
        connection.exec()
    } finally {
        connection.close()
    }
}

其中

add 计算任务下次执行时间,若晚于结束时间则直接放回,反之更新到redis中
update 计算任务下次执行时间,若晚于结束时间则从redis中删除,反之更新到redis中
delete 将任务从redis中删除

任务执行
/**
 * 任务Job执行器
 *
 * 每隔1秒运行一次
 * 1. 从SortedSet中将score在(0,now)之间的uuid取出
 * 2. 从Hash中将uuid对应的job取出
 * 3. 解析job,计算job的nextScheduledAt,并将job回写到redis中
 * 4. 执行job
 */
@Scheduled(fixedRate = 1000) // 不使用cron是为了使集群中各节点执行时间随机分散开
fun schedule() {

    /**
     * SortedSet(有序集合)中,member为job.uuid,score为job.nextScheduledAt
     * 将score在 (0, now) 之间的uuid取出
     * 其对应的即是现在需要执行的job
     */

    var connection = jobTemplate.connectionFactory.connection
    var keys: Set?
    try {
        val now = System.currentTimeMillis().toDouble()
        connection.multi()
        connection.zRangeByScore(zKey, 0.0, now) // 将score在(0,now)之间的uuid取出
        connection.zRemRangeByScore(zKey, 0.0, now) // 同时从redis中删除
        keys = connection.exec()[0] as? Set
    } finally {
        connection.close()
    }

    if (ObjectUtils.isEmpty(keys)) {
        return
    }

    /**
     * Hash(哈希表)中,field为job.uuid,value为job
     * 通过uuid将对应job取出
     */

    connection = jobTemplate.connectionFactory.connection
    var values: List?
    try {
        connection.multi()
        connection.hMGet(hKey, *keys!!.toTypedArray()) // 将uuid对应的job取出
        connection.hDel(hKey, *keys.toTypedArray()) // 同时从redis中删除
        values = connection.exec()[0] as? List
    } finally {
        connection.close()
    }

    if (ObjectUtils.isEmpty(values)) {
        return
    }

    // 解析jobs并回写到redis中
    val jobs = values!!.map {
        try {
            // 计算job的nextScheduledAt,并将其回写到redis中
            add(objectMapper.readValue(it, JobEntity::class.java))
        } catch (e: Exception) {
            logger.warn("JSON Parse Error {} {}", it.toString(), e.message)
            null
        }
    }

    // 执行jobs
    jobs.filterNotNull().forEach {
        var job = ReflectionUtils.createObject(Class.forName(it.className), it.args, it.vars)
        when (job) {
            is Runnable -> executorService.submit(job)
            else -> logger.warn("Job Must Implement Runnable {}", job)
        }
    }
}

这里使用spring schedule,每1秒执行一次

首先,从有序集合中获取score小于当前时间的任务ID,并删除
其次,根据任务ID从哈希表中取出任务实体Job,并删除
之后,利用反射,根据Job中的className args vargs创建任务实例,并放入线程池执行任务
最后,计算更新任务Job下次执行时间,若任务未过期,则将其更新到redis中,等待下次执行

示例代码见 https://github.com/manerfan/m...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/36483.html

相关文章

  • 大数据开发平台(Data Platform)在有赞最佳实践

    摘要:任务调度设计大数据开发平台的任务调度是指在作业发布之后,按照作业配置中指定的调度周期通过指定在一段时间范围内通过开始结束时间指定周期性的执行用户代码。 前言 随着公司规模的增长,对大数据的离线应用开发的需求越来越多,这些需求包括但不限于离线数据同步(MySQL/Hive/Hbase/Elastic Search 等之间的离线同步)、离线计算(Hive/MapReduce/Spark 等...

    HitenDev 评论0 收藏0
  • 布式任务框架之celery

    摘要:架构消息代理,作为临时储存任务的中间媒介,为提供了队列服务。生产者将任务发送到,消费者再从获取任务。如果使用,则有可能发生突然断电之类的问题造成突然终止后的数据丢失等后果。任务调度器,负责调度并触发定时周期任务。 架构 showImg(https://segmentfault.com/img/bVbmDXa?w=831&h=413); Broker 消息代理,作为临时储存任务的中间媒...

    fredshare 评论0 收藏0
  • 从0-1打造最强性能Scrapy爬虫集群

    摘要:包括爬虫编写爬虫避禁动态网页数据抓取部署分布式爬虫系统监测共六个内容,结合实际定向抓取腾讯新闻数据,通过测试检验系统性能。 1 项目介绍 本项目的主要内容是分布式网络新闻抓取系统设计与实现。主要有以下几个部分来介绍: (1)深入分析网络新闻爬虫的特点,设计了分布式网络新闻抓取系统爬取策略、抓取字段、动态网页抓取方法、分布式结构、系统监测和数据存储六个关键功能。 (2)结合程序代码分解说...

    vincent_xyb 评论0 收藏0
  • 微服务架构中,二次浅封装实践

    摘要:三实践案例案例简介分布式系统中,微服务基础组件等,系统中间件,等,对常用功能配置等,进行二次浅封装并统一集成管理,以满足日常开发中基础环境搭建与临时工具的快速实现。 一、背景简介 分布式系统中存在很多拆分的服务,在不断迭代升级的过程中,会出现如下常见的棘手情况: 某个技术组件版本升级,依赖包升级导致部分语法或者API过期,或者组件修复紧急的问题,从而会导致分布式系统下各个服...

    Hujiawei 评论0 收藏0
  • 基于Celery布式爬虫管理平台: Crawlab

    摘要:基于的爬虫分布式爬虫管理平台,支持多种编程语言以及多种爬虫框架。后台程序会自动发现这些爬虫项目并储存到数据库中。每一个节点需要启动应用来支持爬虫部署。任务将以环境变量的形式存在于爬虫任务运行的进程中,并以此来关联抓取数据。 Crawlab 基于Celery的爬虫分布式爬虫管理平台,支持多种编程语言以及多种爬虫框架。 Github: https://github.com/tikazyq/...

    legendaryedu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<