摘要:发送消息大小问题本文实验的版本为版本消息概述中的消息指的就是一条里面除了携带发送的数据之外还包含发往的发往的分区头信息数据数据时间戳生产消息过长在生产者发送消息的时候并不是上面所有的信息都算在发送的消息大小详情见下面代码上面的代码会将序列
Kafka发送消息大小问题
⚠️ 本文实验的Kafka版本为2.11版本.
消息概述kafka中的消息指的就是一条ProducerRecord,里面除了携带发送的数据之外,还包含:
topic 发往的Topic
partition 发往的分区
headers 头信息
key 数据
value 数据
timestamp-long 时间戳
Producer生产消息过长在生产者发送消息的时候,并不是上面所有的信息都算在发送的消息大小.详情见下面代码.
上面的代码会将value序列化成字节数组,参与序列化的有topic,headers,key. 用来验证value是否超出长度的是ensureValidRecordSize(serializedSize);方法.
ensureValidRecordSize从两个方面验证,一个是maxRequestSize(max.request.size),另一个是totalMemorySize(buffer.memory), 只有当value的长度同时小于时,消息才可以正常发送.
private void ensureValidRecordSize(int size) { if (size > this.maxRequestSize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration."); if (size > this.totalMemorySize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration."); }
单条消息过长或产生如下错误.
这里有个注意的点,如果只是单纯的发送消息,没有用Callback进行监控或者用Future进行获得结果,在消息过长的情况下,不会主动发出提示,
使用Future接收结果Futuresend = kafkaProducer.send(new ProducerRecord<>("topic", "key", "value")); RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata);
Future类中get()方法, @throws ExecutionException 如果计算抛出异常,该方法将会抛出该异常.
/** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException;使用Callback进行监控
先看Kafka专门为回调写的接口.
// 英文注释省略,总的来说: 用于异步回调,当消息发送server已经被确认之后,就会调用该方法 // 该方法中的肯定有一个参数不为null,如果没有异常产生,则metadata有数据,如果有异常则相反 public void onCompletion(RecordMetadata metadata, Exception exception);
kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } } });日志Level=DEBUG
将日志的消息级别设置为DEBUG,也会给标准输出输出该警告信息.
Future和Callback总结通过上面两种比较,不难发现Future是Java并发标准库中,并不是专门为kafka而设计,需要显示捕获异常,而Callback接口是kafka提供标准回调措施,所以应尽可能采用后者.
服务端接收消息限制在生产者有一个限制消息的参数,而在服务端也有限制消息的参数,该参数就是
message.max.bytes,默认为1000012B (大约1MB),服务端可以接收不到1MB的数据.(在新客户端producer,消息总是经过分批group into batch的数据,详情见RecordBatch接口).
/** * A record batch is a container for records. In old versions of the record format (versions 0 and 1), * a batch consisted always of a single record if no compression was enabled, but could contain * many records otherwise. Newer versions (magic versions 2 and above) will generally contain many records * regardless of compression. * 在旧版本不开启消息压缩的情况下,一个batch只包含一条数据 * 在新版本中总是会包含多条消息,不会去考虑消息是否压缩 */ public interface RecordBatch extends Iterable设置Broker端接收消息大小{ ... }
修改broker端的可以接收的消息大小,需要在broker端server.properties文件中添加message.max.bytes=100000. 数值可以修改成自己想要的,单位是byte.
生产端消息大于broker会发生什么如果生产者设置的消息发送大小为1MB,而broker端设置的消息大小为512KB会发生什么?
答案就是broker会拒绝该消息,生产者会返回一个RecordTooLargeException. 该消息是不会被消费者消费.提示的信息为: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
消费者也会进行消息限制,这里介绍有关三个限制消费的参数
fetch.max.bytes 服务端消息合集(多条)能返回的大小
fetch.min.bytes 服务端最小返回消息的大小
fetch.max.wait.ms 最多等待时间
如果fetch.max.wait.ms设置的时间到达,即使可以返回的消息总大小没有满足fetch.min.bytes设置的值,也会进行返回.
fetch.max.bytes设置过小如果fetch.max.bytes设置过小会发生什么? 会是不满足条件的数据一条都不返回吗? 我们可以根据文档来查看一下.
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress.
英文的大意就是: fetch.max.bytes 表示服务端能返回消息的总大小. 消息是通过分批次返回给消费者. 如果在分区中的第一个消息批次大于这个值,那么该消息批次依然会返回给消费者,保证流程运行.
可以得出结论: 消费端的参数只会影响消息读取的大小.
实践fetch.max.bytes设置过小properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024); properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1); ... while (true) { ConsumerRecordsrecords = kafkaConsumer.poll(Duration.ofSeconds(Integer.MAX_VALUE)); System.out.println(records.count()); }
启动消费者,添加上面三个参数. 指定消息批次最小最大返回的大小以及允许抓取最长的等待时间. 最后将返回的消息总数输出到标准输出.
实验结果: 因为每次发送的消息都要大于1024B,所以消费者每个批次只能返回一条数据. 最终会输出1...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/8668.html
摘要:发送消息大小问题本文实验的版本为版本消息概述中的消息指的就是一条里面除了携带发送的数据之外还包含发往的发往的分区头信息数据数据时间戳生产消息过长在生产者发送消息的时候并不是上面所有的信息都算在发送的消息大小详情见下面代码上面的代码会将序列 Kafka发送消息大小问题 ⚠️ 本文实验的Kafka版本为2.11版本. 消息概述 kafka中的消息指的就是一条ProducerRecord,里...
摘要:设计与原理详解应用场景日志收集消息系统解耦生产者和消费者缓存消息。流式处理比如的发布对象是。在中,当前消息的是由来维护的。通常情况下,这被认为与常数时间等价,但这对磁盘操作来说是不对的。为了解决这个问题,设计了一个标准字节消息。 Kafka 设计与原理详解 [TOC] kafka 应用场景 日志收集 消息系统 解耦生产者和消费者、缓存消息。 用户活动跟踪: 就是我们在做的。 运营指标...
摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...
摘要:从上述构造函数可以看出,可以通过和两种形式传递配置信息,用于构造对象,配置信息均为对。获取内部的和,用于后续该产生的事务性消息。该方法的返回值为该被发送到的分区的元数据,如偏移量,创建时间等。 前言 在文章【大数据实践】游戏事件处理系统系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为...
摘要:前言上一篇文章大数据实践生产者编程详解上中,对的相关配置项进行了部分介绍,在本文章中,将继续完成剩下配置项的介绍。在前面的文章大数据实践生产者编程中,对有详细讲解。重要性低类型默认值,表示不能被使用配置,用于事务的递交。 前言 上一篇文章【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)中,对kafka producer的相关配置项进行了部分介绍,在本文...
阅读 835·2021-10-12 10:11
阅读 1434·2021-09-03 10:28
阅读 770·2019-08-30 15:53
阅读 2094·2019-08-30 14:15
阅读 2819·2019-08-30 14:09
阅读 1081·2019-08-29 17:24
阅读 881·2019-08-26 18:27
阅读 1180·2019-08-26 11:57