摘要:表示事件处理过程。表示在事件进入前要使用的编码处理。示例如下将非结构化数据转为结构化数据自定义匹配模式文件地址。添加字段将非结构化数据中的内容提取出来,转为结构化数据,并对加入到结构中。将处理后的事件输出到消息中间件。
前言
在之前的文章【大数据实践】游戏事件处理系统(1)——事件收集-filebeat中,对本系统的背景、目标及技术方案进行了概述,并利用filebeat收集到日志,发送到logstash。因此,本文章将对logstash如何接收、处理、输出事件进行介绍。
logstash根据配置文件****.conf(如game-score.conf)对每个事件执行输入、处理、输出过程,logstash为每一个过程提供了丰富的插件。配置文件的大体结构如下:
input {} filter {} output {}
input表示事件输入。
filter表示事件处理过程。
output表示事件输出。
事件输入假设日志文件存在目录地址为:/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log。
其内容为:
2015-11-02 14:26:53,355 DEBUG [IScoreService] service.score.IScoreService.recordScore Arguments:[SSZ game result. gameId : 2015-11-02_14:26:37_新手入门_1_002_512 tax : 0, [Lservice.score.GameResultBean;@1a4347b9] Returns: [snailiu,999979438,15]cost : 26
logstash的配置文件中,输入模块的配置内容如下:
input { file { path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log" type => "game_score" start_position => "beginning" codec => plain { charset => "GBK" } } ## 来自于filebeat的事件作为输入 beats { ## 端口与filebeat中filebeat.yml文件output中配置的端口一致。 port => 5044 } }
日志文件作为输入,需要使用的file插件(input插件列表)。input中可以有多个file,用于同时收集多个日志文件(参考官方文档)。其中:
path表示文件路径。
type表示该输入事件的类型,可自由定义。
start_position表示开始位置,设为beginning表示需从文件头导入旧的事件。
codec表示在事件进入input前要使用的编码处理。若日志文件使用GBK编码,那么就需要在codec指定,以便后续处理过程中,不再需要处理编码问题。
接收filebeat发送过来的数据时,需要使用到beats插件。
事件处理每一个事件都会经过filter中的处理过程(从上到下处理)。可利用多个filter 插件来完成处理过程。filter示例如下:
filter { ## 将非结构化数据转为结构化数据 grok { ## 自定义匹配模式文件地址。 patterns_dir => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/game_score_patterns" ## 根据自定义的匹配模式,匹配数据。数组表示匹配多个模式。 match => [ "message","%{DC_SCORE_NORMAL}", "message","%{DC_SCORE_MODE}" ] ## 删掉一些不要的字段。 remove_field => [host,path,message,svr_type,gm_gr,user1_balance,user2_balance,user3_balance,user4_balance,cost,svr_method,type,money] ## 添加字段:将非结构化数据`game_id`中的内容提取出来,转为结构化数据,并kv对加入到map结构中。 add_field => {"game_id" => "%{game_date}_%{game_time}_%{bet_name}_%{bet_count}_%{room_id}_%{desk_id}"} } ## 如果解析出错,则抛弃该事件。 if "_grokparsefailure" in [tags] { drop { } } mutate { ## 对map中数据进行类型转换。 convert => [ "tax" , "integer" , "user1_delta" , "integer" , "user2_delta" , "integer" , "user3_delta" , "integer" , "user4_delta" , "integer" ] ## 字符串替换 gsub => [ "user1_name", "[]", "", "user2_name", "[]", "", "user3_name", "[]", "", "user4_name", "[]", "" ] } ## ruby插件,支持ruby脚本 ruby { ## ruby脚本文件路径 path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/drop_percentage.rb" ## ruby脚本程序的输入参数 script_params => { "PDK" => ",20,30,1,"} } ## 若出错,则丢弃该事件 if "_jsonparsefailure" in [tags] { drop { } } ## 日期处理插件 date { match => [ "date_stamp", "YY-MM-dd HH:mm:ss,SSS" ] timezone => "Asia/Shanghai" remove_field => ["@version","@timestamp","server_tag","date_stamp","tmp_users","user1_name","user2_name","user3_name","user4_name","user1_delta","user2_delta","user3_delta","user4_delta"] } }
其中,用到的filter插件有:
grok:将非结构化日志数据解析为结构化数据的插件,非常常用。支持自定义一些解析模式(patterns),如game_score_patterns文件:
DC_SCORE_USER_1 %{DATA:user1_name},%{INT:user1_balance},%{INT:user1_delta} DC_SCORE_USER_2 %{DC_SCORE_USER_1}, %{DATA:user2_name},%{INT:user2_balance},%{INT:user2_delta} DC_SCORE_USER_3 %{DC_SCORE_USER_2}, %{DATA:user3_name},%{INT:user3_balance},%{INT:user3_delta} DC_SCORE_USER_4 %{DC_SCORE_USER_3}, %{DATA:user4_name},%{INT:user4_balance},%{INT:user4_delta} DC_SCORE_USERS %{DC_SCORE_USER_4}|%{DC_SCORE_USER_3}|%{DC_SCORE_USER_2}|%{DC_SCORE_USER_1} DC_SCORE_NORMAL %{DATA:date_stamp} DEBUG [%{DATA:svr_type}] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:[%{DATA:server_tag} game result. gameId(( : pdk %{DATA:score_type})|( : sreqw %{DATA:score_type})|) : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} tax : %{INT:tax}, %{DATA:gm_gr}] Returns: [%{DC_SCORE_USERS}]cost : %{INT:cost} DC_SCORE_MODE %{DATA:date_stamp} DEBUG [%{DATA:svr_type}] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:[%{DATA:score_type} game result. gameId : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} user: %{DATA:username} get prize:%{INT:money}, [%{DATA:gm_gr}] Returns: [%{DC_SCORE_USERS}]cost : %{INT:cost}
通过match => ... 从非结构化的日志中,提取出想要的数据,并为其指定关键字命名,转为map结构。
mutate:数据转换插件,支持对字段内容进行格式转换、类型转换等。
ruby:当你感觉其他插件都不够灵活,需要你通过代码自由处理的时候,可以使用ruby插件,编写事件处理脚本。其中,ruby脚本的输入事件即为上面mutate处理之后的结果事件。ruby中对事件的处理过程可参照event api。
date:日期处理插件,如转换时区,转换时间格式。
事件输出时间经过filter之后,转化为结构化的JSON格式。可以通过各种输出插件将其输出到指定位置(或服务)。如:
## 输出到标准输出。 stdout { codec => rubydebug } ## 输出到http服务 http { http_method => "post" url => "http://127.0.0.1:9090" } ## 输出到kafka kafka { codec => json topic_id => "mytopic" }
其中,示例的三个插件分别为:
stdout:将事件输出到标准输出,可用于调试。
http:将事件输出到其他web服务。
kafka:将处理后的事件输出到kafka消息中间件。
小结至此,利用logstash,完成了日志事件的收集和处理的过程,因本系统是搭配kafka消息中间件工作的,因此,output中使用kafka插件,后续需根据具体情况,完善output中kafka模块的编写。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/33925.html
摘要:背景及目标游戏平台每天产生上千万条牌局结算日志包括跑得快斗地主等各种游戏,这些日志未非结构化数据。因此,想要一个实现游戏日志处理系统,实现如下功能监控日志文件,收集日志。处理日志非结构化转为结构化数据如。输出结构化数据到消息中间件。 背景及目标 游戏平台每天产生上千万条牌局结算日志(包括跑得快PDK、斗地主DDZ等各种游戏),这些日志未非结构化数据。很多时候,我们需要依据这些日志制定各...
摘要:表示事件处理过程。表示在事件进入前要使用的编码处理。示例如下将非结构化数据转为结构化数据自定义匹配模式文件地址。添加字段将非结构化数据中的内容提取出来,转为结构化数据,并对加入到结构中。将处理后的事件输出到消息中间件。 前言 在之前的文章【大数据实践】游戏事件处理系统(1)——事件收集-filebeat中,对本系统的背景、目标及技术方案进行了概述,并利用filebeat收集到日志,发送...
摘要:前言上一篇文章大数据实践游戏事件处理系统事件处理中,对日志的处理进行了讲解,其事件最终要输出到集群中。大数据实践游戏事件处理系统系列文章主要更倾向于试验,因此对深一层的理论研究和介绍不是很多,后面可能开另外的系列来讲。 前言 上一篇文章【大数据实践】游戏事件处理系统(2)——事件处理-logstash中,对日志的处理进行了讲解,其事件最终要输出到kafka集群中。因此,在本文章中,将介...
摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...
摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...
阅读 1698·2021-11-24 09:39
阅读 3360·2021-09-28 09:36
阅读 3118·2021-09-06 15:10
阅读 3274·2019-08-30 15:44
阅读 1047·2019-08-30 15:43
阅读 1663·2019-08-30 14:20
阅读 2540·2019-08-30 12:51
阅读 1902·2019-08-30 11:04