摘要:另外,也是目前性能最好的消息中间件。架构在集群中,一个节点就是一个,消息由来承载,可以存储在个或多个中。发布消息的应用为消费消息的应用为,多个可以促成共同消费一个中的消息。
一、前言 1、Kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)
Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。
2、Kafka架构在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象 | 简单说明 |
---|---|
Broker | Kafka节点 |
Topic | 主题,用来承载消息 |
Partition | 分区,用于主题分片存储 |
Producer | 生产者,向主题发布消息的应用 |
Consumer | 消费者,从主题订阅消息的应用 |
Consumer Group | 消费者组,由多个消费者组成 |
准备3台CentOS服务器,并配置好静态IP、主机名
服务器名 | IP | 说明 |
---|---|---|
kafka01 | 192.168.88.51 | Kafka节点1 |
kafka02 | 192.168.88.52 | Kafka节点2 |
kafka03 | 192.168.88.53 | Kafka节点3 |
软件版本说明
项 | 说明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK
服务器名 | IP | 说明 |
---|---|---|
zk01 | 192.168.88.21 | ZooKeeper节点 |
zk02 | 192.168.88.22 | ZooKeeper节点 |
zk03 | 192.168.88.23 | ZooKeeper节点 |
部署过程参考:https://ken.io/note/zookeeper...
二、部署过程 1、应用&数据目录#创建应用目录 mkdir /usr/kafka #创建Kafka数据目录 mkdir /kafka mkdir /kafka/logs chmod 777 -R /kafka2、下载&解压
Kafka官方下载地址:https://kafka.apache.org/down...
这次我下载的是2.3.0版本
#创建并进入下载目录 mkdir /home/downloads cd /home/downloads #下载安装包 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解压到应用目录 tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本3、Kafka节点配置
#进入应用目录 cd /usr/kafka/kafka_2.12-2.3.0/ #修改配置文件 vi config/server.properties通用配置
配置日志目录、指定ZooKeeper服务器
# A comma separated list of directories under which to store log files log.dirs=/kafka/logs # root directory for all kafka znodes. zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181分节点配置
Kafka01
broker.id=0 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.51:9092
Kafka02
broker.id=1 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.52:9092
Kafka03
broker.id=2 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.53:90924、防火墙配置
#开放端口 firewall-cmd --add-port=9092/tcp --permanent #重新加载防火墙配置 firewall-cmd --reload5、启动Kafka
#进入kafka根目录 cd /usr/kafka/kafka_2.12-2.3.0/ #启动 /bin/kafka-server-start.sh config/server.properties & #启动成功输出示例(最后几行) [2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)三、Kafka测试 1、创建Topic
在kafka01(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03
2、查看Topic我们可以通过命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:90923、发送消息
这里我们向Broker(id=0)的Topic=test-ken-io发送消息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io #消息内容 > test by ken.io4、消费消息
在Kafka02上消费Broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在Kafka03上消费Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然后均能收到消息
test by ken.io
这是因为这两个消费消息的命令是建立了两个不同的Consumer
如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken四、备注 1、Kafka常用配置项说明
Kafka常用Broker配置说明:
配置项 | 默认值/示例值 | 说明 |
---|---|---|
broker.id | 0 | Broker唯一标识 |
listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT表示明文传输 |
log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用","间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服务器地址,多台用","间隔 |
https://kafka.apache.org/
https://zh.wikipedia.org/zh-c...
本文首发于我的独立博客:https://ken.io/note/kafka-cluster-deploy-guide
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/34011.html
摘要:项目地址前言大数据技术栈思维导图大数据常用软件安装指南一分布式文件存储系统分布式计算框架集群资源管理器单机伪集群环境搭建集群环境搭建常用命令的使用基于搭建高可用集群二简介及核心概念环境下的安装部署和命令行的基本使用常用操作分区表和分桶表视图 项目GitHub地址:https://github.com/heibaiying... 前 言 大数据技术栈思维导图 大数据常用软件安装指...
摘要:主题和分区的悄息通过主题进行分类。在给定的分区里,每个悄息的偏移量都是唯一的。消费者把每个分区最后读取的悄息偏移量保存在或上,如果悄费者关闭或重启,它的读取状态不会丢失。主题可以配置自己的保留策略,可以将悄息保留到不再使用它们为止。发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送...
摘要:主题和分区的悄息通过主题进行分类。在给定的分区里,每个悄息的偏移量都是唯一的。消费者把每个分区最后读取的悄息偏移量保存在或上,如果悄费者关闭或重启,它的读取状态不会丢失。主题可以配置自己的保留策略,可以将悄息保留到不再使用它们为止。发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送...
摘要:主题和分区的悄息通过主题进行分类。在给定的分区里,每个悄息的偏移量都是唯一的。消费者把每个分区最后读取的悄息偏移量保存在或上,如果悄费者关闭或重启,它的读取状态不会丢失。主题可以配置自己的保留策略,可以将悄息保留到不再使用它们为止。发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送...
摘要:前言在笔者最开始维护的日志服务中,日质量较小,没有接入。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在的基础上如何接入,并做到向前兼容。 1 前言 在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行...
阅读 1758·2021-11-24 09:39
阅读 2545·2021-07-29 13:49
阅读 2109·2019-08-29 14:15
阅读 2100·2019-08-29 12:40
阅读 1744·2019-08-28 17:46
阅读 3195·2019-08-26 13:42
阅读 487·2019-08-26 12:13
阅读 1894·2019-08-26 11:41