摘要:前言在笔者最开始维护的日志服务中,日质量较小,没有接入。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在的基础上如何接入,并做到向前兼容。
1 前言
在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在EFK的基础上如何接入kafka,并做到向前兼容。
2 主要内容如何搭建kafka集群
原有EFK升级
3 搭建kafka集群 3.1 搭建zookeeper集群主要参考文章:【zookeeper安装指南】
由于是要线上搭建集群,为避免单点故障,就需要部署至少3个节点(取决于多数选举机制)。
进入要下载的版本的目录,选择.tar.gz文件下载
3.1.2 安装使用tar解压要安装的目录即可,以3.4.5版本为例
这里以解压到/home/work/common,实际安装根据自己的想安装的目录修改(注意如果修改,那后边的命令和配置文件中的路径都要相应修改)
tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common3.1.3 配置
在主目录下创建data和logs两个目录用于存储数据和日志:
cd /home/work/zookeeper-3.4.5 mkdir data mkdir logs
在conf目录下新建zoo.cfg文件,写入如下配置:
tickTime=2000 dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.220.128:2888:3888 server.2=192.168.222.128:2888:3888 server.3=192.168.223.128:2888:3888
在zookeeper1的data/myid配置如下:
echo "1" > data/myid
zookeeper2的data/myid配置如下:
echo "2" > data/myid
zookeeper2的data/myid配置如下:
echo "3" > data/myid3.1.4 启停
进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行:
./zkServer.sh start ./zkServer.sh stop ./zkServer.sh restart ./zkServer.sh status
zookeeper集群搭建完成之后,根据实际情况开始部署kafka。以部署2个broker为例。
3.2 搭建kafka broker集群 3.2.1 安装下载并解压包:
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
tar zxvf kafka_2.10-0.9.0.0.tgz3.2.2 配置
进入kafka安装工程根目录编辑config/server.properties
#不同的broker对应的id不能重复 broker.id=1 delete.topic.enable=true inter.broker.protocol.version=0.10.0.1 log.message.format.version=0.10.0.1 listeners=PLAINTEXT://:9092,SSL://:9093 auto.create.topics.enable=false ssl.key.password=test ssl.keystore.location=/home/work/certificate/server-keystore.jks ssl.keystore.password=test ssl.truststore.location=/home/work/certificate/server-truststore.jks ssl.truststore.password=test num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/work/data/kafka/log num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=72 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=03.2.3 启动kafka
进入kafka的主目录
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &3.2.4 连通性测试
首先创建一个topic:topic_1
sh bin/kafka-topics.sh --create --topic topic_1 --partitions 2 --replication-factor 2 --zookeeper 192.168.220.128:2181
可以先检查一下是否创建成功:
sh bin/kafka-topics.sh --list --zookeeper 192.168.220.128:2181
起两个终端,一个作为producer,一个作为consumer
生产消息:
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.220.128:9092,192.168.223.128:9092
消费消息:
sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092,192.168.223.128:9092 --topic topic_1
好了,上面的调通了,万里长征第一步就走完了。
4 EFK接入kafka向前兼容 4.1 准备证书在之前的EFK中是通过证书进行安全加固的,所以要先为接入kafka准备一下相关的证书。要确保给kafka生成的证书和给efk生成的证书是同一个根证书。关于证书的生成,笔者会写文章专门介绍。主要包括:
服务端证书
client证书
那么作为kafka的输入(filebeat)和输出(logstash),都需要kafka的client证书,kafka的broker需要的是服务端证书。
需要注意的是,filebeat配置的是pem证书,kafka和logstash的kafka-input插件用的是jks证书~~~因此,证书生成工具最好需要能够同时生成这两种证书。
在fields中添加log_topic字段,指定写入的topic
fields: module: sonofelice type: debug log_topic: topic_1 language: java4.2.2 filebeat.yml文件
output.kafka: hosts: ["192.168.220.128:9093","192.168.223.128:9093"] topic: "%{[fields.log_topic]}" partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000 ssl.certificate_authorities: ["/home/work/filebeat/keys/root-ca.pem"] ssl.certificate: "/home/work/filebeat/keys/kafka.crt.pem" ssl.key: "/home/work/filebeat/keys/kafka.key.pem"4.3 logstash升级
input { kafka { bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093" group_id => "consumer-group-01" topics => ["topic_1"] consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" security_protocol => "SSL" ssl_keystore_password => "test" ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks" ssl_keystore_password => "test" ssl_truststore_password => "test" ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks" codec => json { charset => "UTF-8" } } }
那为了向前兼容之前的filebeat日志收集,我们在input中同时保留beats配置,最终配置如下:
input { kafka { bootstrap_servers => "192.168.220.128:9093,192.168.223.128:9093" group_id => "consumer-group-01" topics => ["topic_1"] consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" security_protocol => "SSL" ssl_keystore_password => "test" ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks" ssl_keystore_password => "test" ssl_truststore_password => "test" ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks" codec => json { charset => "UTF-8" } } beats { port => 5044 client_inactivity_timeout => 600 ssl => true ssl_certificate_authorities => ["/home/work/certificate/chain-ca.pem"] ssl_certificate => "/home/work/certificate/server.crt.pem" ssl_key => "/home/work/certificate/server.key.pem" ssl_verify_mode => "force_peer" } }
需要特别注意的是,对于kafka的input来说,codec并不是默认为json的,导致之前用beats能成功解析到es的字段都无法解析成功,所以务必加上codec的配置。
至此,改造升级的点应该没有太大的坑了,也能够向前兼容,接入端自行切换即可。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/34441.html
摘要:前言在笔者最开始维护的日志服务中,日质量较小,没有接入。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在的基础上如何接入,并做到向前兼容。 1 前言 在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行...
摘要:安全保证独有的安全加固体系,提供业务操作可回溯,消息存储加密及租户间有效隔离等有效安全措施。内建消息冗余存储,保证消息存储的可靠性,有效避免服务节点故障。最终一致性用于两个系统的状态最终保持一致,或都成功或都失败。一分钟玩转,就是这么简单 说起Kafka,许多使用者对它是又爱又恨。Kafka是一种分布式的、基于发布/订阅的消息系统,其极致体验让人欲罢不能,但操心的运维、复杂的安全策略、...
摘要:在全面兼容Apache Kafka生态的基础上,消息队列Kafka彻底解决ApacheKafka稳定性不足的长期痛点,并且支持消息无缝迁移到云上。 近日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态。在全面兼容Apache Kafka生态的基础上,消息队列Kafka还具备了超易用,超高可用可靠性,扩缩容不操心,全方位安全诊断,数据安全有保障的特点。可用行达99.9%,数据可靠行99...
摘要:是腾讯云内部自研基于的高可靠强一致可扩展分布式消息队列,在腾讯内部包括微信手机业务红包腾讯话费充值广告订单等都有广泛使用。目前已上线腾讯云对外开放,本文对核心技术原理进行分享介绍。 极牛技术实践分享活动 极牛技术实践分享系列活动是极牛联合顶级VC、技术专家,为企业、技术人提供的一种系统的线上技术分享活动。 每期不同的技术主题,和行业专家深度探讨,专注解决技术实践难点,推动技术创新,...
摘要:是腾讯云内部自研基于的高可靠强一致可扩展分布式消息队列,在腾讯内部包括微信手机业务红包腾讯话费充值广告订单等都有广泛使用。目前已上线腾讯云对外开放,本文对核心技术原理进行分享介绍。 极牛技术实践分享活动 极牛技术实践分享系列活动是极牛联合顶级VC、技术专家,为企业、技术人提供的一种系统的线上技术分享活动。 每期不同的技术主题,和行业专家深度探讨,专注解决技术实践难点,推动技术创新,...
阅读 940·2021-11-24 09:38
阅读 3427·2021-11-22 15:32
阅读 3831·2021-10-12 10:12
阅读 3238·2019-08-30 15:54
阅读 2426·2019-08-30 15:53
阅读 1352·2019-08-30 15:52
阅读 1813·2019-08-30 13:15
阅读 1697·2019-08-29 12:21