资讯专栏INFORMATION COLUMN

【大数据实践】KSQL流处理——如何将多个STREAM输出到一个TOPIC

Cristalven / 3121人阅读

摘要:大数据实践流处理如何将数据处理结果推到指定需求场景描述在生产环境中,各个业务服务产生的事件都会被到消息中间件中。这应该是的一个。派生流中输出的数据结构充值超过元奖励万金币卡道具与流定义的结构相同。

【大数据实践】KSQL流处理——如何将数据处理结果推到指定Topic 需求场景描述

在生产环境中,各个业务服务产生的事件都会被push到Kafka消息中间件中。如:充值中心的 充值事件 会被push到kafka的recharge topic中,玩家 结算事件 会被push到kafka的game_score topic中。

平台希望通过处理,实时分析这些事件,筛选出满足条件的一些玩家,对其奖励相应道具。如,想做一个针对不同充值金额的玩家奖励不同道具的活动:

0 < 充值金额 < 100 时, 奖励一个10万金币卡道具(道具ID:"10w")

100 <= 充值金额 时,奖励一个100万金币卡道具(道具ID:"100w")

方案设计

将充值的事件(原始日志数据,JSON格式),推送到Kafka的recharge topic中。充值事件数据格式:

{"event_type" : "cash_order",
 "username" : "foo",
 "channel" : "wx_scan",
 "cash" : 100
 }

kafka中新建一个PROPREWARD的topic,专门接收道具奖励的事件,该主题事件消息格式为:

{"user/name" : "foo"
 "prop/id" : "道具ID"
 "reward/reason" : "奖励的原因"}

一个道具发放服务(Kafka消费者)订阅该主题,当道具奖励事件到达时,获取事件中的 用户名道具ID 为指定玩家发放道具。

使用KSQL创建两个派生流(Stream),分别从recharge topic中过滤出0 < 充值金额 < 100100 <= 充值金额 的事件,过滤出符合条件的用户名,并组装成约定的道具奖励事件,将其推送到Kafka的PROPREWARD topic中。

具体实现

新建一个kafka topic : PROPREWARD (大写),用于接收和存储道具奖励事件

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic PROPREWARD

从kafka topic PROPREWARD创建一个流(PROPREWARD),用于输出道具奖励事件,注意:流的名字与kafka topic的名字相同。

CREATE STREAM PROPREWARD (`user/name` varchar, `seed/id` varchar, `reward/reason` varchar) 
WITH (kafka_topic="PROPREWARD", value_format="JSON");

根据业务需要,新建一个查询规则为0 < 充值金额 < 100 的派生流并插入到PROPREWARD流中。

INSERT INTO PROPREWARD 
   SELECT username AS `user/name` , "10w" AS `prop/id`, "充值100元以内奖励10万金币卡道具" AS `reward/reason` 
   FROM recharge 
   WHERE EVENT_TYPE = "cash_order" 
          AND CASH > 0 
          AND CASH <= 100;

根据业务需要,新建一个查询规则为 100 <= 充值金额 的派生流并插入到PROPREWARD流中。

INSERT INTO PROPREWARD 
   SELECT username AS `user/name` , "100w" AS `prop/id`, "充值超过100元奖励100万金币卡道具" AS `reward/reason` 
   FROM recharge 
   WHERE EVENT_TYPE = "cash_order" AND CASH >= 100;

还可以根据业务需要,从其他kafka topic中派生出其他流,插入到PROPREWARD流中。

结果验证

往kafka的recharge topic中写入数据:

{"event_type" : "cash_order",
 "username" : "foo",
 "channel" : "wx_scan",
 "cash" : 9
 }

可以在topic PROPREWARD中接收到事件:

{"user/name" : "foo" ,
 "prop/id" : "10w"
 "reward/reason" : "充值100元以内奖励10万金币卡道具"}

往kafka的 recharge topic中写入数据:

{"event_type" : "cash_order",
 "username" : "foo",
 "channel" : "wx_scan",
 "cash" : 11
 }

可以在topic PROPREWARD 中接收到事件:

{"user/name" : "foo" ,
 "prop/id" : "100w",
 "reward/reason" : "充值100元以上奖励100万金币卡道具"}

注意

要想将上述两个派生流插入(INSERT INTO)到输出结果的PROPREWARD流中,需要确保:

PROPREWARD 的名字与输出结果的Kafka topic名字相同,否则会抛出异常。这应该是KSQL 5.0.0 的一个BUG。

派生流中输出的数据结构 (SELECT username AS user/name , "100w" AS prop/id, "充值超过100元奖励100万金币卡道具" AS reward/reason 与 流 PROPREWARD 定义的结构相同。

总结

通过Kafka + KSQL 流式处理,可以配置出丰富的活动——根据各种不同的事件和规则,奖励不同的道具(或者其他类型的东西),而不需要额外的代码开发!!

KSQL官方参考文档

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33967.html

相关文章

  • 大数据实】Kafka生产者编程(1)——KafkaProducer详解

    摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...

    yuanzhanghu 评论0 收藏0
  • 大数据实】Kafka生产者编程(1)——KafkaProducer详解

    摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...

    shengguo 评论0 收藏0
  • 大数据实】Kafka生产者编程(3)——Interceptor & Partitione

    摘要:前言在上一篇文章大数据实践生产者编程发送流程中,对自定义和自定义做了简单介绍,没有做深入讲解。必须字段,表示消息内容。如果没有设置和,则会采用类似于轮询方式但不是严格轮询,而是类似于随机数。 前言 在上一篇文章【大数据实践】Kafka生产者编程(2)——producer发送流程中,对自定义Interceptor和自定义Partitioner做了简单介绍,没有做深入讲解。因此,在本文章中...

    worldligang 评论0 收藏0
  • 大数据实】Kafka生产者编程(3)——Interceptor & Partitione

    摘要:前言在上一篇文章大数据实践生产者编程发送流程中,对自定义和自定义做了简单介绍,没有做深入讲解。必须字段,表示消息内容。如果没有设置和,则会采用类似于轮询方式但不是严格轮询,而是类似于随机数。 前言 在上一篇文章【大数据实践】Kafka生产者编程(2)——producer发送流程中,对自定义Interceptor和自定义Partitioner做了简单介绍,没有做深入讲解。因此,在本文章中...

    learning 评论0 收藏0
  • 大数据实】Kafka生产者编程(2)——producer发送

    摘要:必须字段,表示消息内容。检查长度是否超过限制根据配置项和进行检查,超出任何一项就会抛出异常。过滤掉过期的,对于过期的,会通过通知发送失败。 前言 在上一篇文章【大数据实践】Kafka生产者编程(1)——KafkaProducer详解中,主要对KafkaProducer类中的函数进行了详细的解释,但仅针对其中的一些方法,对于producer背后的原理、机制,没有做深入讲解。因此,在本文章...

    tianlai 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<