资讯专栏INFORMATION COLUMN

基于wokerman实现即时消息推送

JellyBool / 1563人阅读

摘要:目前网站有两个用到实时消息推送的功能,源码最新动态,实时显示用户的操作行为消息推送,如重要消息通知,任务指派等等考虑的问题既然要实现即时,那就少不了。

目前网站有两个用到实时消息推送的功能,源码:https://github.com/wuzhc/team

最新动态,实时显示用户的操作行为

消息推送,如重要消息通知,任务指派等等

考虑的问题

既然要实现即时,那就少不了socketio。因为项目是PHP写的,所以服务端直接用phpsocket.io

我们应该保存离线消息,否则如果用户不在线,那就接受不到消息。这里我用mongodb来存储消息。

一是消息不需要关联表,一条消息一个文档

二是mongodb适合做海量数据存储,并且分片也很简单

三是消息不需要永久存储,随着时间推移,消息的价值性越低,可以用固定集合来存储消息,当数据量达到一定值时覆盖最久的消息

一个页面连接一个socket,若用户同时打开了多个页面(即一个用户会对应多个socketID),那么消息应该推送到用户每一个打开的页面。一个简单的做法就是把用户多个sockID加到group分组(socket.emit("group")),group分组名称可以是"uid:1",表示userID为1的用户,然后socket.broadcast.to("uid:1").emit("event_name", data);

就可以实现对多个页面推送消息

实时动态

实时消息推送

客户端
        // 初始化io对象
        var socket = io("http://" + document.domain + ":2120");
        // 当socket连接后发送登录请求
        socket.on("connect", function () {
            socket.emit("login", {
                uid: "user->id?>",
                companyID: "user->identity->fdCompanyID?>"
            });
        });
        // 实时消息,当服务端推送来消息时触发
        socket.on("new_msg", function (msg) {
            if (msg.typeID == "") {
                var html = "
  • " + "" + "
    " + "" + "
    " + "

    " + msg.title + "

    " + "

    " + msg.content + "

    " + "
    " + "
  • "; $("#msg-handle").prepend(html); var num = $(".msg-handle-num").eq(1).text() * 1; $(".msg-handle-num").text(num + 1); } }); // 实时动态,当服务端推送来消息时触发 socket.on("update_dynamic", function (msg) { var html = "
  • " + "" + "User Image" + "
    " + "" + "" + msg.date + "" + "

    " + "" + msg.operator + "" + "" + msg.title + "" + "

    " + "
    " + msg.content + "
    " + "
    " + "
  • "; $("#dynamic-list").prepend(html); }); // 在线人数 socket.on("update_online_count", function (msg) { $("#online-people").html(msg); });
    服务端
        /**
         * 消息推送
         */
        public function actionStart()
        {
            // PHPSocketIO服务
            $io = new SocketIO(2120);
            // 客户端发起连接事件时,设置连接socket的各种事件回调
            $io->on("connection", function ($socket) use ($io) {
    
                /** @var Connection $redis */
                $redis = Yii::$app->redis;
    
                // 当客户端发来登录事件时触发
                $socket->on("login", function ($data) use ($socket, $redis, $io) {
    
                    $uid = isset($data["uid"]) ? $data["uid"] : null;
                    $companyID = isset($data["companyID"]) ? $data["companyID"] : null;
    
                    // uid和companyID应该做下加密 by wuzhc 2018-01-28
                    if (empty($uid) || empty($companyID)) {
                        return ;
                    }
                    $this->companyID = $companyID;
    
                    // 已经登录过了
                    if (isset($socket->uid)) {
                        return;
                    }
    
                    $key = "socketio:company:" . $companyID;
                    $field = "uid:" . $uid;
    
                    // 用hash方便统计用户打开页面数量
                    if (!$redis->hget($key, $field)) {
                        $redis->hset($key, $field, 0);
                    }
    
                    // 同个用户打开新页面时加1
                    $redis->hincrby($key, $field, 1);
    
                    // 加入uid分组,方便对同个用户的所有打开页面推送消息
                    $socket->join("uid:". $uid);
    
                    // 加入companyID,方便对整个公司的所有用户推送消息
                    $socket->join("company:".$companyID);
    
                    $socket->uid = $uid;
                    $socket->companyID = $companyID;
    
                    // 整个公司在线人数更新
                    $io->to("company:".$companyID)->emit("update_online_count", $redis->hlen($key));
                });
    
                // 当客户端断开连接是触发(一般是关闭网页或者跳转刷新导致)
                $socket->on("disconnect", function () use ($socket, $redis, $io) {
                    if (!isset($socket->uid)) {
                        return;
                    }
    
                    $key = "socketio:company:" . $socket->companyID;
                    $field = "uid:" . $socket->uid;
                    $redis->hincrby($key, $field, -1);
    
                    if ($redis->hget($key, $field) <= 0) {
                        $redis->hdel($key, $field);
    
                        // 某某下线了,刷新整个公司的在线人数
                        $io->to("company:".$socket->companyID)->emit("update_online_count", $redis->hlen($key));
                    }
                });
            });
    
            // 开始进程时,监听2121端口,用户数据推送
            $io->on("workerStart", function () use ($io) {
                /** @var Connection $redis */
                $redis = Yii::$app->redis;
    
                $httpWorker = new Worker("http://0.0.0.0:2121");
                // 当http客户端发来数据时触发
                $httpWorker->onMessage = function ($conn, $data) use ($io, $redis) {
                    $_POST = $_POST ? $_POST : $_GET;
                    switch (@$_POST["action"]) {
                        case "message":
                            $to = "uid:" . @$_POST["receiverID"];
                            $_POST["content"] = htmlspecialchars(@$_POST["content"]);
                            $_POST["title"] = htmlspecialchars(@$_POST["title"]);
    
                            // 有指定uid则向uid所在socket组发送数据
                            if ($to) {
                                $io->to($to)->emit("new_msg", $_POST);
                            }
    
                            $companyID = @$_POST["companyID"];
                            $key = "socketio:company:" . $companyID;
                            $field = "uid:" . $to;
    
                            // http接口返回,如果用户离线socket返回fail
                            if ($to && $redis->hget($key, $field)) {
                                return $conn->send("offline");
                            } else {
                                return $conn->send("ok");
                            }
                            break;
                        case "dynamic":
                            $to = "company:" . @$_POST["companyID"];
                            $_POST["content"] = htmlspecialchars(@$_POST["content"]);
                            $_POST["title"] = htmlspecialchars(@$_POST["title"]);
    
                            // 有指定uid则向uid所在socket组发送数据
                            if ($to) {
                                $io->to($to)->emit("update_dynamic", $_POST);
                            }
    
                            $companyID = @$_POST["companyID"];
                            $key = "socketio:company:" . $companyID;
    
                            // http接口返回,如果用户离线socket返回fail
                            if ($to && $redis->hlen($key)) {
                                return $conn->send("offline");
                            } else {
                                return $conn->send("ok");
                            }
                    }
    
                    return $conn->send("fail");
                };
    
                // 执行监听
                $httpWorker->listen();
            });
    
            // 运行所有的实例
            global $argv;
            array_shift($argv);
    
            if (isset($argv[2]) && $argv[2] == "daemon") {
                $argv[2] = "-d";
            }
    
            Worker::runAll();
        }
    例子
        /**
         * 新建任务
         * @param $projectID
         * @param $categoryID
         * @return string
         * @throws ForbiddenHttpException
         * @since 2018-01-26
         */
        public function actionCreate($projectID, $categoryID)
        {
            $userID = Yii::$app->user->id;
    
            if (!Yii::$app->user->can("createTask")) {
                throw new ForbiddenHttpException(ResponseUtil::$msg[1]);
            }
    
            if ($data = Yii::$app->request->post()) {
                if (empty($data["name"])) {
                    ResponseUtil::jsonCORS(["status" => Conf::FAILED, "msg" => "任务标题不能为空"]);
                }
    
                // 保存任务
                $taskID = TaskService::factory()->save([
                    "name"       => $data["name"],
                    "creatorID"  => $userID,
                    "companyID"  => $this->companyID,
                    "level"      => $data["level"],
                    "categoryID" => $categoryID,
                    "projectID"  => $projectID,
                    "content"    => $data["content"]
                ]);
    
                if ($taskID) {
                    $url = Url::to(["task/view", "taskID" => $taskID]);
                    $portrait = UserService::factory()->getUserPortrait($userID);
                    $username = UserService::factory()->getUserName($userID);
                    $title = "创建了新任务";
                    $content = $data["name"];
    
                    // 保存操作日志
                    LogService::factory()->saveHandleLog([
                        "objectID"   => $taskID,
                        "companyID"  => $this->companyID,
                        "operatorID" => $userID,
                        "objectType" => Conf::OBJECT_TASK,
                        "content"    => $content,
                        "url"        => $url,
                        "title"      => $title,
                        "portrait"   => $portrait,
                        "operator"   => $username
                    ]);
    
                    // 动态推送
                    MsgService::factory()->push("dynamic", [
                        "companyID"  => $this->companyID,
                        "operatorID" => $userID,
                        "operator"   => $username,
                        "portrait"   => $portrait,
                        "title"      => $title,
                        "content"    => $content,
                        "url"        => $url,
                        "date"       => date("Y-m-d H:i:s"),
                    ]);
    
                    ResponseUtil::jsonCORS(null, Conf::SUCCESS, "创建成功");
                } else {
                    ResponseUtil::jsonCORS(null, Conf::FAILED, "创建失败");
                }
            } else {
                return $this->render("create", [
                    "projectID" => $projectID,
                    "category"  => TaskCategory::findOne(["id" => $categoryID]),
                ]);
            }
        }
    调用保存操作日志底层接口
    LogService::factory()->saveHandleLog($args)
    参数说明
    参数 说明
    operatorID 操作者ID,用于用户跳转链接
    companyID 公司ID,用于该公司下的动态
    operator 操作者姓名
    portrait 操作者头像
    receiver 接受者,可选
    title 动态标题,例如创建任务
    content 动态内容,例如创建任务的名称
    date 动态时间
    url 动态跳转链接
    token 验证
    objectType 对象类型,例如任务,文档等等
    objectID 对象ID,例如任务ID,文档ID等等

    companyID 用于查询该公司下的动态

    objectType和objectID组合可以查询某个对象的操作日志,例如一个任务被操作的日志

    保存消息
    参数 说明
    senderID 发送者ID,用于用户跳转链接
    companyID 公司ID,用于该公司下的动态
    sender 发送者姓名
    portrait 发送者头像
    receiverID 接受者ID
    title 动态标题,例如创建任务
    content 动态内容,例如创建任务的名称
    date 动态时间
    url 动态跳转链接
    typeID 消息类型,1系统通知,2管理员通知,3操作通知

    receiverID和typeID和isRead组合可以查询用户未读消息

    请求接口
    MsgService::factory()->push($action, $args)
    即时动态
    参数 说明
    operatorID 操作者ID,用于用户跳转链接
    companyID 公司ID,用于给该公司的所有socket推送
    operator 操作者姓名
    portrait 操作者头像
    receiver 接受者,可选
    title 动态标题,例如创建任务
    content 动态内容,例如创建任务的名称
    date 动态时间
    url 动态跳转链接
    即时消息
    参数 说明
    senderID 发送者ID,用于用户跳转链接
    sender 发送者姓名
    receiverID 接受者ID,用于给接受者的所有socket推送消息
    typeID 消息类型,1系统通知,2管理员通知,3操作通知
    portrait 发送者头像
    title 消息标题
    content 消息内容
    url 消息跳转链接
    完整代码参考:https://github.com/wuzhc/team

    文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

    转载请注明本文地址:https://www.ucloud.cn/yun/28194.html

    相关文章

    • 打造你的Laravel即时应用(二)-消息推送与监听

      摘要:打造你的即时应用二消息推送与监听年月日接于上篇博客打造你的即时应用一项目初始化构建在上一篇博客中介绍了项目的基本构建现在进入实战操作一消息推送创建事件类的广播推送通过来实现下面通过命令来创建一个事件类为了配合我们的广播系统使用需要实现接 打造你的Laravel即时应用(二)-消息推送与监听 2019年08月04日20:16:21 XXM 接于上篇博客: 打造你的Laravel即时应用(...

      omgdog 评论0 收藏0
    • 搭建IM服务 so easy

      摘要:现在很多网站都通过服务来实现消息推送及数据即时同步功能,即时通讯组件逐渐成为产品的标配。目前国内有很多成熟稳定的第三方即时通讯服务厂家,比如融云。 现在很多网站、APP都通过IM服务来实现消息推送及数据即时同步功能,即时通讯组件逐渐成为产品的标配。目前国内有很多成熟稳定的第三方即时通讯服务厂家,比如:融云。使用这些专业的服务可以提高开发效率而且服务稳定有保障。 如果自己DIY或者需要在...

      imccl 评论0 收藏0

    发表评论

    0条评论

    最新活动
    阅读需要支付1元查看
    <