资讯专栏INFORMATION COLUMN

Springboot集成Kafka

daydream / 609人阅读

摘要:支持通过服务器和消费机集群来分区消息。如果数据产生速度大于向发送的速度,会阻塞或者抛出异常,以来表明。这项设置将和能够使用的总内存相关,但并不是一个硬性的限制,因为不是使用的所有内存都是用于缓存。

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka服务器和消费机集群来分区消息。支持Hadoop并行数据加载。

Springboot的基本搭建和配置我在之前的文章已经给出代码示例了,如果还不了解的话可以先按照 SpringMVC配置太多?试试SpringBoot 进行学习哦。 那么如今很火的Springboot与kafka怎么完美的结合呢?多说无宜,放码过来 (talk is cheap,show me your code)!

安装Kafka

因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookeeper先安装上,然后将kafka安装好就可以了。 下面我给出Mac安装的步骤以及需要注意的点吧,windows的配置除了所在位置不太一样其他几乎没什么不同。

brew install kafka 

对,就是这么简单,mac上一个命令就可以搞定了,这个安装过程可能需要等一会儿,应该是和网络状况有关系。安装提示信息可能有错误消息,如"Error: Could not link: /usr/local/share/doc/homebrew" 这个没关系,自动忽略掉了。 最终我们看到下面的样子就成功咯。

 ==> Summary 🍺/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB 

安装的配置文件位置如下,根据自己的需要修改端口号什么的就可以了。

安装的zoopeeper和kafka的位置 /usr/local/Cellar/

配置文件 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties

启动zookeeper
  ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & 
启动kafka
 ./bin/kafka-server-start /usr/local/etc/kafka/server.properties & 

为kafka创建Topic,topic 名为test,可以配置成自己想要的名字,回头再代码中配置正确就可以了。

 ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
代码示例pom.xml
    
        org.springframework.boot
        spring-boot-starter-parent
        2.0.2.RELEASE
        
    

    
        UTF-8
        UTF-8
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.kafka
            spring-kafka
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            com.google.code.gson
            gson
            2.8.2
        
        
            org.springframework.boot
            spring-boot-starter-web
            RELEASE
        

    
application.yml
server:
  servlet:
    context-path: /
  port: 8080
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    producer:
      #每批次发送消息的数量
      batch-size: 16
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 0
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者的配置
    consumer:
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: latest
      #是否开启自动提交
      enable-auto-commit: true
      #自动提交的时间间隔
      auto-commit-interval: 100
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在/usr/local/etc/kafka/consumer.properties中有配置
      group-id: test-consumer-group
Producer 消息生产者
@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //发送消息方法
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("test", gson.toJson(message));
    }

}
public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}
Consumer 消息消费者
public class Consumer {

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord record){

        Optional kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);

        }

    }
}
测试接口用例

这里我们用一个接口来测试我们的消息发送会不会被消费者接收。

@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{"code":0}";
    }
}

在Springboot启动类启动后在浏览器访问http://127.0.0.1:8080/kafka/send,我们可以再IDE控制台中看到输出的结果,这时候我们的整合基本上就完成啦。 具体代码可以在https://github.com/xiaour/Spr... 获取,也欢迎大家Start⭐️我的项目。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33930.html

相关文章

  • SpringBoot + KafKa集群的集成

    摘要:新增,修改,删除等主题配置类杨攀年月日下午管理员,委派给以创建在应用程序上下文中定义的主题的管理员。杨攀年月日下午配置实例的连接地址的管理客户端,用于创建修改删除主题等杨攀年月日下午创建一个新的的,如果中的已经存在,则忽略。 简介 本文主要讲在springboot2中,如何通过自定义的配置来集成,并可以比较好的扩展性,同时集成多个kafka集群 引入依赖 引入kafka的依赖 ...

    suxier 评论0 收藏0
  • 聊聊spring对kafka集成方式

    摘要:序本文主要简单梳理梳理应用中生产消费消息的一些使用选择。配置配置收发信息基于构建,在环境中又稍作加工,也稍微有点封装了具体详见实例以及属性配置与集成总结的消费能力很低的情况下的处理方案 序 本文主要简单梳理梳理java应用中生产/消费kafka消息的一些使用选择。 可用类库 kafka client spring for apache kafka spring integration...

    Elle 评论0 收藏0
  • 在JHipster(SpringBoot)中集成Kafka消息队列

    摘要:在中集成消息队列前置条件服务器已经配置好。参见在机器上部署消息队列使用和来进行集成。消息消费者服务器在调试过程中,可以在服务器端,用命令行来接收消息或发布消息,分别调试发布和消费过程是否正常。 在JHipster(SpringBoot)中集成Kafka消息队列 前置条件: Kafka服务器已经配置好。参见在Ubuntu机器上部署Kafka消息队列 使用Spring Cloud Stre...

    CloudwiseAPM 评论0 收藏0
  • 日志平台(网关层) - 基于Openresty+ELKF+Kafka

    摘要:现在用方式调用接口,中使用方式输入内容日志平台网关层基于。日志平台网关层基于到此为止,提取经过网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了文件中。 背景介绍 1、问题现状与尝试 没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service...

    yvonne 评论0 收藏0
  • 日志平台(网关层) - 基于Openresty+ELKF+Kafka

    摘要:现在用方式调用接口,中使用方式输入内容日志平台网关层基于。日志平台网关层基于到此为止,提取经过网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了文件中。 背景介绍 1、问题现状与尝试 没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service...

    xumenger 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<