资讯专栏INFORMATION COLUMN

基于websocket的celery任务状态监控

microelec / 3097人阅读

摘要:目的曾经想向前台实时返回任务的状态监控,也查看了很多博客,但是好多也没能如愿,因此基于网上已有的博客已经自己的尝试,写了一个小的,实现前台实时获取后台传输的任务状态。实现仿照其他例子实现了一个简单的后台任务监控。

1. 目的
曾经想向前台实时返回Celery任务的状态监控,也查看了很多博客,但是好多也没能如愿,因此基于网上已有的博客已经自己的尝试,写了一个小的demo,实现前台实时获取后台传输的任务状态。

2. 准备
本篇文章使用的是Flask框架,安装celery,celery采用redis作为存储。同时用到了Flask-SocketIO建立websocket。同时还用到了协程库eventlet(这个是Flask-SocketIO文档建议的,链接文档)。

3. 实现
demo仿照其他例子实现了一个简单的后台任务监控。我们直接上代码吧,下面是server端代码:

# -*- utf-8 -*-
# app.py
import time
import uuid
from flask import Flask, render_template, request, make_response, jsonify
from flask_socketio import SocketIO
from celery import Celery
import eventlet
from flask_redis import FlaskRedis
eventlet.monkey_patch()

app = Flask(__name__)

app.config["BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"
app.config["CELERY_ACCEPT_CONTENT"] = ["json", "pickle"]
app.config["REDIS_URL"] = "redis://localhost:6379/0"

socketio = SocketIO(app, async_mode="eventlet",message_queue=app.config["CELERY_RESULT_BACKEND"])
redis = FlaskRedis(app)

celery = Celery(app.name)
celery.conf.update(app.config)

#模拟后台耗时任务
@celery.task
def background_task(uid):
    sid = redis.get(uid)
    socketio.emit("info", {"data": "Task starting ...", "time": time.time() * 1000 },room=sid, namespace="/task")
    socketio.sleep(4)
    socketio.emit("info", {"data": "Task running!", "time": time.time() * 1000 }, room=sid, namespace="/task")
    socketio.sleep(5)
    socketio.emit("info", {"data": "Task complete!", "time": time.time()*1000 }, room=sid, namespace="/task")

#建立链接时把sid传到浏览器端保存。
@socketio.on("connect", namespace="/task")
def connect_host():
    sid = request.sid
    socketio.emit("hostadd", {"sid": sid}, room=sid, namespace="/task")

#将每一个客户端生成一个uuid存放在cookie中
@app.route("/")
def index():
    if not request.cookies.get("host_uid", None):
        uid = uuid.uuid1().get_hex()
        response = make_response(render_template("index.html"))
        response.set_cookie("host_uid", uid)
        return response
    return render_template("index.html")

@app.route("/task")
def start_background_task():
    uid = request.cookies.get("host_uid")
    background_task.delay(uid)
    return "Started"

#设置sid建立链接后浏览器将sid传送到server,并将uid与sid映射存放在redis里面,默认保留12小时
@app.route("/setsid", methods=["POST"])
def set_uid():
    data = request.json
    uid = request.cookies.get("host_uid")
    redis.set(uid, data["sid"])
    redis.expire(uid, 3600 * 12)
    return jsonify({"success": True})

if __name__ == "__main__":
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)

如果不想使用debug模式的话,可以用gunicorn运行,命令如下所示:

gunicorn --worker-class eventlet -w 1 app:app

使用上述命令需要注意,由于gunicorn负载均衡算法的限制,文档建议worker数量为1,我测试过大于1,确实会出问题。
前端代码如下,index.html:




    test
    
    


    

Logging

GitHub地址:https://github.com/junfenggoo...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42274.html

相关文章

  • 分布式任务框架之celery

    摘要:架构消息代理,作为临时储存任务的中间媒介,为提供了队列服务。生产者将任务发送到,消费者再从获取任务。如果使用,则有可能发生突然断电之类的问题造成突然终止后的数据丢失等后果。任务调度器,负责调度并触发定时周期任务。 架构 showImg(https://segmentfault.com/img/bVbmDXa?w=831&h=413); Broker 消息代理,作为临时储存任务的中间媒...

    fredshare 评论0 收藏0
  • 如何在多个queue多台server上部署Celery 以及任务状态监控flower

    摘要:是分布式任务队列,能实时处理任务,同时支持官方文档工作原理如下发送给从中消费消息,并将结果存储在中本文中使用的是,使用的是现在有两个,分别是加法运算和乘法运算。假定乘法运算的事件优先级高事件也很多,对于加法运算,要求每分钟最多处理个事件。 Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档Celery工作原理如下: celery cli...

    goji 评论0 收藏0
  • Flask+Celery+Redis实现队列化异步任务

    摘要:使用异步框架,例如等等,装饰异步任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。不存储任务状态。标识要使用的默认序列化方法的字符串。指定该任务的结果存储后端用于此任务。 概述:         我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用...

    Ali_ 评论0 收藏0
  • 基于Flask-Angular项目组网架构与部署

    摘要:基于网,分享项目的组网架构和部署。项目组网架构架构说明流项目访问分为两个流,通过分两个端口暴露给外部使用数据流用户访问网站。通过进行配置,使用作为异步队列来存储任务,并将处理结果存储在中。 基于Raindrop网,分享项目的组网架构和部署。 项目组网架构 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...

    kelvinlee 评论0 收藏0

发表评论

0条评论

microelec

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<