资讯专栏INFORMATION COLUMN

php+kafka+zookeeper+logstash

Moxmi / 414人阅读

摘要:本文主要实现的目标是连接并且成功发送消息给。发送消息网上找了一圈,终于找到一个可以用的也可以用代码如下发送消息到不同的参考文章最后附一张截图

本文主要实现的目标是php连接kafka并且成功发送消息给kafka。为了验证这个连接和发送,另外配置了logstash监听kafka相对应的消息,然后转发到redis,原来我不知道对kafka比较陌生,不知道怎么看里面的消息内容(我知道安装包里有个consumer和producer的脚本) ^ _ ^

消息发送路径:php->kafka->logstash->redis

1.安装kafka

下载地址:https://kafka.apache.org/

下载解压后进入根目录,

bin/zookeeper-server-start.sh config/zookeeper.properties &  开启zookeeper
bin/kafka-server-start.sh config/server.properties & 开启kafka

另开一个终端然后

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka

这样就创建了一个topic为kafka的消息通道

如果这个步骤成功的话,可以通过另开终端发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka

执行之后就可以输入消息发送了。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning

来接受上面终端发送的消息

2.安装php zookeeper扩展,并且用php发送消息

安装扩展之前需要安装zookeeper的c client,扩展有依赖,步骤如下

1.通过apt-get来安装(楼主用的是ubuntu)

2.通过源码安装,地址:https://github.com/apache/zoo...

下载下来

cd zookeeper

./configure

make && sudo make install

按照如上步骤,/usr/local/bin目录下就会多一个cli_mt

php的zookeeper的源码可以去pecl.php.net下载,然后老步骤

phpize

./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安装扩展的php不是默认的php,则需要带上--with-php-config参数)

make && sudo make install

最后别忘了添加extension=zookeeper.so到php.ini
3.配置logstash

下载地址:https://www.elastic.co/downlo...

修改配置文件,由于楼主的logstash版本已经是5.2的了,所以又是一阵谷歌,才发现很多网上的配置都是1.2版本的,已经不兼容了。

input{

    kafka{
    
        bootstrap_servers=>"localhost:9092"
        
        topics=>["kafka"]
    
    }

}

output{

    redis{
    
        host=>"127.0.0.1"
        
        port=>6379
        
        key=>"kafka"
        
        data_type=>"list"
        
        password=>"123456"
    
    }

}

4.php发送消息

网上找了一圈,终于找到一个可以用的

https://github.com/nmred/kafk...

也可以用

composer require "nmred/kafka-php"

php代码如下:

require "./vendor/autoload.php";

$produce = KafkaProduce::getInstance("10.37.129.2:2181", 3000);

$produce->setRequireAck(-1);

$topicName = "kafka";

$partitions = $produce->getAvailablePartitions($topicName);

$partCount = count($partitions);

var_dump("$partCount:".$partCount);

$count = 0;

$message = json_encode(array("uid" => $count, "age" => $count%100, "datetime" => date("Y-m-d H:i:s")));

//发送消息到不同的partition

$partitionId = $count%$partCount;

$produce->setMessages($topicName, $partitionId, array($message));

$result = $produce->send();

var_dump($result);

参考文章:http://blog.kazaff.me/2015/06...

最后附一张截图

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/22525.html

相关文章

  • 日志平台(网关层) - 基于Openresty+ELKF+Kafka

    摘要:现在用方式调用接口,中使用方式输入内容日志平台网关层基于。日志平台网关层基于到此为止,提取经过网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了文件中。 背景介绍 1、问题现状与尝试 没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service...

    xumenger 评论0 收藏0
  • 基于Docker的日志分析平台(三) 快速入门

    摘要:是一个日志收集器,支持非常多的输入源和输出源。这个库支持展开文件路径,而且会记录一个叫的数据库文件来跟踪被监听的日志文件的当前读取位置。 1.Zookeeper 对于Zookeeper我们用一条简单的命令来测试一下: echo ruok|nc localhost 2181 你应该可以看到: imok 2.Kafka Kafka 是由 Linked 开发并且开源的一套分布式的流平台,它类...

    afishhhhh 评论0 收藏0

发表评论

0条评论

Moxmi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<