资讯专栏INFORMATION COLUMN

【大数据实践】Kafka生产者编程(3)——Interceptor & Partitione

learning / 2985人阅读

摘要:前言在上一篇文章大数据实践生产者编程发送流程中,对自定义和自定义做了简单介绍,没有做深入讲解。必须字段,表示消息内容。如果没有设置和,则会采用类似于轮询方式但不是严格轮询,而是类似于随机数。

前言

在上一篇文章【大数据实践】Kafka生产者编程(2)——producer发送流程中,对自定义Interceptor和自定义Partitioner做了简单介绍,没有做深入讲解。因此,在本文章中,尝试补充介绍Interceptor和Partitioner的一些理论知识,并介绍如何自定义者两个类。

Producer拦截器(interceptor)和拦截链 实现接口

拦截器(interceptor)可以让用户在消息记录发送之前,或者producer回调方法执行之前,对消息或者回调信息做一些逻辑处理。拦截器实现了以下接口:

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor extends Configurable {
    ProducerRecord onSend(ProducerRecord var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

onSend():onSend函数将会在消息记录被发送之前被调用,它可以对ProducerRecord做一些处理,返回处理之后的ProducerRecord

onAcknowledgement():onAcknowledgement方法将在send时指定的回调函数执行之前被调用,可对执行结果进行一些处理。

close():close方法将在执行producer.close()的时候被调用,可以释放资源等。

拦截链ProducerInterceptors

拦截链(ProducerInterceptors)包含了一个由多个拦截器组装起来的拦截器列表List> ,在producer发送消息,消息回应以及close时,拦截链的onSend、onAcknowledgement、close方法会被调用,而这些方法中,会逐一调用每个拦截器的onSend、onAcknowledgement、close方法。就像是生成流水线上,各个处理程序一样。

拦截链类所在位置:

package org.apache.kafka.clients.producer.internals;

public class ProducerInterceptors implements Closeable {}
自定义拦截器

自定义一个计数拦截器,如下:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor {
    public int sendCounter = 0;
    public int succCounter = 0;
    public int failCounter = 0;

    public void configure(Map configs) {

    }

    public ProducerRecord onSend(ProducerRecord record) {
        System.out.println("onSend called in CounterInterceptor, key = " + record.key());
        sendCounter++;
        return  record;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exception) {
        if (exception == null) {
            System.out.println("record send ok. topic = " + recordMetadata.topic() + "partion = " + recordMetadata.partition());
            succCounter++;
        } else {
            System.out.println("record send failed. topic = " + recordMetadata.topic() + "partion = " + recordMetadata.partition());
            failCounter++;
        }

    }

    public void close() {
        System.out.println("sendCounter = " + sendCounter + " succCounter = " + succCounter + " failCounter = " + failCounter);
    }

}

将拦截器装配到自定义的Producer中:

package myproducers; 

/**
 * kafka消息生产者——
 */

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;


public class GameRecordProducer {
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;

    public GameRecordProducer() {}


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myproducers.GameRecordProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        List intercepters = new ArrayList();
        intercepters.add("myproducers.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, intercepters);
        KafkaProducer producer;
        producer = new KafkaProducer(props);


        try {
            producer.send(new ProducerRecord("game-score","message1")).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
消息记录类ProducerRecord

消息记录类中,记录了需要发送的消息内容以及要发送到的主题、分区等内容。类的定义如下:

package org.apache.kafka.clients.producer;

public class ProducerRecord {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

topic:必须字段,表示该消息记录record发送到那个topic。

value:必须字段,表示消息内容。

partition:可选字段,要发送到哪个分区partition。

如果record中设置了partition,则发往该partition;

如果没有设置partition,但指定key值,则会根据key序列化之后的字节数组的hashcode进行取模运算,得到partition。

如果没有设置partition和key,则producer会采用迭代方式(类似于随机数)。

key:可选字段,消息记录的key,可用于计算选定partition。

timestamp:可选字段,时间戳;表示该条消息记录的创建时间createtime,如果不指定,则默认使用producer的当前时间。

headers:可选字段。

默认partition算法

kafka producer的partition制定策略为:

如果record中设置了partition,则发往该partition;

如果没有设置partition,但指定key值,则会根据key序列化之后的字节数组的hashcode进行取模运算,得到partition。

如果没有设置partition和key,则producer会采用类似于轮询方式(但不是严格轮询,而是类似于随机数)。

具体算法源代码如下:

package org.apache.kafka.clients.producer.internals;

import ...

public class DefaultPartitioner implements Partitioner {

    // ...
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 从集群中获取该topic分区列表及分区数量。
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
       
        if (keyBytes == null) {
            // 没有指定key值,及key值序列化之后为null,则获取下一个可用的partition值
            int nextValue = this.nextValue(topic);
            // 获取该topic可用的分区列表
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                // 可用分区列表大于0时,
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                // toPositive:确保为正数,Math.abs(Integer.MIN_VALUE)为负数,所以不能用。
                // toPositive(Integer.MIN_VALUE) == 0 
                // toPositive(-1) == 2147483647 
                // 取余
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 使用murmur2 hash算法,求得值,在取余
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // 获取下一个值
    private int nextValue(String topic) {
        
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    ...
}
自定义Partitioner

除了使用默认的Partitioner之外,也可以使用自定义的Partitioner,已实现更好的分区均衡。

package myproducers;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class ConstantPartioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 固定永远返回1,即全部放在1分区
        return 1;
    }

    public void close() {

    }

    public void configure(Map configs) {
    }
}

在构建KafkaProducer对象时,在配置信息中,将自定义的Partitioner类配置进去:

kafkaProps.put("partitioner.class", "myproducer.ConstantPartitioner");
小结

本文章介绍了kafka producer中两个比较独立概念,在实际开发过程中,可作为我们程序的扩展点。后一篇文章将继续围绕KafkaProducer的配置细节进行分析,以了解Kafka发送过程中的更多的细节和机制。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/8398.html

相关文章

  • 大数据实Kafka产者编程3)——Interceptor & Partitione

    摘要:前言在上一篇文章大数据实践生产者编程发送流程中,对自定义和自定义做了简单介绍,没有做深入讲解。必须字段,表示消息内容。如果没有设置和,则会采用类似于轮询方式但不是严格轮询,而是类似于随机数。 前言 在上一篇文章【大数据实践】Kafka生产者编程(2)——producer发送流程中,对自定义Interceptor和自定义Partitioner做了简单介绍,没有做深入讲解。因此,在本文章中...

    worldligang 评论0 收藏0
  • 大数据实Kafka产者编程(5)——ProducerConfig详解(下)

    摘要:前言上一篇文章大数据实践生产者编程详解上中,对的相关配置项进行了部分介绍,在本文章中,将继续完成剩下配置项的介绍。在前面的文章大数据实践生产者编程中,对有详细讲解。重要性低类型默认值,表示不能被使用配置,用于事务的递交。 前言 上一篇文章【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)中,对kafka producer的相关配置项进行了部分介绍,在本文...

    codecook 评论0 收藏0
  • 大数据实Kafka产者编程(5)——ProducerConfig详解(下)

    摘要:前言上一篇文章大数据实践生产者编程详解上中,对的相关配置项进行了部分介绍,在本文章中,将继续完成剩下配置项的介绍。在前面的文章大数据实践生产者编程中,对有详细讲解。重要性低类型默认值,表示不能被使用配置,用于事务的递交。 前言 上一篇文章【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)中,对kafka producer的相关配置项进行了部分介绍,在本文...

    sarva 评论0 收藏0
  • 大数据实Kafka产者编程(2)——producer发送流程

    摘要:必须字段,表示消息内容。检查长度是否超过限制根据配置项和进行检查,超出任何一项就会抛出异常。过滤掉过期的,对于过期的,会通过通知发送失败。 前言 在上一篇文章【大数据实践】Kafka生产者编程(1)——KafkaProducer详解中,主要对KafkaProducer类中的函数进行了详细的解释,但仅针对其中的一些方法,对于producer背后的原理、机制,没有做深入讲解。因此,在本文章...

    tianlai 评论0 收藏0
  • 大数据实Kafka产者编程(1)——KafkaProducer详解

    摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...

    yuanzhanghu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<