摘要:宋体本文通过一个教育行业的应用案例,剖析业务系统对实时计算的需求场景,并分析了和两种实现方式的异同,最后通过运用产品中封装的模块,来加速开发效率,更快地完成需求。宋体中间的实时计算框架,则在和中选择。
{ "student_id": "学生ID_16", "textbook_id": "教材ID_1", "grade_id": "年级ID_1", "subject_id": "科目ID_2_语文", "chapter_id": "章节ID_chapter_2", "question_id": "题目ID_100", "score": 2, "answer_time": "2019-09-11 12:44:01", "ts": "Sep 11, 2019 12:44:01 PM"}
{"student_id":"学生ID_16","textbook_id":"教材ID_1","grade_id":"年级ID_1","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_2","question_id":"题目ID_100","score":2,"answer_time":"2019-09-11 12:44:01","ts":"Sep 11, 2019 12:44:01 PM"}………
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(3)
val props = new Properties()props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")props.setProperty("group.id", "group_consumer_learning_test01") val flinkKafkaSource = new FlinkKafkaConsumer011[String]("test_topic_learning_1", new SimpleStringSchema(), props)val eventStream = env.addSource[String](flinkKafkaSource)
val answerDS = eventStream.map(s => { val gson = new Gson() val answer = gson.fromJson(s, classOf[Answer]) answer})
val tableEnv = StreamTableEnvironment.create(env)val table = tableEnv.fromDataStream(answerDS)tableEnv.registerTable("t_answer", table)
//实时:统计题目被作答频次val result1 = tableEnv.sqlQuery( """SELECT | question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | question_id """.stripMargin) //实时:按照年级统计每个题目被作答的频次val result2 = tableEnv.sqlQuery( """SELECT | grade_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | grade_id """.stripMargin) //实时:统计不同科目下,每个题目被作答的频次val result3 = tableEnv.sqlQuery( """SELECT | subject_id, question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | subject_id, question_id """.stripMargin)
tableEnv.toRetractStream[Result1](result1) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_2", new SimpleStringSchema())) tableEnv.toRetractStream[Result2](result2) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_3", new SimpleStringSchema())) tableEnv.toRetractStream[Result3](result3) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_4", new SimpleStringSchema()))
env.execute("Flink StreamingAnalysis")
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/117605.html
摘要:目前实时计算在有赞的整体技术架构如下图未来规划首先要落地并的是实时任务化,提高化任务可以覆盖的业务场景目标是,从而通过提高业务开发效率的角度赋能业务。 1. 概述 有赞是一个商家服务公司,提供全行业全场景的电商解决方案。在有赞,大量的业务场景依赖对实时数据的处理,作为一类基础技术组件,服务着有赞内部几十个业务产品,几百个实时计算任务,其中包括交易数据大屏,商品实时统计分析,日志平台,调...
摘要:经过近两个小时的讨论,很不幸我们得出了最后的结论在国内互联网发展的这年间,短平快的发展模式造成了中国软件工程领域架构师的严重断层。中国真正的架构师在哪里在和产品组里的同学的讨论过程中。 点击上方蓝色字体,选择设为星标 回复面试获取更多惊喜 背景 我先说下这篇文章的背景。 放假前的晚上,...
摘要:所以要用实现流与维表的分两步一用实现维表的功能要实现维表功能就要用到这个功能,是由阿里巴巴贡献给的。是由阿里巴巴贡献给社区的,于版本引入,主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。 showImg(https://segmentfault.com/img/bVbqZJE?w=583&h=123); 作为一家创新驱动的科技公司,袋鼠云每年研发投入达数千万,公司80%...
摘要:再如通过处理流数据生成简单的报告,如五分钟的窗口聚合数据平均值。复杂的事情还有在流数据中进行数据多维度关联聚合塞选,从而找到复杂事件中的根因。因为各种需求,也就造就了现在不断出现实时计算框架,而下文我们将重磅介绍我们推荐的实时计算框架。 前言 先广而告之,本文摘自本人《大数据重磅炸弹——实时计算框架 Flink》课程第二篇,内容首发自我的知识星球,后面持续在星球里更新,这里做个预告,今...
摘要:基于流处理机制实现批流融合相对基于批处理机制实现批流融合的思想更自然,更合理,也更有优势,因此阿里巴巴在基于支持大量核心实时计算场景的同时,也在不断改进的架构,使其朝着真正批流融合的统一计算引擎方向前进。 阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行。Flink Forward是由Apache软件基金会授权的全球范围...
阅读 903·2021-09-22 16:04
阅读 3056·2019-12-27 12:10
阅读 1383·2019-08-30 15:43
阅读 895·2019-08-29 14:01
阅读 3255·2019-08-26 12:19
阅读 3227·2019-08-26 12:15
阅读 1321·2019-08-26 12:13
阅读 3125·2019-08-23 17:00