资讯专栏INFORMATION COLUMN

Spring Boot集成kafka完整版

fuchenxuan / 1003人阅读

摘要:添加依赖会自动配置,接下来只要配置属性文件和主题名配置。配置配置主题和消费者组新建添加添加自己的注入模版类至此就可以跑起来了,有什么不明白的可以留言。

pom.xml添加maven依赖

    org.springframework.boot
    spring-boot-starter-parent
    2.0.2.RELEASE



    org.springframework.kafka
    spring-kafka

spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。

application.yml配置kafka
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092

    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1

    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
application.yml配置主题和消费者组
kafka:
  topic:
    group-id: topicGroupId
    topic-name:
      - topic1
      - topic2
      - topic3

新建KafkaTopicProperties

@ConfigurationProperties("kafka.topic")
public class KafkaTopicProperties implements Serializable {

    private String groupId;
    private String[] topicName;

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String[] getTopicName() {
        return topicName;
    }

    public void setTopicName(String[] topicName) {
        this.topicName = topicName;
    }

添加KafkaTopicConfiguration

@Configuration
@EnableConfigurationProperties(KafkaTopicProperties.class)
public class KafkaTopicConfiguration {

    private final KafkaTopicProperties properties;

    public KafkaTopicConfiguration(KafkaTopicProperties properties) {
        this.properties = properties;
    }

    @Bean
    public String[] kafkaTopicName() {
        return properties.getTopicName();
    }

    @Bean
    public String topicGroupId() {
        return properties.getGroupId();
    }

}
添加自己的service
@Service
public class IndicatorService {

    private Logger LOG = LoggerFactory.getLogger(IndicatorService.class);

    private final KafkaTemplate kafkaTemplate;

    /**
     * 注入KafkaTemplate
     * @param kafkaTemplate kafka模版类
     */
    @Autowired
    public IndicatorService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
    public void processMessage(ConsumerRecord record) {
        LOG.info("kafka processMessage start");
        LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());

        // do something ...

        LOG.info("kafka processMessage end");
    }

    public void sendMessage(String topic, String data) {
        LOG.info("kafka sendMessage start");
        ListenableFuture> future = kafkaTemplate.send(topic, data);
        future.addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
            }

            @Override
            public void onSuccess(SendResult result) {
                LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data);
            }
        });
        LOG.info("kafka sendMessage end");
    }
}

至此就可以跑起来了,有什么不明白的可以留言。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33946.html

相关文章

  • 聊聊springkafka集成方式

    摘要:序本文主要简单梳理梳理应用中生产消费消息的一些使用选择。配置配置收发信息基于构建,在环境中又稍作加工,也稍微有点封装了具体详见实例以及属性配置与集成总结的消费能力很低的情况下的处理方案 序 本文主要简单梳理梳理java应用中生产/消费kafka消息的一些使用选择。 可用类库 kafka client spring for apache kafka spring integration...

    Elle 评论0 收藏0
  • Springboot集成Kafka

    摘要:支持通过服务器和消费机集群来分区消息。如果数据产生速度大于向发送的速度,会阻塞或者抛出异常,以来表明。这项设置将和能够使用的总内存相关,但并不是一个硬性的限制,因为不是使用的所有内存都是用于缓存。 Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常...

    daydream 评论0 收藏0
  • Spring Boot 参考指南(消息传递)

    摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...

    Doyle 评论0 收藏0
  • [直播视频] 《Java 微服务实践 - Spring Boot 系列》限时折扣

    摘要:作为微服务的基础设施之一,背靠强大的生态社区,支撑技术体系。微服务实践为系列讲座,专题直播节,时长高达小时,包括目前最流行技术,深入源码分析,授人以渔的方式,帮助初学者深入浅出地掌握,为高阶从业人员抛砖引玉。 简介 目前业界最流行的微服务架构正在或者已被各种规模的互联网公司广泛接受和认可,业已成为互联网开发人员必备技术。无论是互联网、云计算还是大数据,Java平台已成为全栈的生态体系,...

    Enlightenment 评论0 收藏0
  • [直播视频] 《Java 微服务实践 - Spring Boot 系列》限时折扣

    摘要:作为微服务的基础设施之一,背靠强大的生态社区,支撑技术体系。微服务实践为系列讲座,专题直播节,时长高达小时,包括目前最流行技术,深入源码分析,授人以渔的方式,帮助初学者深入浅出地掌握,为高阶从业人员抛砖引玉。 简介 目前业界最流行的微服务架构正在或者已被各种规模的互联网公司广泛接受和认可,业已成为互联网开发人员必备技术。无论是互联网、云计算还是大数据,Java平台已成为全栈的生态体系,...

    winterdawn 评论0 收藏0

发表评论

0条评论

fuchenxuan

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<