资讯专栏INFORMATION COLUMN

搭建 kafka 运行环境

J4ck_Chan / 549人阅读

摘要:准备环境系统下载并解压启动启动启动创建我修改了默认的端口查看创建的查看的详细信息测试运行并在控制台中输一些消息测试注意需要开两个终端搭建开发环境生产者

准备环境

系统

</>复制代码

  1. CentOS release 6.6 (Final)

下载并解压

</>复制代码

  1. wget http://mirror.bit.edu.cn/apache/kafka/0.8.0/kafka_2.10-0.8.0.tgz
  2. tar -zxvf kafka_2.10-0.8.0.tgz`

启动

启动Zookeeper

</>复制代码

  1. bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

</>复制代码

  1. bin/kafka-server-start.sh config/server.properties &

创建topic
我修改了默认的端口

</>复制代码

  1. bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic testweixuan &

查看创建的topic

</>复制代码

  1. bin/kafka-topics.sh --list --zookeeper localhost:2182

查看topic的详细信息

</>复制代码

  1. bin/kafka-topics.sh --describe --zookeeper localhost:2182

测试producer
运行producer并在控制台中输一些消息:

</>复制代码

  1. bin/kafka-console-producer.sh --broker-list 192.168.1.116:2182:9092 --topic testweixuan

测试consumer

</>复制代码

  1. bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2182 --topic testweixuan --from-beginning

注意:需要开两个终端

搭建开发环境 生产者代码

</>复制代码

  1. package com.fengtang;
  2. import java.util.Properties;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. /**
  7. * Create by fengtang
  8. * 2015/10/8 0008
  9. * KafkaDemo_01
  10. */
  11. public class KafkaProducer {
  12. private final Producer producer;
  13. public final static String TOPIC = "TEST-TOPIC";
  14. private KafkaProducer() {
  15. Properties props = new Properties();
  16. /**
  17. * 此处配置的是kafka的端口
  18. */
  19. props.put("metadata.broker.list", "192.168.1.116:9092");
  20. /**
  21. * 配置value的序列化类
  22. */
  23. props.put("serializer.class", "kafka.serializer.StringEncoder");
  24. /**
  25. * 配置key的序列化类
  26. */
  27. props.put("key.serializer.class", "kafka.serializer.StringEncoder");
  28. props.put("request.required.acks", "-1");
  29. producer = new Producer<>(new ProducerConfig(props));
  30. }
  31. void produce() {
  32. int messageNo = 1000;
  33. final int COUNT = 10000;
  34. while (messageNo < COUNT) {
  35. String key = String.valueOf(messageNo);
  36. String data = "hello kafka message " + key;
  37. producer.send(new KeyedMessage<>(TOPIC, key, data));
  38. System.out.println(data);
  39. messageNo++;
  40. }
  41. }
  42. public static void main(String[] args) {
  43. new KafkaProducer().produce();
  44. }
  45. }
消费者代码

</>复制代码

  1. package com.fengtang;
  2. import kafka.consumer.ConsumerConfig;
  3. import kafka.consumer.ConsumerIterator;
  4. import kafka.consumer.KafkaStream;
  5. import kafka.javaapi.consumer.ConsumerConnector;
  6. import kafka.serializer.StringDecoder;
  7. import kafka.utils.VerifiableProperties;
  8. import java.util.HashMap;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.Properties;
  12. /**
  13. * Create by fengtang
  14. * 2015/10/8 0008
  15. * KafkaDemo_01
  16. */
  17. public class KafkaConsumer {
  18. private final ConsumerConnector consumer;
  19. private KafkaConsumer() {
  20. Properties props = new Properties();
  21. /**
  22. * zookeeper 配置
  23. */
  24. props.put("zookeeper.connect", "192.168.1.116:2182");
  25. /**
  26. * group 代表一个消费组
  27. */
  28. props.put("group.id", "jd-group");
  29. /**
  30. * zk连接超时
  31. */
  32. props.put("zookeeper.session.timeout.ms", "400000");
  33. props.put("zookeeper.sync.time.ms", "200");
  34. props.put("auto.commit.interval.ms", "1000");
  35. props.put("auto.offset.reset", "smallest");
  36. /**
  37. * 序列化类
  38. */
  39. props.put("serializer.class", "kafka.serializer.StringEncoder");
  40. ConsumerConfig config = new ConsumerConfig(props);
  41. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
  42. }
  43. void consume() {
  44. Map topicCountMap = new HashMap();
  45. topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
  46. StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
  47. StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
  48. Map>> consumerMap =
  49. consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
  50. KafkaStream stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
  51. ConsumerIterator it = stream.iterator();
  52. while (it.hasNext())
  53. System.out.println(it.next().message());
  54. }
  55. public static void main(String[] args) {
  56. new KafkaConsumer().consume();
  57. }
  58. }

pom.xml

</>复制代码

  1. 4.0.0
  2. com.fengtang
  3. kafkademo
  4. 1.0-SNAPSHOT
  5. jar
  6. kafkademo
  7. http://maven.apache.org
  8. UTF-8
  9. org.apache.kafka
  10. kafka_2.10
  11. 0.8.0
  12. log4j
  13. log4j
  14. 1.2.15
  15. jmxtools
  16. com.sun.jdmk
  17. jmxri
  18. com.sun.jmx
  19. jms
  20. javax.jms
  21. mail
  22. javax.mail
解决报错

</>复制代码

  1. kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

需要改动config文件夹下的server.properties中的以下两个属性

</>复制代码

  1. zookeeper.connect=localhost:2181改成zookeeper.connect=192.168.1.116 (自己的服务器IP地址):2181

以及默认注释掉的 #host.name=localhost 改成 host.name=192.168.1.116 (自己的服务器IP地址)

成功截图

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64659.html

相关文章

  • 大数据入门指南(GitHub开源项目)

    摘要:项目地址前言大数据技术栈思维导图大数据常用软件安装指南一分布式文件存储系统分布式计算框架集群资源管理器单机伪集群环境搭建集群环境搭建常用命令的使用基于搭建高可用集群二简介及核心概念环境下的安装部署和命令行的基本使用常用操作分区表和分桶表视图 项目GitHub地址:https://github.com/heibaiying... 前 言 大数据技术栈思维导图 大数据常用软件安装指...

    guyan0319 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<