资讯专栏INFORMATION COLUMN

PHP下kafka的实践

Codeing_ls / 1113人阅读

摘要:消息以为类别记录将消息种子分类每一类的消息称之为一个主题。这意味着生产者不等待来自同步完成的确认继续发送下一条批消息。这意味着在已成功收到的数据并得到确认后发送下一条。三种机制,性能依次递减吞吐量降低,数据健壮性则依次递增。

kafka 简介

</>复制代码

  1. Kafka 是一种高吞吐量的分布式发布订阅消息系统
kafka角色必知

</>复制代码

  1. producer:生产者。
  2. consumer:消费者。
  3. topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分类, 每一类的消息称之为一个主题(Topic)。
  4. broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic), 并从Broker拉数据,从而消费这些已发布的消息。
经典模型

</>复制代码

  1. 1. 一个主题下的分区不能小于消费者数量,即一个主题下消费者数量不能大于分区属,大了就浪费了空闲了
  2. 2. 一个主题下的一个分区可以同时被不同消费组其中某一个消费者消费
  3. 3. 一个主题下的一个分区只能被同一个消费组的一个消费者消费

常用参数说明 request.required.acks

</>复制代码

  1. Kafka producerack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。
  2. 0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。
  3. 1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。
  4. -1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。
  5. 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。
  6. 三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。
auto.offset.reset

</>复制代码

  1. 1. earliest:自动将偏移重置为最早的偏移量
  2. 2. latest:自动将偏移量重置为最新的偏移量(默认)
  3. 3. none:如果consumer group没有发现先前的偏移量,则向consumer抛出异常。
  4. 4. 其他的参数:向consumer抛出异常(无效参数)
kafka安装和简单测试 安装kafka(不需要安装,解包即可)

</>复制代码

  1. # 官方下载地址:http://kafka.apache.org/downloads
  2. # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
  3. tar -xzf kafka_2.12-1.1.1.tgz
  4. cd kafka_2.12-1.1.0
启动kafka server

</>复制代码

  1. # 需先启动zookeeper
  2. # -daemon 可启动后台守护模式
  3. bin/zookeeper-server-start.sh config/zookeeper.properties
  4. bin/kafka-server-start.sh config/server.properties
启动kafka客户端测试

</>复制代码

  1. # 创建一个话题,test话题2个分区
  2. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
  3. Created topic "test".
  4. # 显示所有话题
  5. bin/kafka-topics.sh --list --zookeeper localhost:2181
  6. test
  7. # 显示话题信息
  8. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  9. Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
  10. Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  11. Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
  12. # 启动一个生产者(输入消息)
  13. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  14. [等待输入自己的内容 出现>输入即可]
  15. >i am a new msg !
  16. >i am a good msg ?
  17. # 启动一个消费者(等待消息)
  18. # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
  19. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  20. [等待消息]
  21. i am a new msg !
  22. i am a good msg ?
安装kafka的php扩展

</>复制代码

  1. # 先安装rdkfka库文件
  2. git clone https://github.com/edenhill/librdkafka.git
  3. cd librdkafka/
  4. ./configure
  5. make
  6. sudo make install
  7. git clone https://github.com/arnaud-lb/php-rdkafka.git
  8. cd php-rdkafka
  9. phpize
  10. ./configure
  11. make all -j 5
  12. sudo make install
  13. vim [php]/php.ini
  14. extension=rdkafka.so
php代码实践 生产者

</>复制代码

  1. setDrMsgCb(function ($kafka, $message) {
  2. file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
  3. });
  4. $conf->setErrorCb(function ($kafka, $err, $reason) {
  5. file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
  6. });
  7. $rk = new RdKafkaProducer($conf);
  8. $rk->setLogLevel(LOG_DEBUG);
  9. $rk->addBrokers("127.0.0.1");
  10. $cf = new RdKafkaTopicConf();
  11. // -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
  12. // 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
  13. $cf->set("request.required.acks", 0);
  14. $topic = $rk->newTopic("test", $cf);
  15. $option = "qkl";
  16. for ($i = 0; $i < 20; $i++) {
  17. //RD_KAFKA_PARTITION_UA自动选择分区
  18. //$option可选
  19. $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
  20. }
  21. $len = $rk->getOutQLen();
  22. while ($len > 0) {
  23. $len = $rk->getOutQLen();
  24. var_dump($len);
  25. $rk->poll(50);
  26. }
运行生产者

</>复制代码

  1. php producer.php
  2. # output
  3. int(20)
  4. int(20)
  5. int(20)
  6. int(20)
  7. int(0)
  8. # 你可以查看你刚才上面启动的消费者shell应该会输出消息
  9. qkl . 0
  10. qkl . 1
  11. qkl . 2
  12. qkl . 3
  13. qkl . 4
  14. qkl . 5
  15. qkl . 6
  16. qkl . 7
  17. qkl . 8
  18. qkl . 9
  19. qkl . 10
  20. qkl . 11
  21. qkl . 12
  22. qkl . 13
  23. qkl . 14
  24. qkl . 15
  25. qkl . 16
  26. qkl . 17
  27. qkl . 18
  28. qkl . 19
Low Level 消费者

</>复制代码

  1. setDrMsgCb(function ($kafka, $message) {
  2. file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
  3. });
  4. $conf->setErrorCb(function ($kafka, $err, $reason) {
  5. file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
  6. });
  7. //设置消费组
  8. $conf->set("group.id", "myConsumerGroup");
  9. $rk = new RdKafkaConsumer($conf);
  10. $rk->addBrokers("127.0.0.1");
  11. $topicConf = new RdKafkaTopicConf();
  12. $topicConf->set("request.required.acks", 1);
  13. //在interval.ms的时间内自动提交确认、建议不要启动
  14. //$topicConf->set("auto.commit.enable", 1);
  15. $topicConf->set("auto.commit.enable", 0);
  16. $topicConf->set("auto.commit.interval.ms", 100);
  17. // 设置offset的存储为file
  18. //$topicConf->set("offset.store.method", "file");
  19. // 设置offset的存储为broker
  20. $topicConf->set("offset.store.method", "broker");
  21. //$topicConf->set("offset.store.path", __DIR__);
  22. //smallest:简单理解为从头开始消费,其实等价于上面的 earliest
  23. //largest:简单理解为从最新的开始消费,其实等价于上面的 latest
  24. //$topicConf->set("auto.offset.reset", "smallest");
  25. $topic = $rk->newTopic("test", $topicConf);
  26. // 参数1消费分区0
  27. // RD_KAFKA_OFFSET_BEGINNING 重头开始消费
  28. // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
  29. // RD_KAFKA_OFFSET_END 最后一条消费
  30. $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
  31. //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
  32. //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  33. while (true) {
  34. //参数1表示消费分区,这里是分区0
  35. //参数2表示同步阻塞多久
  36. $message = $topic->consume(0, 12 * 1000);
  37. if (is_null($message)) {
  38. sleep(1);
  39. echo "No more messages
  40. ";
  41. continue;
  42. }
  43. switch ($message->err) {
  44. case RD_KAFKA_RESP_ERR_NO_ERROR:
  45. var_dump($message);
  46. break;
  47. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  48. echo "No more messages; will wait for more
  49. ";
  50. break;
  51. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  52. echo "Timed out
  53. ";
  54. break;
  55. default:
  56. throw new Exception($message->errstr(), $message->err);
  57. break;
  58. }
  59. }
High LEVEL消费者

</>复制代码

  1. assign();
  2. // $kafka->assign([new RdKafkaTopicPartition("qkl01", 0, 0)]);
  3. break;
  4. case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
  5. echo "Revoke: ";
  6. var_dump($partitions);
  7. $kafka->assign(NULL);
  8. break;
  9. default:
  10. throw new Exception($err);
  11. }
  12. }
  13. // Set a rebalance callback to log partition assignments (optional)
  14. $conf->setRebalanceCb(function(RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
  15. rebalance($kafka, $err, $partitions);
  16. });
  17. // Configure the group.id. All consumer with the same group.id will consume
  18. // different partitions.
  19. $conf->set("group.id", "test-110-g100");
  20. // Initial list of Kafka brokers
  21. $conf->set("metadata.broker.list", "192.168.216.122");
  22. $topicConf = new RdKafkaTopicConf();
  23. $topicConf->set("request.required.acks", -1);
  24. //在interval.ms的时间内自动提交确认、建议不要启动
  25. $topicConf->set("auto.commit.enable", 0);
  26. //$topicConf->set("auto.commit.enable", 0);
  27. $topicConf->set("auto.commit.interval.ms", 100);
  28. // 设置offset的存储为file
  29. $topicConf->set("offset.store.method", "file");
  30. $topicConf->set("offset.store.path", __DIR__);
  31. // 设置offset的存储为broker
  32. // $topicConf->set("offset.store.method", "broker");
  33. // Set where to start consuming messages when there is no initial offset in
  34. // offset store or the desired offset is out of range.
  35. // "smallest": start from the beginning
  36. $topicConf->set("auto.offset.reset", "smallest");
  37. // Set the configuration to use for subscribed/assigned topics
  38. $conf->setDefaultTopicConf($topicConf);
  39. $consumer = new RdKafkaKafkaConsumer($conf);
  40. //$KafkaConsumerTopic = $consumer->newTopic("qkl01", $topicConf);
  41. // Subscribe to topic "test"
  42. $consumer->subscribe(["qkl01"]);
  43. echo "Waiting for partition assignment... (make take some time when
  44. ";
  45. echo "quickly re-joining the group after leaving it.)
  46. ";
  47. while (true) {
  48. $message = $consumer->consume(120*1000);
  49. switch ($message->err) {
  50. case RD_KAFKA_RESP_ERR_NO_ERROR:
  51. var_dump($message);
  52. // $consumer->commit($message);
  53. // $KafkaConsumerTopic->offsetStore(0, 20);
  54. break;
  55. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  56. echo "No more messages; will wait for more
  57. ";
  58. break;
  59. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  60. echo "Timed out
  61. ";
  62. break;
  63. default:
  64. throw new Exception($message->errstr(), $message->err);
  65. break;
  66. }
  67. }
消费组特别说明

</>复制代码

  1. 特别注意,High LEVEL消费者设置的消费组,kafka服务器才会记录, Low Level消费者设置的消费组,服务器不会记录

具体查看消费组信息,你可以翻阅本篇文章

查看服务器元数据(topic/partition/broker)

</>复制代码

  1. setDrMsgCb(function ($kafka, $message) {
  2. file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
  3. });
  4. $conf->setErrorCb(function ($kafka, $err, $reason) {
  5. printf("Kafka error: %s (reason: %s)
  6. ", rd_kafka_err2str($err), $reason);
  7. });
  8. $conf->set("group.id", "myConsumerGroup");
  9. $rk = new RdKafkaConsumer($conf);
  10. $rk->addBrokers("127.0.0.1");
  11. $allInfo = $rk->metadata(true, NULL, 60e3);
  12. $topics = $allInfo->getTopics();
  13. echo rd_kafka_offset_tail(100);
  14. echo "--";
  15. echo count($topics);
  16. echo "--";
  17. foreach ($topics as $topic) {
  18. $topicName = $topic->getTopic();
  19. if ($topicName == "__consumer_offsets") {
  20. continue ;
  21. }
  22. $partitions = $topic->getPartitions();
  23. foreach ($partitions as $partition) {
  24. // $rf = new ReflectionClass(get_class($partition));
  25. // foreach ($rf->getMethods() as $f) {
  26. // var_dump($f);
  27. // }
  28. // die();
  29. $topPartition = new RdKafkaTopicPartition($topicName, $partition->getId());
  30. echo "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
  31. echo "offset:" . ($topPartition->getOffset()) . PHP_EOL;
  32. }
  33. }
如果需远端生产和消费

</>复制代码

  1. vim config/server.properties
  2. advertised.listeners=PLAINTEXT://ip:9092
  3. # ip 未你kafka的外网ip即可
分享一个打包好的php-rdkafka的类库

https://github.com/qkl9527/php-rdkafka-class

参考文献

Kafka文档

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29117.html

相关文章

  • PHPkafka常用脚本实践

    摘要:阅读本教程前最好先尝试阅读下的实践自带命令实践尝试实践的知识创建话题生产消息消费消息话题信息获取消费组获取消费组的自带的命令安装目录的目录下代表我们会使用的脚本 阅读本教程前最好先尝试阅读:PHP下kafka的实践 自带命令实践 尝试实践的kafka知识: 创建话题 生产消息 消费消息 话题信息 获取消费组 获取消费组的offset 自带的命令 # kafka安装目录的bin目录下...

    caiyongji 评论0 收藏0
  • PHPer书单

    摘要:想提升自己,还得多看书多看书多看书下面是我收集到的一些程序员应该看得书单及在线教程,自己也没有全部看完。共勉吧当然,如果你有好的书想分享给大家的或者觉得书单不合理,可以去通过进行提交。讲师温铭,软件基金会主席,最佳实践作者。 想提升自己,还得多看书!多看书!多看书!下面是我收集到的一些PHP程序员应该看得书单及在线教程,自己也没有全部看完。共勉吧!当然,如果你有好的书想分享给大家的或者...

    jimhs 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<