摘要:實現發布發送消息到主題是啥我的博客有寫這個東西傳送門想要實現需要使用到中的函數函數是什么此次使用的是網上開源案例其中使用的是系列函數什么是系列函數大概意思是正如你所指出的,是核心內置的,始終可用,而套接字是很少包含的擴展的一部分。
php實現mqtt發布/發送 消息到主題
mqtt是啥?我的博客有寫這個東西:傳送門
php想要實現mqtt需要使用到php中的socket函數;
socket函數是什么?
此次使用的是網上開源mqtt案例:其中使用的是 stream_socket_xxxx 系列函數
什么是stream_socket_xxxx系列函數
大概意思是:
正如你所指出的,"stream"是PHP核心(內置的,始終可用),而"套接字"是很少包含的擴展的一部分。除此之外,它們幾乎完全相同。您可以同時使用TCP和UDP兩種流,也可以使用阻塞和非阻塞模式,這些模式涵蓋了所有用例的99%。MQTT類代碼:我能想到的唯一常見的例外是ICMP。例如,"ping"。但是,看起來目前還沒有一種安全的方式來從PHP執行ICMP。這種調用需要通過套接字擴展來實現SOCK_RAW,這需要執行“root”權限。此外,并非所有路由器都會在TCP,UDP和ICMP之外路由其他數據包類型。這限制了套接字擴展的實用性。
/* phpMQTT */ class Mqtt { private $socket; /* holds the socket */ private $msgid = 1; /* counter for message id */ public $keepalive = 10; /* default keepalive timmer */ public $timesinceping; /* host unix time, used to detect disconects */ public $topics = array(); /* used to store currently subscribed topics */ public $debug = false; /* should output debug messages */ public $address; /* broker address */ public $port; /* broker port */ public $clientid; /* client id sent to brocker */ public $will; /* stores the will of the client */ private $username; /* stores username */ private $password; /* stores password */ public $cafile; function __construct($address, $port, $clientid, $cafile = NULL){ $this->broker($address, $port, $clientid, $cafile); } /* sets the broker details */ function broker($address, $port, $clientid, $cafile = NULL){ $this->address = $address; $this->port = $port; $this->clientid = $clientid; $this->cafile = $cafile; } function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){ while($this->connect($clean, $will, $username, $password)==false){ sleep(10); } return true; } /* connects to the broker inputs: $clean: should the client send a clean session flag */ function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){ if($will) $this->will = $will; if($username) $this->username = $username; if($password) $this->password = $password; if ($this->cafile) { $socketContext = stream_context_create(["ssl" => [ "verify_peer_name" => true, "cafile" => $this->cafile ]]); $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext); } else { $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT); } if (!$this->socket ) { if($this->debug) error_log("stream_socket_create() $errno, $errstr "); return false; } stream_set_timeout($this->socket, 5); stream_set_blocking($this->socket, 0); $i = 0; $buffer = ""; $buffer .= chr(0x00); $i++; $buffer .= chr(0x06); $i++; $buffer .= chr(0x4d); $i++; $buffer .= chr(0x51); $i++; $buffer .= chr(0x49); $i++; $buffer .= chr(0x73); $i++; $buffer .= chr(0x64); $i++; $buffer .= chr(0x70); $i++; $buffer .= chr(0x03); $i++; //No Will $var = 0; if($clean) $var+=2; //Add will info to header if($this->will != NULL){ $var += 4; // Set will flag $var += ($this->will["qos"] << 3); //Set will qos if($this->will["retain"]) $var += 32; //Set will retain } if($this->username != NULL) $var += 128; //Add username to header if($this->password != NULL) $var += 64; //Add password to header $buffer .= chr($var); $i++; //Keep alive $buffer .= chr($this->keepalive >> 8); $i++; $buffer .= chr($this->keepalive & 0xff); $i++; $buffer .= $this->strwritestring($this->clientid,$i); //Adding will to payload if($this->will != NULL){ $buffer .= $this->strwritestring($this->will["topic"],$i); $buffer .= $this->strwritestring($this->will["content"],$i); } if($this->username) $buffer .= $this->strwritestring($this->username,$i); if($this->password) $buffer .= $this->strwritestring($this->password,$i); $head = " "; $head{0} = chr(0x10); $head{1} = chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer); $string = $this->read(4); if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){ if($this->debug) echo "Connected to Broker "; }else{ error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x) ", ord($string{0}),ord($string{3}))); return false; } $this->timesinceping = time(); return true; } /* read: reads in so many bytes */ function read($int = 8192, $nb = false){ // print_r(socket_get_status($this->socket)); $string=""; $togo = $int; if($nb){ return fread($this->socket, $togo); } while (!feof($this->socket) && $togo>0) { $fread = fread($this->socket, $togo); $string .= $fread; $togo = $int - strlen($string); } return $string; } /* subscribe: subscribes to topics */ function subscribe($topics, $qos = 0){ $i = 0; $buffer = ""; $id = $this->msgid; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; foreach($topics as $key => $topic){ $buffer .= $this->strwritestring($key,$i); $buffer .= chr($topic["qos"]); $i++; $this->topics[$key] = $topic; } $cmd = 0x80; //$qos $cmd += ($qos << 1); $head = chr($cmd); $head .= chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer, $i); $string = $this->read(2); $bytes = ord(substr($string,1,1)); $string = $this->read($bytes); } /* ping: sends a keep alive ping */ function ping(){ $head = " "; $head = chr(0xc0); $head .= chr(0x00); fwrite($this->socket, $head, 2); if($this->debug) echo "ping sent "; } /* disconnect: sends a proper disconect cmd */ function disconnect(){ $head = " "; $head{0} = chr(0xe0); $head{1} = chr(0x00); fwrite($this->socket, $head, 2); } /* close: sends a proper disconect, then closes the socket */ function close(){ $this->disconnect(); stream_socket_shutdown($this->socket, STREAM_SHUT_WR); } /* publish: publishes $content on a $topic */ function publish($topic, $content, $qos = 0, $retain = 0){ $i = 0; $buffer = ""; $buffer .= $this->strwritestring($topic,$i); //$buffer .= $this->strwritestring($content,$i); if($qos){ $id = $this->msgid++; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; } $buffer .= $content; $i+=strlen($content); $head = " "; $cmd = 0x30; if($qos) $cmd += $qos << 1; if($retain) $cmd += 1; $head{0} = chr($cmd); $head .= $this->setmsglength($i); fwrite($this->socket, $head, strlen($head)); fwrite($this->socket, $buffer, $i); } /* message: processes a recieved topic */ function message($msg){ $tlen = (ord($msg{0})<<8) + ord($msg{1}); $topic = substr($msg,2,$tlen); $msg = substr($msg,($tlen+2)); $found = 0; foreach($this->topics as $key=>$top){ if( preg_match("/^".str_replace("#",".*", str_replace("+","[^/]*", str_replace("/","/", str_replace("$","$", $key))))."$/",$topic) ){ if(is_callable($top["function"])){ call_user_func($top["function"],$topic,$msg); $found = 1; } } } if($this->debug && !$found) echo "msg recieved but no match in subscriptions "; } /* proc: the processing loop for an "allways on" client set true when you are doing other stuff in the loop good for watching something else at the same time */ function proc( $loop = true){ if(1){ $sockets = array($this->socket); $w = $e = NULL; $cmd = 0; //$byte = fgetc($this->socket); if(feof($this->socket)){ if($this->debug) echo "eof receive going to reconnect for good measure "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } $byte = $this->read(1, true); if(!strlen($byte)){ if($loop){ usleep(100000); } }else{ $cmd = (int)(ord($byte)/16); if($this->debug) echo "Recevid: $cmd "; $multiplier = 1; $value = 0; do{ $digit = ord($this->read(1)); $value += ($digit & 127) * $multiplier; $multiplier *= 128; }while (($digit & 128) != 0); if($this->debug) echo "Fetching: $value "; if($value) $string = $this->read($value); if($cmd){ switch($cmd){ case 3: $this->message($string); break; } $this->timesinceping = time(); } } if($this->timesinceping < (time() - $this->keepalive )){ if($this->debug) echo "not found something so ping "; $this->ping(); } if($this->timesinceping<(time()-($this->keepalive*2))){ if($this->debug) echo "not seen a package in a while, disconnecting "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } } return 1; } /* getmsglength: */ function getmsglength(&$msg, &$i){ $multiplier = 1; $value = 0 ; do{ $digit = ord($msg{$i}); $value += ($digit & 127) * $multiplier; $multiplier *= 128; $i++; }while (($digit & 128) != 0); return $value; } /* setmsglength: */ function setmsglength($len){ $string = ""; do{ $digit = $len % 128; $len = $len >> 7; // if there are more digits to encode, set the top bit of this digit if ( $len > 0 ) $digit = ($digit | 0x80); $string .= chr($digit); }while ( $len > 0 ); return $string; } /* strwritestring: writes a string to a buffer */ function strwritestring($str, &$i){ $ret = " "; $len = strlen($str); $msb = $len >> 8; $lsb = $len % 256; $ret = chr($msb); $ret .= chr($lsb); $ret .= $str; $i += ($len+2); return $ret; } function printstr($string){ $strlen = strlen($string); for($j=0;$j<$strlen;$j++){ $num = ord($string{$j}); if($num > 31) $chr = $string{$j}; else $chr = " "; printf("%4d: %08b : 0x%02x : %s ",$j,$num,$num,$chr); } } }實現部分 發送到主題
// 發送給訂閱號信息,創建socket,無sam隊列 $server = "127.0.0.1"; // 服務代理地址(mqtt服務端地址) $port = 1883; // 通信端口 $username = ""; // 用戶名(如果需要) $password = ""; // 密碼(如果需要 $client_id = "clientx9293670xxctr"; // 設置你的連接客戶端id $mqtt = new Mqtt($server, $port, $client_id); //實例化MQTT類 if ($mqtt->connect(true, NULL, $username, $password)) { //如果創建鏈接成功 $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0); // 發送到 xxx3809293670ctr 的主題 一個信息 內容為 setr=3xxxxxxxxx Qos 為 0 $mqtt->close(); //發送后關閉鏈接 } else { echo "Time out! "; }訂閱主題
/*// 訂閱信息,接收一個信息后退出 $server = "127.0.0.1"; // 服務代理地址(mqtt服務端地址) $port = 1883; // 通信端口 $username = ""; // 用戶名(如果需要) $password = ""; // 密碼(如果需要 $client_id = "clientx9293670xxctr"; // 設置你的連接客戶端id $mqtt = new Mqtt($server, $port, $client_id); if(!$mqtt->connect(true, NULL, $username, $password)) { //鏈接不成功再重復執行監聽連接 exit(1); } $topics["SN69143809293670state"] = array("qos" => 0, "function" => "procmsg"); // 訂閱主題為 SN69143809293670state qos為0 $mqtt->subscribe($topics, 0); while($mqtt->proc()){ } //死循環監聽 $mqtt->close(); function procmsg($topic, $msg){ //信息回調函數 打印信息 echo "Msg Recieved: " . date("r") . " "; echo "Topic: {$topic} "; echo " $msg "; $xxx = json_decode($msg); var_dump($xxxxxx->aa); die; }
這是php實現方法,如果用php做發送端還是不錯的.但是
我被這個圖片打擊了,區塊鏈應用還真提莫的是js寫起來跟簡單;
我最終寫出的mqtt api 使用的是node;為什么?
node.js實現mqtt
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28481.html
摘要:,消息隊列遙測傳輸是開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。會發生消息丟失或重復。只有一次,確保消息到達一次。此外,國內很多企業都廣泛使用作為手機客戶端與服務器端推送消息的協議。 前幾天寫了一下MQTT協議實現推送數據傳輸,所以我會不定期的更新一下關注MQTT的知識。 MQTT: MQTT(Message Queuing Telemetry Transport,消息隊列...
摘要:協議版本介紹互聯網的基礎網絡協議是協議消息隊列遙測傳輸是基于協議棧而構建的已成為通信的標準為什么選擇有多好多好多么牛逼我就不說了說的再多不如一個一個試試完了做比對剩下的那個就是要選擇的實在不想這樣搞技術就跟著一線走發布和訂閱模型協議在網絡中 mqtt 協議版本: 3.1.1 MQTT 介紹 互聯網的基礎網絡協議是 TCP/IP協議. MQTT(消息隊列遙測傳輸)是基于 TCP/IP 協...
摘要:時間就是金錢,效率就是生命本教程助力開發者使用協議快速產品化。摘要借助具備及聯網功能的,快速部署到客戶產品上,助力開發,縮短開發周期,快速實現產品商業化。 時間就是金錢,效率就是生命 本教程助力開發者使用MQTT協議快速產品化。 摘要 借助具備MQTT及聯網功能的DTU,快速部署到客戶產品...
閱讀 1624·2021-11-16 11:45
閱讀 2555·2021-09-29 09:48
閱讀 3320·2021-09-07 10:26
閱讀 1848·2021-08-16 10:50
閱讀 1878·2019-08-30 15:44
閱讀 2706·2019-08-28 18:03
閱讀 1907·2019-08-27 10:54
閱讀 1832·2019-08-26 14:01