摘要:序本文简单介绍下的的的实例。初始化准备消息发送同步发送异步发送默认使用
序
本文简单介绍下kafka0.8的client的producer的实例。
maven初始化org.apache.kafka kafka_2.10 0.8.2.2
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerAddr); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer准备消息producer = new KafkaProducer<>(props)
String dataKey = UUID.randomUUID().toString(); String dataValue = UUID.randomUUID().toString(); ProducerRecord发送 同步发送producerRecord = new ProducerRecord<>( topic, dataKey, dataValue );
producer.send(producerRecord).get();异步发送(默认)
producer.send(producerRecord);使用callback
producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ LOGGER.error("send msg to {},error:{}",metadata.topic(),exception); } } });doc
kafka08-documentation
Kafka Clients (At-Most-Once, At-Least-Once, Exactly-Once, and Avro Client)
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/33846.html
摘要:序这里简单展示一下如何使用的去消费一个。初始化客户端设置的的超时时间不设置则永远阻塞直到有新消息来并发消费取决于的值默认为是抛出异常而不是抛出注意事项消费者实例数每个实例的消费线程数的数量,否则多余的就浪费了。 序 这里简单展示一下如何使用kafka0.8的client去消费一个topic。 maven org.apache.kafka ...
摘要:序本文简单解析一下版本中的的异常处理。具体跟打交道的时候,请求失败网络链接失败或是返回异常,则会根据重试次数重新入队。 序 本文简单解析一下kafka0.8.2.2版本中的java producer的异常处理。 概况 showImg(https://segmentfault.com/img/bVV8Zg?w=808&h=255); kafka的java producer的发送是异步,主...
maven org.apache.kafka kafka_2.10 0.8.2.2 高级api实例 public class NativeConsumer { private static final String TOPIC = kafkatopic; public void exec() throws UnsupportedEncodingExc...
摘要:序本文主要研究下版本的与的的关系。如果增减,,会导致,后对应的会发生变化。比如减少一个,然后之后,对应的会进行重新调整映射。的是,针对该是线程的数量。因此实际分布式部署的时候,其实例个数每个的个数的值。 序 本文主要研究下kafka0.8版本api的topicCountMap与topic的partition的关系。 partition 物理上把topic分成一个或多个partition...
摘要:序本文主要讲解一下生产者的几个配置参数。设置此值是表示在响应请求之前不能再向同一个发送请求。注意设置此参数是为了避免消息乱序默认为。 序 本文主要讲解一下kafka生产者的几个配置参数。 参数及重要程度列表 static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, ...
阅读 1786·2021-11-22 09:34
阅读 2669·2021-11-19 09:40
阅读 1012·2021-11-16 11:45
阅读 721·2021-09-04 16:41
阅读 2949·2019-08-29 16:40
阅读 2584·2019-08-29 15:34
阅读 2539·2019-08-29 13:11
阅读 1636·2019-08-29 12:58