资讯专栏INFORMATION COLUMN

SpringBoot + KafKa集群的集成

suxier / 1043人阅读

摘要:新增,修改,删除等主题配置类杨攀年月日下午管理员,委派给以创建在应用程序上下文中定义的主题的管理员。杨攀年月日下午配置实例的连接地址的管理客户端,用于创建修改删除主题等杨攀年月日下午创建一个新的的,如果中的已经存在,则忽略。

简介

本文主要讲在springboot2中,如何通过自定义的配置来集成,并可以比较好的扩展性,同时集成多个kafka集群

引入依赖

引入kafka的依赖

        
        
            org.springframework.kafka
            spring-kafka
        
配置文件

添加配置文件,默认添加一个kafka的集群,

topinfo:
     # kafka集群配置 ,bootstrap-servers 是必须的
   kafka:
      # 生产者的kafka集群地址
      bootstrap-servers:  192.168.90.225:9092,192.168.90.226:9092,192.168.90.227:9092 
      producer: 
         topic-name:  topinfo-01
         
      consumer:
         group-id:  ci-data
         

如果多个,则配置多个kafka的集群配置即可

添加属性配置类

添加对应的属性配置类,如果是多个kafka集群,则可以填多个即可,注意对应的@ConfigurationProperties。

package com.topinfo.ci.dataex.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.bean.Consumer;
import com.topinfo.ci.dataex.bean.Producer;

/**
 * @Description: kafka 属性配置
 * @Author:杨攀
 * @Since:2019年7月10日上午10:35:18
 */
@ConfigurationProperties(prefix = "topinfo.kafka")
@Component
public class KafKaConfiguration {

    /**
     * @Fields bootstrapServer : 集群的地址
     */
    private String bootstrapServers;

    private Producer producer;

    private Consumer consumer;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

}
添加kafka配置类

kafka的配置类中, 主要注意的方法:

生产者工厂方法: producerFactory()
生产者KafkaTemplate :kafkaTemplate()

消费者的工厂方法:consumerFactory()
消费者的监听容器工厂方法: kafkaListenerContainerFactory()

如果对应的是对个集群,需要多配置几个对应的这几个方法即可。

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka配置类
 * @Author:杨攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaConfig {

    @Autowired
    private KafKaConfiguration configuration;

     
    
    /**
     * @Description: 生产者的配置
     * @Author:杨攀
     * @Since: 2019年7月10日下午1:41:06
     * @return
     */
    public Map producerConfigs() {

        Map props = new HashMap();
        // 集群的服务器地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        //  消息缓存
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 生产者重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 指定ProducerBatch(消息累加器中BufferPool中的)可复用大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  4096);
        // 生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // key 和 value 的序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 客户端id
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo");

        return props;
    }

    /**
     * @Description: 生产者工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:10:04
     * @return
     */
    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    /**
     * @Description: KafkaTemplate
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:10:47
     * @return
     */
    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }


    // ------------------------------------------------------------------------------------------------------------

    /**
     * @Description: 消费者配置
     * @Author:杨攀
     * @Since: 2019年7月10日下午1:48:36
     * @return
     */
    public Map consumerConfigs() {

        Map props = new HashMap();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumer().getGroupId());
        // 自动位移提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动位移提交间隔时间
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        // 消费组失效超时时间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // 位移丢失和位移越界后的恢复起始位置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // key 和 value 的反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    /**
     * @Description: 消费者工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:14:13
     * @return
     */
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * @Description: kafka 监听容器工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:50:44
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 要创建的消费者数量(10 个线程并发处理)
        factory.setConcurrency(10);

        return factory;
    }

}
主题配置类

主要是可以对主题进行管理。新增,修改,删除等

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka 主题 配置类
 * @Author:杨攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaTopicConfig {

    @Autowired
    private KafKaConfiguration configuration;

    /**
     *@Description: kafka管理员,委派给AdminClient以创建在应用程序上下文中定义的主题的管理员。
     *@Author:杨攀
     *@Since: 2019年7月10日下午3:14:23
     *@return
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        
        Map props = new HashMap<>();
        
        // 配置Kafka实例的连接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    /**
     *@Description: kafka的管理客户端,用于创建、修改、删除主题等
     *@Author:杨攀
     *@Since: 2019年7月10日下午3:15:01
     *@return
     */
    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }
    
    /**
     * @Description: 创建一个新的 topinfo 的Topic,如果kafka中topinfo 的topic已经存在,则忽略。
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:13:28
     * @return
     */
    @Bean
    public NewTopic topinfo() {

        // 主题名称
        String topicName = configuration.getProducer().getTopicName();
        // 第二个参数是分区数, 第三个参数是副本数量,确保集群中配置的数目大于等于副本数量
        return new NewTopic(topicName, 2, (short) 2);
    }

}
生产者测试

生产者在发送消息的时候,使用对应的kafkaTemplate即可,如果是多个,需要注意导入的是对应的kafkaTemplate。

package com.topinfo.ci.dataex.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.topinfo.ci.dataex.config.KafKaConfig;

@RestController
@RequestMapping("kafka")
public class TestKafKaProducerController {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @RequestMapping("send")
    public String send(String name) {
        
        ListenableFuture>  future = kafkaTemplate.send("topinfo", name);
        
        future.addCallback(new ListenableFutureCallback>() {

            @Override
            public void onSuccess(SendResult result) {
                System.out.println("生产者-发送消息成功:" + result.toString());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("生产者-发送消息失败:" + ex.getMessage());
            }
        });
        
        
        return "test-ok";
    }
    
}
消费者测试

消费者需要在接收的方法上添加@KafkaListener,用于监听对应的topic,可以配置topic多个。

package com.topinfo.ci.dataex.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.config.KafKaConfig;

/**
 * @Description: kafka消费者
 * @Author:杨攀
 * @Since:2019年7月10日上午11:24:31
 */
@Component
public class KafKaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumer.class);

    
    /**
     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" })
    public void receive(ConsumerRecord record) {

        logger.info("消费得到的消息---key: " + record.key());
        logger.info("消费得到的消息---value: " + record.value().toString());
        
    }

}

如果多个集群的情况下,需要在KafkaListener监听注解上添加containerFactory,对应配置中的监听容器工厂。

/**
     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" }, containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord record) {

        logger.info("消费得到的消息---key: " + record.key());
        logger.info("消费得到的消息---value: " + record.value().toString());
        
    }

好了, 至此所有的配置就差不多了。

最后还有一项, 看到下面的绿色按钮没,来,点一下,乖! O(∩_∩)O哈哈~ ...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/34008.html

相关文章

  • Springboot集成Kafka

    摘要:支持通过服务器和消费机集群来分区消息。如果数据产生速度大于向发送的速度,会阻塞或者抛出异常,以来表明。这项设置将和能够使用的总内存相关,但并不是一个硬性的限制,因为不是使用的所有内存都是用于缓存。 Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常...

    daydream 评论0 收藏0
  • 2019年java架构师视频

    摘要:并发专题一内存模型原理实现机制单例模式并发基础并发集合原子操作基本类型线程池互联网工程专题二简介安装工具编程介绍,入门程序仓库依赖管理简介常用操作命令 01.并发专题(一)2018-09-11(1)-Java内存模型2018-09-13(2)-synchronized原理2018-09-16(3)-volatile实现机制2018-09-18(4)-DCL-单例模式2018-09-21...

    Object 评论0 收藏0
  • 日志平台(网关层) - 基于Openresty+ELKF+Kafka

    摘要:现在用方式调用接口,中使用方式输入内容日志平台网关层基于。日志平台网关层基于到此为止,提取经过网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了文件中。 背景介绍 1、问题现状与尝试 没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service...

    yvonne 评论0 收藏0
  • 日志平台(网关层) - 基于Openresty+ELKF+Kafka

    摘要:现在用方式调用接口,中使用方式输入内容日志平台网关层基于。日志平台网关层基于到此为止,提取经过网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了文件中。 背景介绍 1、问题现状与尝试 没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service...

    xumenger 评论0 收藏0
  • 2018年第28周-Kafka环境搭建和其Java例子

    摘要:安装下载配置文件我部署在服务器都在目录修改文件的和,要集群上唯一如只能让内网访问,则需配置这个启动先在每个服务器启动再在每个服务器启动以下命令的例子项目配置项目使用和和,文件如下 安装kafka 下载 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz 配置文件 我部署在服务器...

    izhuhaodev 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<