资讯专栏INFORMATION COLUMN

Spring Boot 中使用 kafka

xiaowugui666 / 2572人阅读

摘要:是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区多副本冗余,因此被广泛用于大规模消息数据处理应用。支持及多种其它语言客户端,可与等其它大数据工具结合使用。

Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。

环境安装

搭建高吞吐量 Kafka 分布式发布订阅消息 集群

测试用例 Github 代码

代码我已放到 Github ,导入spring-boot-kafka 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka

添加依赖

在项目中添加 kafka-clients 依赖


    org.apache.kafka
    kafka-clients
    0.10.2.0


    org.springframework.kafka
    spring-kafka
启用 kafka
@Configuration
@EnableKafka
public class KafkaConfiguration {

}
消息生产者
@Component
public class MsgProducer {

    private static final Logger log = LoggerFactory.getLogger(MsgProducer.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topicName, String jsonData) {
        log.info("向kafka推送数据:[{}]", jsonData);
        try {
            kafkaTemplate.send(topicName, jsonData);
        } catch (Exception e) {
            log.error("发送数据出错!!!{}{}", topicName, jsonData);
            log.error("发送数据出错=====>", e);
        }

        //消息发送的监听器,用于回调返回信息
        kafkaTemplate.setProducerListener(new ProducerListener() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
            }

            @Override
            public boolean isInterestedInSuccess() {
                log.info("数据发送完毕");
                return false;
            }
        });
    }

}
消息消费者
@Component
public class MsgConsumer {

    @KafkaListener(topics = {"topic-1","topic-2"})
    public void processMessage(String content) {

        System.out.println("消息被消费"+content);
    }
    
}
参数配置

application.properties

#kafka
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=YZ-PTEST-APP-HADOOP-02:9092,YZ-PTEST-APP-HADOOP-04:9092
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic=topic-1
启动服务
@SpringBootApplication
@ComponentScan(value = {"io.ymq.kafka"})
public class Startup {

    public static void main(String[] args) {
        SpringApplication.run(Startup.class, args);
    }
}
单元测试
import io.ymq.kafka.MsgProducer;
import io.ymq.kafka.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * 描述: 测试 kafka
 *
 * @author yanpenglei
 * @create 2017-10-16 18:45
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class BaseTest {

    @Autowired
    private MsgProducer msgProducer;

    @Test
    public void test() throws Exception {

        msgProducer.sendMessage("topic-1", "topic--------1");
        msgProducer.sendMessage("topic-2", "topic--------2");
    }
}

消息生产者,响应

2017-10-17 15:54:44.814  INFO 2960 --- [           main] io.ymq.kafka.MsgProducer                 : 向kafka推送数据:[topic--------1]
2017-10-17 15:54:44.860  INFO 2960 --- [           main] io.ymq.kafka.MsgProducer                 : 向kafka推送数据:[topic--------2]
2017-10-17 15:54:44.878  INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer                 : 数据发送完毕
2017-10-17 15:54:44.878  INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer                 : 数据发送完毕

消息消费者,响应

消息被消费topic--------1
消息被消费topic--------2

代码我已放到 Github ,导入spring-boot-kafka 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka

遇到一些坑
[2017-10-16 19:20:08.340] - 14884 严重 [main] --- org.springframework.kafka.support.LoggingProducerListener: Exception thrown when sending a message with key="null" and payload="topic--------2" to topic topic-2:

经调试发现 kafka 连接是用的主机名,所以修改 hosts

C:WindowsSystem32driversetchosts

10.32.32.149 YZ-PTEST-APP-HADOOP-02
10.32.32.154 YZ-PTEST-APP-HADOOP-04
Contact

作者:鹏磊

出处:http://www.ymq.io

Email:admin@souyunku.com

版权归作者所有,转载请注明出处

Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33877.html

相关文章

  • spring for kafka自动配置及配置属性

    摘要:序本文主要列一下的一些以及属性配置这个版本使用的是版本使用的是版本几个关键配置类配置属性公共配置配置属性配置 序 本文主要列一下spring for apache kafka的一些auto config以及属性配置 maven org.springframework.kafka spring-kafka 1.2.3.RELEASE 这个版本使用的是kafka clien...

    Hancock_Xu 评论0 收藏0
  • SpringBoot Kafka 整合使用

    摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。 showImg(http://ww4.sinaimg.cn/large/006tNc79gy1g5iatph25rj30u00gw0yj.jpg); 前提 假设你了解过 SpringBoot 和 Kafka。 1、SpringBoot 如果对 SpringBoo...

    BigTomato 评论0 收藏0
  • Spring Boot 参考指南(消息传递)

    摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...

    Doyle 评论0 收藏0
  • SpringBoot Kafka 整合使用

    摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。需要注意的是这里的需要和消息发送类中设置的一致。 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/ showImg(https://segmentfault.com/img...

    ghnor 评论0 收藏0
  • SpringBoot Kafka 整合使用

    摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。需要注意的是这里的需要和消息发送类中设置的一致。 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/ showImg(https://segmentfault.com/img...

    LuDongWei 评论0 收藏0

发表评论

0条评论

阅读需要支付1元查看
<