资讯专栏INFORMATION COLUMN

大数据系列——kafka学习笔记

MAX_zuo / 3069人阅读

摘要:当某一台故障失效时,生产者和消费者转而使用其它的机器整体健壮性的组件一个消息队列需要哪些部分生产消费消息类别存储等等主题处理的消息的不同分类消息代理集群中的一个服务节点称为一个,主要存储消息数据存在硬盘中。

1. 大数据领域数据类型 1.1 有界数据

一般批处理(一个文件 或者一批文件),不管文件多大,都是可以度量

mapreduce hive sparkcore sparksql

1.2 无界数据

源源不断的流水一样 (流数据)

Storm SparkStreaming

2. 消息队列(Message Queue)

消息 Message

网络中的两台计算机或者两个通讯设备之间传递的数据,例如说:文本、音乐、视频等内容

队列 Queue

一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部移除元素和在尾部追加元素。入队、出队。

消息队列 MQ

消息+队列

保存消息的队列

消息的传输过程中的容器

主要提供生产、消费接口供外部调用做数据的存储和获取

3. 消息队列的分类 3.1 点对点(P2P)

一个生产者生产的消息只能被一个消费者消费

3.2 发布订阅(Pub/Sub)

消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

消息的发布者

消息的订阅者

每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。

4. Kafka的简介

在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka

Kafka是分布式的发布—订阅消息系统(基于PS的一个消息队列)

它最初由LinkedIn(领英)公司发布,使用Scala语言编写

Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统

它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据

5. Kafka的特点

高吞吐量

可以满足每秒百万级别消息的生产和消费(生产消费 )

持久性

有一套完善的消息存储机制,确保数据的高效安全的持久化 (数据的存储)

分布式

基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性

6. Kafka的组件

一个消息队列需要哪些部分?

生产

消费

消息类别

存储等等

Topic(主题)

Kafka处理的消息的不同分类

Broker (消息代理)

Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据,存在硬盘中。每个topic都是有分区的

Partition (物理上的分区)

一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定

Message (消息)

消息,是通信的基本单位,每个消息都属于一个partition

7. Kafka的服务

Producer : 消息和数据的生产者,向Kafka的一个topic发布消息

Consumer :消息和数据的消费者,定于topic并处理其发布的消息

Zookeeper :协调kafka的正常运行

8. Kafka的安装 8.1 单机版的安装

准备kafka

kafka_2.10-0.10.0.1.tgz

解压kafka

tar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt/

重命名

mv kafka_2.10-0.10.0.1.tgz kafka

配置环境变量

export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

编辑server.properties

broker.id=1
log.dirs=/opt/kafka/logs
zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
listeners=PLAINTEXT://:9092          

启动kafka-server服务

kafka-server-start.sh [-daemon] server.properties

停止kafka服务

 kafka-server-stop.sh

8.2 集群的安装

只需要在每个机器上修改对应的 ==broker.id=1== 即可

9. Kafka中Topic的操作

创建topic

kafka-topics.sh  --create --topic t1 --partitions 3 --replication-factor 1  --zookeeper uplooking03:2181,uplooking04:2181

==注意: 创建topic过程的问题,replication-factor个数不能超过brokerserver的个数==

查看topic

kafka-topics.sh  --list --zookeeper uplooking03

查看具体topic的详情

kafka-topics.sh  --describe --topic t1 --zookeeper uplooking04:2181
PartitionCount:topic对应的partition的个数
ReplicationFactor:topic对应的副本因子,说白就是副本个数
Partition:partition编号,从0开始递增
Leader:当前partition起作用的breaker.id
Replicas: 当前副本数据存在的breaker.id,是一个列表,排在最前面的其作用
Isr:当前kakfa集群中可用的breaker.id列表    

修改topic(不能修改replication-factor,以及只能对partition个数进行增加,不能减少 )

kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03

删除Topic

kafka-topics.sh --delete --topic t1 --zookeeper uplooking03

ps:这种删除只是标记删除,要想彻底删除必须设置一个属性,在server.properties中配置delete.topic.enable=true,否则只是标记删除

配置完成之后,需要重启kafka服务

10. Kafka中的生产者和消费者接口

自己写代码实现kafka提供的消息生产和消费的接口

kafka自身也实现了自身的生产和消费的接口,给出了两个工具(kafka-console-producer.sh , kafka-console-consumer.sh)

11. Kafka自带的生产和消费消息的工具 11.1 kafka-console-producer.sh(生产工具)
kafka-console-producer.sh --topic t1  --broker-list uplooking03:9092,uploo
king04:9092,uplooking05:9092
11.2 kafka-console-consumer.sh(消费工具)
kafka-console-consumer.sh  --zookeeper uplooking03 --topic t1
--from-beginning:从头开始消费
--blacklist:黑名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --blacklist t1,t3)
--whitelist:白名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --whitelist t2)    

ps:--topic|--blacklist|--whitelist 只能出现其中一个
12. ==Flume与Kafka的整合==

配置flume的agent配置文件

touch flume-kafka.properties

# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是netcat网络
a1.sources.r1.type = netcat
# source监听的网络ip地址和端口号
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444



# 用于描述sink,类型是kafka

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hadoop
a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2


# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100


# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume开始采集数据

[root@uplooking01:/opt/flume/conf]
    flume-ng agent --name a1 --conf-file flume-kafka.properties

开启Kafka消息消费工具

[root@uplooking03:/opt/flume/conf]
    kafka-console-consumer.sh  --zookeeper uplooking03 --topic hadoop

给flume监听的Source发送数据

[root@uplooking03:/]
    nc uplooking01 44444

现在就可以到kafka的消费工具(kafka-console-consumer.sh)中区查看nc发送的数据

13. Kafka的API操作(生产者和消费者)

  org.apache.kafka
  kafka_2.10
  0.10.0.1
13.1 Kafka的生产者

创建生产者的配置文件 producer.properties

bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

创建生产者并且发送数据到topic中

public class MyKafkaProducer {
    public static void main(String[] args) throws IOException {
        Properties prop = new Properties();
        prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        KafkaProducer kafkaProducer = new KafkaProducer(prop);
        kafkaProducer.send(new ProducerRecord("hadoop", "name", "admin123"));
        kafkaProducer.close();
    }
}

13.2 Kafka的消费者

创建消费者的配置文件consumer.properties

zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
group.id=test-consumer-group
bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

创建消息消费者消费topic中的数据

public static void main(String[] args) throws Exception {
    Properties prop = new Properties();
    prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
    KafkaConsumer kafkaConsumer = new KafkaConsumer(prop);
    Collection topics = new ArrayList();
    topics.add("hadoop");
    kafkaConsumer.subscribe(topics);
    while (true) {
        ConsumerRecords records = kafkaConsumer.poll(1000);
        for (ConsumerRecord record : records) {
            System.out.println(record.value());
        }
    }
}

自定义分区(MyCustomPartition)

package com.uplooking.bigdata.kafka.partition;
public class MyCustomPartition implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)  {
//获取分区数,    分区编号一般都是从0开始
int partitionSize = cluster.partitionCountForTopic(topic);
int keyHash = Math.abs(key.hashCode());
int valueHash = Math.abs(value.hashCode());
return keyHash % partitionSize;
}
public void close() {
}
public void configure(Map configs) {
}
}

配置自定义分区(producer.properties)

partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/34014.html

相关文章

  • ApacheCN 编程/数据/数据科学/人工智能学习资源 2019.6

    摘要:请回复这个帖子并注明组织个人信息来申请加入。权限分配灵活,能者居之。数量超过个,在所有组织中排名前。网站日超过,排名的峰值为。导航归档社区自媒体平台微博知乎专栏公众号博客园简书合作侵权,请联系请抄送一份到赞助我们 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=...

    Bmob 评论0 收藏0
  • ApacheCN 编程/数据/数据科学/人工智能学习资源 2019.5

    摘要:请回复这个帖子并注明组织个人信息来申请加入。版笔记等到中文字幕翻译完毕后再整理。数量超过个,在所有组织中排名前。网站日超过,排名的峰值为。主页归档社区自媒体平台微博知乎专栏公众号博客园简书合作侵权,请联系请抄送一份到赞助我们 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1...

    zhonghanwen 评论0 收藏0
  • ApacheCN 编程/数据/数据科学/人工智能学习资源 2019.4

    摘要:我们是一个大型开源社区,旗下群共余人,数量超过个,网站日超过,拥有博客专家和简书程序员优秀作者认证。我们组织公益性的翻译活动学习活动和比赛组队活动,并和等国内著名开源组织保持良好的合作关系。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 我们是一个...

    tomorrowwu 评论0 收藏0
  • ApacheCN 编程/数据/数据科学/人工智能学习资源 2019.4

    摘要:我们是一个大型开源社区,旗下群共余人,数量超过个,网站日超过,拥有博客专家和简书程序员优秀作者认证。我们组织公益性的翻译活动学习活动和比赛组队活动,并和等国内著名开源组织保持良好的合作关系。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 我们是一个...

    ky0ncheng 评论0 收藏0
  • Kafka学习笔记

    摘要:学习笔记使用一个叫的文学家的名字用来命名的。引入,正式升级为分布式流处理平台。主要还是针对组成员数量减少的情况。当所有成员都退出组后,消费者组状态变更为。自动定期删除过期位移的条件就是,组要处于状态。减少下游系统一次性消费的消息总数。 Kafka 学习笔记 Kafka使用一个叫Franz Kafka的文学家的名字用来命名的。 Kafka是一款开源的消息引擎系统。也是一个分布式流处理平台...

    aikin 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<