摘要:分区数查看列表查看详情可以描述分区数副本数副本副本等信息删除注意,只是删除在的元数据,日志数据仍需手动删除。查看消费组消费者这里有两个消费组的消费者查看消费详情可以查看到消费的订阅的,负责的,消费进度积压的消息。
下载kafka,自带 zookeeper。
搭建Zookeeper集群zookeeper 集群使用 Raft 选举模式,故至少要三个节点(生产中应部署在三个不同的服务器实例上,这里用于演示就不那么做了)。
# 复制三分节点配置 cp config/zookeeper.properties config/zookeeper.2181.properties cp config/zookeeper.properties config/zookeeper.2182.properties cp config/zookeeper.properties config/zookeeper.2183.properties
修改配置
config/zookeeper.2181.properties
# the directory where the snapshot is stored. dataDir=/tmp/zookeeper/2181 # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 tickTime=2000 initLimit=10 syncLimit=5 server.1=localhost:12888:13888 server.2=localhost:22888:23888 server.3=localhost:32888:33888
config/zookeeper.2182.properties 修改clientPort=2182 dataDir=/tmp/zookeeper/2182 其他一致
config/zookeeper.2183.properties 修改clientPort=2183 dataDir=/tmp/zookeeper/2183 其他一致
主要是修改服务端口clientPort和数据目录dataDir,其他参数表征如下:
tickTime=2000为zk的基本时间单元,毫秒
initLimit=10Leader-Follower初始通信时限(tickTime*10)
syncLimit=5Leader-Follower同步通信时限(tickTime*5)
server.实例集群标识=实例地址:数据通信端口:选举通信端口
为实例添加集群标识
echo 1 >> /tmp/zookeeper/2181/myid echo 2 >> /tmp/zookeeper/2182/myid echo 3 >> /tmp/zookeeper/2183/myid
启动集群服务
bin/zookeeper-server-start.sh config/zookeeper.2181.properties bin/zookeeper-server-start.sh config/zookeeper.2182.properties bin/zookeeper-server-start.sh config/zookeeper.2183.properties搭建Kafka集群
Kafka集群节点>=2时便可对外提供高可用服务
cp config/server.properties config/server.9092.properties cp config/server.properties config/server.9093.properties
修改节点标识、服务端口、数据目录和zk集群节点列表
vi config/server.9092.properties
broker.id=1 ... listeners=PLAINTEXT://:9092 ... log.dirs=/tmp/kafka-logs/1 ... zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
vi config/server.9093.properties
broker.id=2 ... listeners=PLAINTEXT://:9093 ... log.dirs=/tmp/kafka-logs/2 ... zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
启动集群
bin/kafka-server-start.sh config/server.9092.properties bin/kafka-server-start.sh config/server.9093.propertiesTopic管理 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partition 4 --topic topic_1
--replication-factor 2:副本集数量,不能大于 broker 节点数量,多了也没用,1个节点放>=2个副本挂了都完蛋。
--partition 4:分区数
bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --list topic_1 topic_2查看Topic详情
可以描述Topic分区数/副本数/副本Leader/副本ISR等信息:
bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --describe --topic topic_1 Topic:topic_1 PartitionCount:4 ReplicationFactor:2 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: topic_1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2删除Topic
注意,只是删除Topic在zk的元数据,日志数据仍需手动删除。
bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --delete --topic topic_2 #Topic topic_2 is marked for deletion. #Note: This will have no impact if delete.topic.enable is not set to true. #再查看topic列表 bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --list #topic_1 #topic_2 - marked for deletion生产者
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic topic_1 # 进入 cli 输入消息回车发送 # hello kafka [enter] # send message [enter]消费者
新模式,offset存储在borker
--new-consumer Use new consumer. This is the default.
--bootstrap-server
老消费模式,offset存储在zk
--zookeeper
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic topic_1
可以尝试创建多个不同消费组的消费者(这里的sh脚本创建的都是不同消费组的),订阅同一个topic来实现发布订阅模式。
查看消费组/消费者bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --list #这里有两个消费组的消费者 console-consumer-47566 console-consumer-50875查看消费详情
可以查看到消费的订阅的 topic,负责的 partition,消费进度 offset, 积压的消息LAG。
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --group console-consumer-47566 --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER console-consumer-47566 topic_1 0 2 2 0 consumer-1_/127.0.0.1 console-consumer-47566 topic_1 1 3 3 0 consumer-1_/127.0.0.1 console-consumer-47566 topic_1 2 2 3 1 consumer-1_/127.0.0.1 console-consumer-47566 topic_1 3 0 3 3 consumer-1_/127.0.0.1
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/34002.html
摘要:对消息保存时根据进行归类,发送消息者成为消息接受者成为此外集群有多个实例组成,每个实例成为。无论是集群,还是和都依赖于来保证系统可用性集群保存一些信息。 1 简介 1.1 简介 Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JM...
摘要:它最初由公司开发,于年贡献给了基金会并成为顶级开源项目。使用作为其分布式协调框架,很好的将消息生产消息存储消息消费的过程结合在一起。目的是防止某一台挂了。一、简介 Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级...
摘要:下载系统同学可以使用安装系统同学可以从官网下载源码解压,也可以直接执行以下命令启动使用来维护集群信息,所以这里我们先要启动,与的相关联系跟结合后续再深入了解,毕竟不能一口吃成一个胖子。 看完上一篇,相信大家对消息系统以及Kafka的整体构成都有了初步了解,学习一个东西最好的办法,就是去使用它,今天就让我们一起窥探一下Kafka,并完成自己的处女作。 消息在Kafka中的历程 虽然我们掌...
摘要:相关环境软件版本用户统一为前置操作各主机间启动连接这一步至关重要如果没有配置成功,会影响到集群之间的连接本机添加三台主机对应其他两台机器重复上面的操作即可完成后,可以一下各台机子包括本机是否还需要密码安装软件下载 相关环境 软件 版本 Centos 3.10.0-862.el7.x86_64 jdk 1.8 zookeeper 3.4.10 kafka 1.1.0...
摘要:基本概念介绍可以简单理解为一个节点多个节点构成整个集群某种类型的消息的合集它是在物理上的分组多个会被分散地存储在不同的节点上单个的消息是保证有序的但整个的消息就不一定是有序的包含消息内容的指定大小的文件由文件和文件组成一个由多个文件组成文件 基本概念介绍 Broker 可以简单理解为一个 Kafka 节点, 多个 Broker 节点构成整个 Kafka 集群;Topic 某种类型的消息...
阅读 4315·2021-09-29 09:34
阅读 1199·2021-09-24 10:38
阅读 1283·2021-09-22 15:15
阅读 2790·2021-09-09 09:33
阅读 776·2019-08-30 11:08
阅读 519·2019-08-30 10:52
阅读 1116·2019-08-30 10:52
阅读 2188·2019-08-28 18:01