资讯专栏INFORMATION COLUMN

Python任务调度模块APScheduler

zxhaaa / 2755人阅读

介绍

官网文档:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...

APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

triggers: 任务触发器组件,提供任务触发方式

job stores: 任务商店组件,提供任务保存方式

executors: 任务调度组件,提供任务调度方式

schedulers: 任务调度组件,提供任务工作方式

安装

pip 安装

</>复制代码

  1. $ pip install apscheduler

源码安装

</>复制代码

  1. $ python setup.py install
简单的实例

</>复制代码

  1. from apscheduler.schedulers.blocking import BlockingScheduler
  2. import time
  3. # 实例化一个调度器
  4. scheduler = BlockingScheduler()
  5. def job1():
  6. print "%s: 执行任务" % time.asctime()
  7. # 添加任务并设置触发方式为3s一次
  8. scheduler.add_job(job1, "interval", seconds=3)
  9. # 开始运行调度器
  10. scheduler.start()

输出:

</>复制代码

  1. $ python first.py
  2. Fri Sep 8 20:41:55 2017: 执行任务
  3. Fri Sep 8 20:41:58 2017: 执行任务
  4. ...
各组件功能 trigger组件

trigger提供任务的触发方式,共三种方式:

date:只在某个时间点执行一次run_date(datetime|str)

</>复制代码

  1. scheduler.add_job(my_job, "date", run_date=date(2017, 9, 8), args=[])
  2. scheduler.add_job(my_job, "date", run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
  3. scheduler.add_job(my_job, "date", run_date="2017-9-08 21:30:05", args=[])
  4. # The "date" trigger and datetime.now() as run_date are implicit
  5. sched.add_job(my_job, args=[[])

interval: 每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

</>复制代码

  1. scheduler.add_job(my_job, "interval", hours=2)
  2. scheduler.add_job(my_job, "interval", hours=2, start_date="2017-9-8 21:30:00", end_date="2018-06-15 21:30:00)
  3. @scheduler.scheduled_job("interval", id="my_job_id", hours=2)
  4. def my_job():
  5. print("Hello World")

cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

</>复制代码

  1. sched.add_job(my_job, "cron", hour=3, minute=30)
  2. sched.add_job(my_job, "cron", day_of_week="mon-fri", hour=5, minute=30, end_date="2017-10-30")
  3. @sched.scheduled_job("cron", id="my_job_id", day="last sun")
  4. def some_decorated_task():
  5. print("I am printed at 00:00:00 on the last Sunday of every month!")
scheduler组件

scheduler组件提供执行的方式,在不同的运用环境中选择合适的方式

BlockingScheduler: 进程中只运行调度器时的方式

</>复制代码

  1. from apscheduler.schedulers.blocking import BlockingScheduler
  2. import time
  3. scheduler = BlockingScheduler()
  4. def job1():
  5. print "%s: 执行任务" % time.asctime()
  6. scheduler.add_job(job1, "interval", seconds=3)
  7. scheduler.start()

BackgroundScheduler: 不想使用任何框架时的方式

</>复制代码

  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. import time
  3. scheduler = BackgroundScheduler()
  4. def job1():
  5. print "%s: 执行任务" % time.asctime()
  6. scheduler.add_job(job1, "interval", seconds=3)
  7. scheduler.start()
  8. while True:
  9. pass

AsyncIOScheduler: asyncio module的方式(Python3)

</>复制代码

  1. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  2. try:
  3. import asyncio
  4. except ImportError:
  5. import trollius as asyncio
  6. ...
  7. ...
  8. # while True:pass
  9. try:
  10. asyncio.get_event_loop().run_forever()
  11. except (KeyboardInterrupt, SystemExit):
  12. pass

GeventScheduler: gevent方式

</>复制代码

  1. from apscheduler.schedulers.gevent import GeventScheduler
  2. ...
  3. ...
  4. g = scheduler.start()
  5. # while True:pass
  6. try:
  7. g.join()
  8. except (KeyboardInterrupt, SystemExit):
  9. pass

TornadoScheduler: Tornado方式

</>复制代码

  1. from tornado.ioloop import IOLoop
  2. from apscheduler.schedulers.tornado import TornadoScheduler
  3. ...
  4. ...
  5. # while True:pass
  6. try:
  7. IOLoop.instance().start()
  8. except (KeyboardInterrupt, SystemExit):
  9. pass

TwistedScheduler: Twisted方式

</>复制代码

  1. from twisted.internet import reactor
  2. from apscheduler.schedulers.twisted import TwistedScheduler
  3. ...
  4. ...
  5. # while True:pass
  6. try:
  7. reactor.run()
  8. except (KeyboardInterrupt, SystemExit):
  9. pass

QtScheduler: Qt方式

executors组件

executors组件提供任务的调度方式

base

debug

gevent

pool(max_workers=10)

twisted

jobstore组件

jobstore提供任务的各种持久化方式

base

memory

mongodb
scheduler.add_jobstore("mongodb", collection="example_jobs")

redis
scheduler.add_jobstore("redis", jobs_key="example.jobs", run_times_key="example.run_times")

rethinkdb
scheduler.add_jobstore("rethinkdb", database="apscheduler_example")

sqlalchemy
scheduler.add_jobstore("sqlalchemy", url=url)

zookeeper
scheduler.add_jobstore("zookeeper", path="/example_jobs")

任务操作 添加任务add_job(如上)

</>复制代码

  1. 如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启都会创建任务的副本
    开启后任务不会马上启动,可修改trigger参数

删除任务remove_job

</>复制代码

  1. # 根据任务实例删除
  2. job = scheduler.add_job(myfunc, "interval", minutes=2)
  3. job.remove()
  4. # 根据任务id删除
  5. scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id")
  6. scheduler.remove_job("my_job_id")
任务的暂停pause_job和继续resume_job

</>复制代码

  1. job = scheduler.add_job(myfunc, "interval", minutes=2)
  2. # 根据任务实例
  3. job.pause()
  4. job.resume()
  5. # 根据任务id暂停
  6. scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id")
  7. scheduler.pause_job("my_job_id")
  8. scheduler.resume_job("my_job_id")
任务的修饰modify和重设reschedule_job

修饰:job.modify(max_instances=6, name="Alternate name")
重设:scheduler.reschedule_job("my_job_id", trigger="cron", minute="*/5")

调度器操作

开启 scheduler.start()

关闭 scheduler.shotdown(wait=True | False)

暂停 scheduler.pause()

继续 scheduler.resume()

监听 http://apscheduler.readthedoc...

</>复制代码

  1. def my_listener(event):
  2. if event.exception:
  3. print("The job crashed :(")
  4. else:
  5. print("The job worked :)")
  6. scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
官方实例

</>复制代码

  1. from pytz import utc
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. from apscheduler.jobstores.mongodb import MongoDBJobStore
  4. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  5. from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
  6. jobstores = {
  7. "mongo": MongoDBJobStore(),
  8. "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
  9. }
  10. executors = {
  11. "default": ThreadPoolExecutor(20),
  12. "processpool": ProcessPoolExecutor(5)
  13. }
  14. job_defaults = {
  15. "coalesce": False,
  16. "max_instances": 3
  17. }
  18. scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40820.html

相关文章

  • 定时任务框架APScheduler学习详解

    摘要:安装利用进行安装源码安装有四种组成部分触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用t...

    sewerganger 评论0 收藏0
  • APScheduler任务调度利器

    摘要:中任务调度一般用中的任务调度工具也有不少等。调度器配置示例方式一方式二三略。移除调用放到,参数为调用实例的方法注意如果任务已经调度完毕,并且之后也不会再被执行的情况下,会被自动移除。可以监听调度任务执行情况相关的事件。 Java中任务调度一般用Quartz,Python中的任务调度工具也有不少:Celery,RQ,APScheduler等。Celery:非常强大的分布式任务调度框架RQ...

    Flink_China 评论0 收藏0
  • Python Apscheduler源代码解析(一) 任务调度流程

    摘要:最近公司有项目需要使用到定时任务,其定时逻辑类似于的,就使用了这个类库。在一次循环结束之前会计算任务下次执行事件与当前时间之差,然后让调度线程挂起直到那个时间到来。 最近公司有项目需要使用到定时任务,其定时逻辑类似于linux的Cron,就使用了Apscheduler这个类库。基于公司的业务,需要修改Apshceduler,故而研究了一下Apscheduler的代码。 Apschedu...

    chavesgu 评论0 收藏0
  • Flask-APScheduler使用教程

    摘要:项目中需要用到定时器和循环执行。运用线程执行轮询操作,也有运用系统的的文章最多,但是太麻烦。和中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用进行本地开发。同时也包含了对任务的控制。后续有需求在继续。 项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但...

    Noodles 评论0 收藏0
  • django开发-定时任务的使用

    摘要:今天介绍在中使用定时任务的两种方式。添加并启动定时任务其它命令显示当前的定时任务删除所有定时任务今天的定时任务就说到这里,有错误之处,欢迎交流指正 今天介绍在django中使用定时任务的两种方式。 方式一: APScheduler1)安装: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...

    wean 评论0 收藏0

发表评论

0条评论

zxhaaa

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<