资讯专栏INFORMATION COLUMN

快速搭建ELK日志收集(kafka队列版)

妤锋シ / 609人阅读

摘要:快速搭建日志收集版本进行文章的第二次修改,包括了之前的简单方案的升级过程。分割线快速搭建日志收集第一版本新项目短时间来实现日志采集。

快速搭建elk日志收集 kafka版本

进行文章的第二次修改,包括了之前的简单方案的升级过程。

因为业务的不断更新升级,为了保证线上业务也能正常使用elk服务,并且使得elk的服务和线业务流解耦(即避免直接写入es的方式可能会带来的耗时影响)所以我们采用了下面最新的方案,也是常规方案

方案
业务层 >> kafka队列 >> logstash 消费 >> elasticsearch

业务层将日志写入到kafka队列,同事logstash可以开启多个线程,启用同一个group_id来对kafka进行消费,将读取到的日志进行解析后,写入到elasticsearch中,并且按照索引模板进行解析。

优点

业务层可以直接写入到kafka队列中,不用担心elasticsearch的写入效率问题。

缺点

比起之前的简单版本,需要保证kafka队列、logstash的高可用(虽然logstash挂掉后,可以重启后重新读取队列日志)

搭建

整个搭建过程,写入kafka是非常简单的,这里遇到的问题是logstash和elasticsearch索引模板带来的困扰。

logstash内置一套模板

elasticsearch 本身可以自定义一套模板

如何确定使用谁的模板呢?
网上找到的资料,建议采用将模板配置在elasticsearch侧,这样就不用每个logstash进行一个模板配置文件的维护。

Logstash 配置文件说明

官网的文档kafka的input插件
https://www.elastic.co/guide/...

input {
    kafka {
        // 需要读取的kafka队列集群配置
        bootstrap_servers => "xxx.xxx.xxx.xxx:9092"
        // 配置的消费者的group名称,因为同一个组内的消费消息不会重复
        group_id => "logstash-group"
        // 主题配置
        topics => "kibana_log"
        // 从未消费过的偏移量开始
        auto_offset_reset =>"earliest"
    }
}

// 重要,下面多带带讲
filter {
  json {
    source => "message"
  }
}

output {
    // stdout可以省略,这个是为了命令行模式下方便调试
    stdout{ codec => rubydebug }
    elasticsearch {
        // es 集群
        hosts=>"xxx.xxx.xxx.xxx:9200"
        // 重要:取消logstash自定义模板功能,进而强制使用es的内置模板
        manage_template=>false
        // 需要匹配的模板名称
        index=>"logstash-dev-%{+YYYY.MM.dd}"
    }
}

先解释下filter配置,当我们从kafka读取消息的时候,消息体是通过message字段来进行传递的,所以message是一个字符串,但是我们的es索引模板可能会非常复杂,所以我们需要对其进行json解析后,再交给es。否则es收到的之后一个message字段。

filter {
  json {
    source => "message"
  }
}

再说下模板配置,首先通过kibana的devtool向es中写入了一个模板,我区分了两套环境dev、prod。

Php 实现写入

这里字段都进行了strval转义,为什么呢?这和下面要讲的动态模板有关联的。往下看

$position = YnUtil::getPosition();

$urlData = parse_url(Wii::app()->request->url);
$path    = $urlData["path"] ?? "";

$params = [
    "category"   => strval($category),
    "appType"    => strval(YnUtil::getAppType()),
    "appVersion" => strval(YnUtil::getAppVersion()),
    "host"       => strval(Wii::app()->request->hostInfo),
    "uri"        => strval(Wii::app()->request->url),
    "uid"        => strval(Wii::app()->user->getUid()),
    "path"       => strval($path),
    "server"     => strval(gethostname()),
    "geoip"      => [
        "ip"       => strval(Yii::$app->request->userIP),
        "location" => [
            "lat" => floatval($position["latitude"]),
            "lon" => floatval($position["longitude"]),
        ],
    ],
    "userAgent"  => strval(Wii::app()->request->userAgent),
    "message"    => is_array($message) ? Json::encode($message) : strval($message),
    // "@timestamp" => intval(microtime(true) * 1000),
];
索引模板

下面的模板是写入到es里面的自定义模板,为了防止索引规则名称冲突,这里将order置为1。

我们先来看下第一个模板(这个是不推荐的,因为很繁琐,但是类型很强制有效)

PUT _template/logstash-dev
{
    "index_patterns": "logstash-dev*",
    "aliases": {},
    "order":1,
    "mappings": {
      // 这里使用logs是因为logstash默认的type类型
      "logs": {
        // 动态模板
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "fields": {
                  "keyword": {
                    "ignore_above": 256,
                    "type": "keyword"
                  }
                },
                "norms": false,
                "type": "text"
              }
            }
          }
        ],
        // 这里对属性进行了类型设置
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "@version": {
            "type": "keyword"
          },
          "appType": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "appVersion": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "category": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "geoip": {
            "dynamic": "true",
            "properties": {
              "ip": {
                "type": "ip"
              },
              "latitude": {
                "type": "half_float"
              },
              "location": {
                "type": "geo_point"
              },
              "longitude": {
                "type": "half_float"
              }
            }
          },
          "host": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "message": {
            "type": "text",
            "norms": false
          },
          "server": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "uid": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "uri": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "path": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "userAgent": {
            "type": "text",
            "norms": false,
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      }
    },
    "settings": {
      "index": {
        "number_of_shards": "1"
      }
    }
}
动态模板概念

上面的模板不是最优的,但是却是一种尝试,模板里面针对每个属性做了设置,这样客户端只需要写入对应的属性就好了。但是如何动态配置呢?如果想加入一个字段,难道还要修改模板么? 这里引入了动态模板的概念

Only the following datatypes can be automatically detected: boolean, date, double, long, object, string. It also accepts * to match all datatypes.

动态映射
https://www.elastic.co/guide/...

动态模板(注意看match和match_mapping_type)
https://www.elastic.co/guide/...

映射属性
https://www.elastic.co/guide/...

然后我们给出了一个全新的模板

全新索引模板

这个模板里面只针对特殊的属性进行了设置,其他的都是通过动态模板扩展的,下面看下效果。

PUT _template/logstash-dev
{
    "index_patterns": "logstash-dev*",
    "aliases": {},
    "order":1,
    "mappings": {
      "logs": {
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "fields": {
                  "keyword": {
                    "ignore_above": 256,
                    "type": "keyword"
                  }
                },
                "norms": false,
                "type": "text"
              }
            }
          }
        ],
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "@version": {
            "type": "keyword"
          },
          "geoip": {
            "dynamic": "true",
            "properties": {
              "ip": {
                "type": "ip"
              },
              "latitude": {
                "type": "half_float"
              },
              "location": {
                "type": "geo_point"
              },
              "longitude": {
                "type": "half_float"
              }
            }
          }
        }
      }
    },
    "settings": {
      "index": {
        "number_of_shards": "1"
      }
    }
}

我们上面说了,全都进行了strval转义,为什么呢?因为动态模板里面,匹配的是string类型,如果我们写入的是一个int类型,那么就不会进行自动扩展了。试验后表明,会生成一个int类型的message字段,这样是不合理的。最终生成的效果是如下图的。

分割线 =============================

快速搭建elk日志收集 第一版本

新项目短时间来实现日志采集。

资源

一台8G 4核 500G硬盘 服务器

部署方案

项目部署在4台服务器,每台服务器通过phpsdk直接写入一台es服务器中。
(在本次部署中,没有使用logstash的功能)

缺点

没有使用异步队列,导致直接写入es可能会影响业务逻辑,但是目前只会在开发和测试环境使用。

组件

主要利用elasticsearch 和 kibana
要使用x-pack做安全校验,包括给kibana加入登录授权功能

Elasticsearch
https://www.elastic.co/cn/pro...

Elasticsearch-clients
这里包含的多种语言的sdk包
https://www.elastic.co/guide/...

Kibana
https://www.elastic.co/cn/pro...

Logstash
https://www.elastic.co/cn/pro...

X-pack
安装流程说明很详细,秘钥生成后记得保存下,并且加入x-pack后,kibana和elasticsearch的通讯,需要修改配置文件。另外phpsdk也需要加入秘钥,后面说明。
https://www.elastic.co/cn/pro...

配置 创建索引

es的模板超级复杂的,所以我们要利用标准的现有的模板,需要从logstash中提取一个。
解压下载的logstash-6.1.2.tar,执行搜索命令

$ find ./  -name "elasticsearch-template*"
./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es2x.json
./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es5x.json
./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es6x.json

会发现有2x 5x 6x三个模板,这里我们选择6x,要根据你的es版本来选择。
然后创建索引,索引要在数据写入之前创建,因为要给每个字段设置类型。

创建索引模板

https://www.elastic.co/guide/...

按照文档的方式,将获取到的6x通过curl的方式写入到es

Es开启外网访问

因为默认的es配置是开启了localhost:9200端口,用于执行RESTFUL,但是本次我们采用php-sdk的方式,直接写入es,就要求每台业务服务器,都能访问到es。

# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
# 修改此处的host配置为0.0.0.0,这样所有的请求都可以接入进来
network.host: 0.0.0.0
#
# Set a custom port for HTTP:
#
#http.port: 9200
使用client sdk

下载地址
https://www.elastic.co/guide/...

try {
    $hosts = [
        // 这个地方要填写x-pack分配的密码和用户名
        "http://{用户名}:{密码}@192.168.1.11:9200",       // HTTP Basic Authentication
        // "http://user2:pass2@other-host.com:9200" // Different credentials on different host
    ];

    $client = ClientBuilder::create()->setHosts($hosts)->build();

    $position = YnUtil::getPosition();

    $params = [
        "index" => "logstash-yn-" . date("Ymd"),
        // elastic6版本有个bug,每个索引只能有一个type类型
        "type"  => "xxxx",
        "id"    => md5(rand(0, 999999999)) . (microtime(true) * 1000),
        "body"  => [
            // 索引创建好后,写入数据一定要注意类型,不然会报错,我这里都会进行格式化一遍
            // 类别作为一个主要字段用于区分日志
            "category"   => strval($category),
            "appType"    => strval(YnUtil::getAppType()),
            "appVersion" => strval(YnUtil::getAppVersion()),
            "host"       => strval($hostInfo),
            "uri"        => strval($url),
            "uid"        => strval($user->getUid()),
            "server"     => strval(gethostname()),
            "geoip"      => [
                "ip"       => strval($ip),
                // 这个很重要,可以实现geo可视化图形
                "location" => [
                    "lat" => floatval($position["latitude"]),
                    "lon" => floatval($position["longitude"]),
                ],
            ],
            "userAgent"  => strval($userAgent),
            "message"    => Json::encode($message),
            // 这里一定要写入毫秒时间戳
            "@timestamp" => intval(microtime(true) * 1000),
        ],
    ];

    $client->index($params);

} catch (Exception $e) {

}

安装好x-pack后,sdk也要配置用户名和密码

日志的索引,按照日期来构建名称

elastic6版本,每个索引只能有一个type类型,这是一个bug

geoip是logstash的模板中定义的字段,可以实现geo可视化(稍后解释模板)

@timestamp 这里一定要毫秒时间戳

要捕获报错日志,不要影响业务逻辑

模板说明

官方说明
https://www.elastic.co/guide/...

我们这里以6x作为模板

{
    // 将这个模板应用于所有以 logstash- 为起始的索引。
    "template": "logstash-*",
    "version": 60001,
    "settings": {
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "_default_": {
            //  动态模板说明,很重要,配置了动态模板后,我们可以添加任意字段
            // https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html
            "dynamic_templates": [{
                // 信息字段 官方说这里可以自定义 The template name can be any string value.
                "message_field": {
                    "path_match": "message",
                    "match_mapping_type": "string",
                    "mapping": {
                        "type": "text",
                        "norms": false
                    }
                }
            }, {
                // 字符串字段说明
                "string_fields": {
                    // 匹配所有
                    "match": "*",
                    // 并且字段类型是string的
                    "match_mapping_type": "string",
                    // 
                    "mapping": {
                        "type": "text",
                        // 这里应该是和受欢迎程度评分相关
                        "norms": false,
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    }
                }
            }],
            // 定义好的属性说明
            "properties": {
                // 时间字段,这个很重要,不然kibana不会出现时间相关的查询控件
                "@timestamp": {
                    "type": "date"
                },
                "@version": {
                    "type": "keyword"
                },
                // 这个可以只写入properies里面的任意一个字段
                "geoip": {
                    "dynamic": true,
                    "properties": {
                        "ip": {
                            "type": "ip"
                        },
                        // 我只是用了这个location
                        "location": {
                            "type": "geo_point"
                        },
                        "latitude": {
                            "type": "half_float"
                        },
                        "longitude": {
                            "type": "half_float"
                        }
                    }
                }
            }
        }
    }
}
遇到的问题 索引创建太复杂怎么办?

使用logstash内置的模板

添加了时间字段后,创建数据不显示?

检查是否是毫秒时间戳

安装x-pack后,clientsdk向es写入数据报错,提示校验失败?

需要在链接中添加用户名和密码

类型和模板不匹配导致错误?

写入类型,一定要和索引模板中定义的一致,不然肯定报错!

同一个索引,添加多个type报错?

elasticsearch6的 bug,官方承诺在7进行修复

kibana加入登录验证?

安装x-pack吧

如何保证高可用,无人值守?

使用supervisor

如何清理elasticsearch老数据?

我的方案是:定时脚本来清理7天之前的索引DELETE logstash-xxxx

如何快速执行curl查询?

在kibana中有一个Dev Tools 可以执行curl,并且看到结果

如何格式化kibana的日期格式?

在kibana菜单的Management->Index Patterns中可以管理

结束

不详细的地方,可以留言

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/28196.html

相关文章

  • 快速搭建ELK日志收集kafka队列

    摘要:快速搭建日志收集版本进行文章的第二次修改,包括了之前的简单方案的升级过程。分割线快速搭建日志收集第一版本新项目短时间来实现日志采集。 快速搭建elk日志收集 kafka版本 进行文章的第二次修改,包括了之前的简单方案的升级过程。 因为业务的不断更新升级,为了保证线上业务也能正常使用elk服务,并且使得elk的服务和线业务流解耦(即避免直接写入es的方式可能会带来的耗时影响)所以我们采用...

    ingood 评论0 收藏0
  • 如何建设高吞吐量的日志平台

    摘要:对于七牛云的实践来说,你也可以把容器内的日志投递出去。七牛云日志平台面临的挑战七牛云是如何解决日志平台吞吐量的挑战的呢首先,的集群通过扩展,是能够承接百级别的数据的那么问题的瓶颈就变成了如何将百量级的数据从中写入到上。 showImg(https://segmentfault.com/img/remote/1460000015879375?w=640&h=235); 上图列举了一些日...

    AlexTuan 评论0 收藏0
  • ELK结合logback搭建日志中心

    摘要:日志监控和分析在保障业务稳定运行时,起到了很重要的作用。本文搭建的的是一个分布式的日志收集和分析系统。对于队列上的这些未处理的日志,有不同的几台进行接收和分析。再由统一的进行日志界面的展示。如等配置文件可以配置,等日志报表可视化熟练 ELK简介ELKStack即Elasticsearch + Logstash + Kibana。日志监控和分析在保障业务稳定运行时,起到了很重要的作用。比...

    tracy 评论0 收藏0
  • ELK结合logback搭建日志中心

    摘要:日志监控和分析在保障业务稳定运行时,起到了很重要的作用。本文搭建的的是一个分布式的日志收集和分析系统。对于队列上的这些未处理的日志,有不同的几台进行接收和分析。再由统一的进行日志界面的展示。如等配置文件可以配置,等日志报表可视化熟练 ELK简介ELKStack即Elasticsearch + Logstash + Kibana。日志监控和分析在保障业务稳定运行时,起到了很重要的作用。比...

    Brenner 评论0 收藏0
  • TOP100summit:【分享实录】爆炸式增长的斗鱼架构平台的演进

    摘要:吴瑞诚斗鱼数据平台部总监曾先后就职于淘宝一号店。目前负责斗鱼实时离线数据处理个性推荐系统和搜索引擎。斗鱼统一日志监控系统有了配置中心,可以以此实现服务的开关和降级。 本篇文章内容来自2016年TOP100summit斗鱼数据平台部总监吴瑞城的案例分享。编辑:Cynthia2017年11月9-12日北京国家会议中心第六届TOP100summit,留言评论有机会获得免费体验票。 吴瑞诚:斗...

    baukh789 评论0 收藏0

发表评论

0条评论

妤锋シ

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<