摘要:添加依赖会自动配置,接下来只要配置属性文件和主题名配置。配置配置主题和消费者组新建添加添加自己的注入模版类至此就可以跑起来了,有什么不明白的可以留言。
pom.xml添加maven依赖
org.springframework.boot spring-boot-starter-parent 2.0.2.RELEASE org.springframework.kafka spring-kafka
spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。
application.yml配置kafkaspring: kafka: bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 producer: retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000application.yml配置主题和消费者组
kafka: topic: group-id: topicGroupId topic-name: - topic1 - topic2 - topic3
新建KafkaTopicProperties
@ConfigurationProperties("kafka.topic") public class KafkaTopicProperties implements Serializable { private String groupId; private String[] topicName; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String[] getTopicName() { return topicName; } public void setTopicName(String[] topicName) { this.topicName = topicName; }
添加KafkaTopicConfiguration
@Configuration @EnableConfigurationProperties(KafkaTopicProperties.class) public class KafkaTopicConfiguration { private final KafkaTopicProperties properties; public KafkaTopicConfiguration(KafkaTopicProperties properties) { this.properties = properties; } @Bean public String[] kafkaTopicName() { return properties.getTopicName(); } @Bean public String topicGroupId() { return properties.getGroupId(); } }添加自己的service
@Service public class IndicatorService { private Logger LOG = LoggerFactory.getLogger(IndicatorService.class); private final KafkaTemplatekafkaTemplate; /** * 注入KafkaTemplate * @param kafkaTemplate kafka模版类 */ @Autowired public IndicatorService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}") public void processMessage(ConsumerRecord record) { LOG.info("kafka processMessage start"); LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value()); // do something ... LOG.info("kafka processMessage end"); } public void sendMessage(String topic, String data) { LOG.info("kafka sendMessage start"); ListenableFuture > future = kafkaTemplate.send(topic, data); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable ex) { LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data); } @Override public void onSuccess(SendResult result) { LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data); } }); LOG.info("kafka sendMessage end"); } }
至此就可以跑起来了,有什么不明白的可以留言。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/33946.html
摘要:序本文主要简单梳理梳理应用中生产消费消息的一些使用选择。配置配置收发信息基于构建,在环境中又稍作加工,也稍微有点封装了具体详见实例以及属性配置与集成总结的消费能力很低的情况下的处理方案 序 本文主要简单梳理梳理java应用中生产/消费kafka消息的一些使用选择。 可用类库 kafka client spring for apache kafka spring integration...
摘要:支持通过服务器和消费机集群来分区消息。如果数据产生速度大于向发送的速度,会阻塞或者抛出异常,以来表明。这项设置将和能够使用的总内存相关,但并不是一个硬性的限制,因为不是使用的所有内存都是用于缓存。 Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常...
摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...
摘要:作为微服务的基础设施之一,背靠强大的生态社区,支撑技术体系。微服务实践为系列讲座,专题直播节,时长高达小时,包括目前最流行技术,深入源码分析,授人以渔的方式,帮助初学者深入浅出地掌握,为高阶从业人员抛砖引玉。 简介 目前业界最流行的微服务架构正在或者已被各种规模的互联网公司广泛接受和认可,业已成为互联网开发人员必备技术。无论是互联网、云计算还是大数据,Java平台已成为全栈的生态体系,...
摘要:作为微服务的基础设施之一,背靠强大的生态社区,支撑技术体系。微服务实践为系列讲座,专题直播节,时长高达小时,包括目前最流行技术,深入源码分析,授人以渔的方式,帮助初学者深入浅出地掌握,为高阶从业人员抛砖引玉。 简介 目前业界最流行的微服务架构正在或者已被各种规模的互联网公司广泛接受和认可,业已成为互联网开发人员必备技术。无论是互联网、云计算还是大数据,Java平台已成为全栈的生态体系,...
阅读 841·2021-11-22 09:34
阅读 1485·2021-11-19 09:40
阅读 1479·2021-11-02 14:48
阅读 3415·2021-10-13 09:40
阅读 2798·2021-09-23 11:20
阅读 3475·2019-08-30 15:56
阅读 2656·2019-08-30 15:53
阅读 3089·2019-08-30 14:09