摘要:本文主要實(shí)現(xiàn)的目標(biāo)是連接并且成功發(fā)送消息給。發(fā)送消息網(wǎng)上找了一圈,終于找到一個(gè)可以用的也可以用代碼如下發(fā)送消息到不同的參考文章最后附一張截圖
本文主要實(shí)現(xiàn)的目標(biāo)是php連接kafka并且成功發(fā)送消息給kafka。為了驗(yàn)證這個(gè)連接和發(fā)送,另外配置了logstash監(jiān)聽(tīng)kafka相對(duì)應(yīng)的消息,然后轉(zhuǎn)發(fā)到redis,原來(lái)我不知道對(duì)kafka比較陌生,不知道怎么看里面的消息內(nèi)容(我知道安裝包里有個(gè)consumer和producer的腳本) ^ _ ^
消息發(fā)送路徑:php->kafka->logstash->redis
1.安裝kafka
下載地址:https://kafka.apache.org/
下載解壓后進(jìn)入根目錄,
bin/zookeeper-server-start.sh config/zookeeper.properties & 開(kāi)啟zookeeper bin/kafka-server-start.sh config/server.properties & 開(kāi)啟kafka
另開(kāi)一個(gè)終端然后
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka
這樣就創(chuàng)建了一個(gè)topic為kafka的消息通道
如果這個(gè)步驟成功的話,可以通過(guò)另開(kāi)終端發(fā)送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka
執(zhí)行之后就可以輸入消息發(fā)送了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning
來(lái)接受上面終端發(fā)送的消息
2.安裝php zookeeper擴(kuò)展,并且用php發(fā)送消息
安裝擴(kuò)展之前需要安裝zookeeper的c client,擴(kuò)展有依賴,步驟如下
1.通過(guò)apt-get來(lái)安裝(樓主用的是ubuntu)
2.通過(guò)源碼安裝,地址:https://github.com/apache/zoo...
下載下來(lái)
cd zookeeper ./configure make && sudo make install
按照如上步驟,/usr/local/bin目錄下就會(huì)多一個(gè)cli_mt
php的zookeeper的源碼可以去pecl.php.net下載,然后老步驟
phpize ./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安裝擴(kuò)展的php不是默認(rèn)的php,則需要帶上--with-php-config參數(shù)) make && sudo make install
最后別忘了添加extension=zookeeper.so到php.ini
3.配置logstash
下載地址:https://www.elastic.co/downlo...
修改配置文件,由于樓主的logstash版本已經(jīng)是5.2的了,所以又是一陣谷歌,才發(fā)現(xiàn)很多網(wǎng)上的配置都是1.2版本的,已經(jīng)不兼容了。
input{ kafka{ bootstrap_servers=>"localhost:9092" topics=>["kafka"] } } output{ redis{ host=>"127.0.0.1" port=>6379 key=>"kafka" data_type=>"list" password=>"123456" } }
4.php發(fā)送消息
網(wǎng)上找了一圈,終于找到一個(gè)可以用的
https://github.com/nmred/kafk...
也可以用
composer require "nmred/kafka-php"
php代碼如下:
require "./vendor/autoload.php"; $produce = KafkaProduce::getInstance("10.37.129.2:2181", 3000); $produce->setRequireAck(-1); $topicName = "kafka"; $partitions = $produce->getAvailablePartitions($topicName); $partCount = count($partitions); var_dump("$partCount:".$partCount); $count = 0; $message = json_encode(array("uid" => $count, "age" => $count%100, "datetime" => date("Y-m-d H:i:s"))); //發(fā)送消息到不同的partition $partitionId = $count%$partCount; $produce->setMessages($topicName, $partitionId, array($message)); $result = $produce->send(); var_dump($result);
參考文章:http://blog.kazaff.me/2015/06...
最后附一張截圖
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/22525.html
摘要:現(xiàn)在用方式調(diào)用接口,中使用方式輸入內(nèi)容日志平臺(tái)網(wǎng)關(guān)層基于。日志平臺(tái)網(wǎng)關(guān)層基于到此為止,提取經(jīng)過(guò)網(wǎng)關(guān)的接口信息,并將其寫(xiě)入日志文件就完成了,所有的接口日志都寫(xiě)入了文件中。 背景介紹 1、問(wèn)題現(xiàn)狀與嘗試 沒(méi)有做日志記錄的線上系統(tǒng),絕對(duì)是給系統(tǒng)運(yùn)維人員留下的坑。尤其是前后端分離的項(xiàng)目,后端的接口日志可以解決對(duì)接、測(cè)試和運(yùn)維時(shí)的很多問(wèn)題。之前項(xiàng)目上發(fā)布的接口都是通過(guò)Oracle Service...
摘要:是一個(gè)日志收集器,支持非常多的輸入源和輸出源。這個(gè)庫(kù)支持展開(kāi)文件路徑,而且會(huì)記錄一個(gè)叫的數(shù)據(jù)庫(kù)文件來(lái)跟蹤被監(jiān)聽(tīng)的日志文件的當(dāng)前讀取位置。 1.Zookeeper 對(duì)于Zookeeper我們用一條簡(jiǎn)單的命令來(lái)測(cè)試一下: echo ruok|nc localhost 2181 你應(yīng)該可以看到: imok 2.Kafka Kafka 是由 Linked 開(kāi)發(fā)并且開(kāi)源的一套分布式的流平臺(tái),它類...
閱讀 1870·2021-09-22 15:45
閱讀 1654·2019-08-30 15:55
閱讀 1838·2019-08-29 11:16
閱讀 3315·2019-08-26 11:44
閱讀 714·2019-08-23 17:58
閱讀 2704·2019-08-23 12:25
閱讀 1640·2019-08-22 17:15
閱讀 3618·2019-08-22 16:09