资讯专栏INFORMATION COLUMN

kafka0.8生产者实例

leiyi / 1667人阅读

摘要:序本文简单介绍下的的的实例。初始化准备消息发送同步发送异步发送默认使用

本文简单介绍下kafka0.8的client的producer的实例。

maven
        
            org.apache.kafka
            kafka_2.10
            0.8.2.2
        
初始化
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerAddr);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(props)
准备消息
                String dataKey = UUID.randomUUID().toString();
                String dataValue = UUID.randomUUID().toString();

                ProducerRecord producerRecord = new ProducerRecord<>(
                        topic,
                        dataKey,
                        dataValue
                );
发送 同步发送
producer.send(producerRecord).get();
异步发送(默认)
producer.send(producerRecord);
使用callback
producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if(exception != null){
                            LOGGER.error("send msg to {},error:{}",metadata.topic(),exception);
                        }
                    }
                });
doc

kafka08-documentation

Kafka Clients (At-Most-Once, At-Least-Once, Exactly-Once, and Avro Client)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33846.html

相关文章

  • kafka0.8消费者实例

    摘要:序这里简单展示一下如何使用的去消费一个。初始化客户端设置的的超时时间不设置则永远阻塞直到有新消息来并发消费取决于的值默认为是抛出异常而不是抛出注意事项消费者实例数每个实例的消费线程数的数量,否则多余的就浪费了。 序 这里简单展示一下如何使用kafka0.8的client去消费一个topic。 maven org.apache.kafka ...

    junbaor 评论0 收藏0
  • kafka0.8产者异常处理

    摘要:序本文简单解析一下版本中的的异常处理。具体跟打交道的时候,请求失败网络链接失败或是返回异常,则会根据重试次数重新入队。 序 本文简单解析一下kafka0.8.2.2版本中的java producer的异常处理。 概况 showImg(https://segmentfault.com/img/bVV8Zg?w=808&h=255); kafka的java producer的发送是异步,主...

    Caicloud 评论0 收藏0
  • kafka0.8.2消费者实例

    maven org.apache.kafka kafka_2.10 0.8.2.2 高级api实例 public class NativeConsumer { private static final String TOPIC = kafkatopic; public void exec() throws UnsupportedEncodingExc...

    Tony_Zby 评论0 收藏0
  • 聊聊kafka0.8的topic的partition以及topicCountMap

    摘要:序本文主要研究下版本的与的的关系。如果增减,,会导致,后对应的会发生变化。比如减少一个,然后之后,对应的会进行重新调整映射。的是,针对该是线程的数量。因此实际分布式部署的时候,其实例个数每个的个数的值。 序 本文主要研究下kafka0.8版本api的topicCountMap与topic的partition的关系。 partition 物理上把topic分成一个或多个partition...

    Hancock_Xu 评论0 收藏0
  • kafka0.8产者配置参数解析

    摘要:序本文主要讲解一下生产者的几个配置参数。设置此值是表示在响应请求之前不能再向同一个发送请求。注意设置此参数是为了避免消息乱序默认为。 序 本文主要讲解一下kafka生产者的几个配置参数。 参数及重要程度列表 static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, ...

    dance 评论0 收藏0

发表评论

0条评论

leiyi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<