摘要:是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区多副本冗余,因此被广泛用于大规模消息数据处理应用。支持及多种其它语言客户端,可与等其它大数据工具结合使用。
Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
环境安装搭建高吞吐量 Kafka 分布式发布订阅消息 集群
测试用例 Github 代码代码我已放到 Github ,导入spring-boot-kafka 项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka
添加依赖在项目中添加 kafka-clients 依赖
启用 kafkaorg.apache.kafka kafka-clients 0.10.2.0 org.springframework.kafka spring-kafka
@Configuration @EnableKafka public class KafkaConfiguration { }消息生产者
@Component public class MsgProducer { private static final Logger log = LoggerFactory.getLogger(MsgProducer.class); @Autowired private KafkaTemplate消息消费者kafkaTemplate; public void sendMessage(String topicName, String jsonData) { log.info("向kafka推送数据:[{}]", jsonData); try { kafkaTemplate.send(topicName, jsonData); } catch (Exception e) { log.error("发送数据出错!!!{}{}", topicName, jsonData); log.error("发送数据出错=====>", e); } //消息发送的监听器,用于回调返回信息 kafkaTemplate.setProducerListener(new ProducerListener () { @Override public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) { } @Override public void onError(String topic, Integer partition, String key, String value, Exception exception) { } @Override public boolean isInterestedInSuccess() { log.info("数据发送完毕"); return false; } }); } }
@Component public class MsgConsumer { @KafkaListener(topics = {"topic-1","topic-2"}) public void processMessage(String content) { System.out.println("消息被消费"+content); } }参数配置
application.properties
#kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=YZ-PTEST-APP-HADOOP-02:9092,YZ-PTEST-APP-HADOOP-04:9092 # 指定listener 容器中的线程数,用于提高并发量 spring.kafka.listener.concurrency=3 # 每次批量发送消息的数量 spring.kafka.producer.batch-size=1000 # 指定默认消费者group id spring.kafka.consumer.group-id=myGroup # 指定默认topic id spring.kafka.template.default-topic=topic-1启动服务
@SpringBootApplication @ComponentScan(value = {"io.ymq.kafka"}) public class Startup { public static void main(String[] args) { SpringApplication.run(Startup.class, args); } }单元测试
import io.ymq.kafka.MsgProducer; import io.ymq.kafka.run.Startup; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * 描述: 测试 kafka * * @author yanpenglei * @create 2017-10-16 18:45 **/ @RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class BaseTest { @Autowired private MsgProducer msgProducer; @Test public void test() throws Exception { msgProducer.sendMessage("topic-1", "topic--------1"); msgProducer.sendMessage("topic-2", "topic--------2"); } }
消息生产者,响应
2017-10-17 15:54:44.814 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送数据:[topic--------1] 2017-10-17 15:54:44.860 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送数据:[topic--------2] 2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 数据发送完毕 2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 数据发送完毕
消息消费者,响应
消息被消费topic--------1 消息被消费topic--------2
代码我已放到 Github ,导入spring-boot-kafka 项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka
遇到一些坑[2017-10-16 19:20:08.340] - 14884 严重 [main] --- org.springframework.kafka.support.LoggingProducerListener: Exception thrown when sending a message with key="null" and payload="topic--------2" to topic topic-2:
经调试发现 kafka 连接是用的主机名,所以修改 hosts
C:WindowsSystem32driversetchosts 10.32.32.149 YZ-PTEST-APP-HADOOP-02 10.32.32.154 YZ-PTEST-APP-HADOOP-04Contact
作者:鹏磊
出处:http://www.ymq.io
Email:admin@souyunku.com
版权归作者所有,转载请注明出处
Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/33877.html
摘要:序本文主要列一下的一些以及属性配置这个版本使用的是版本使用的是版本几个关键配置类配置属性公共配置配置属性配置 序 本文主要列一下spring for apache kafka的一些auto config以及属性配置 maven org.springframework.kafka spring-kafka 1.2.3.RELEASE 这个版本使用的是kafka clien...
摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。 showImg(http://ww4.sinaimg.cn/large/006tNc79gy1g5iatph25rj30u00gw0yj.jpg); 前提 假设你了解过 SpringBoot 和 Kafka。 1、SpringBoot 如果对 SpringBoo...
摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...
摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。需要注意的是这里的需要和消息发送类中设置的一致。 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/ showImg(https://segmentfault.com/img...
摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。需要注意的是这里的需要和消息发送类中设置的一致。 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/ showImg(https://segmentfault.com/img...
阅读 1667·2021-11-25 09:43
阅读 2016·2021-11-19 09:40
阅读 3050·2021-11-18 13:12
阅读 2700·2021-10-13 09:40
阅读 2092·2021-09-30 09:47
阅读 1575·2021-09-29 09:35
阅读 3530·2021-09-29 09:34
阅读 472·2021-08-24 10:00