摘要:簡介是語言編寫的,開源的分布式消息隊列中間件,其設計的目的是用來大規模地處理每天數以十億計級別的消息。
簡介
NSQ是Go語言編寫的,開源的分布式消息隊列中間件,其設計的目的是用來大規模地處理每天數以十億計級別的消息。NSQ 具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規模生成環境下應用的產品。
NSQ在國內公司用的很少,在使用當中愈發的覺得驚喜,比如他的簡單易用、部署快捷,再比如之前比較困擾的 延時定時消息,發現nsq 也支持,官方文檔比較全,咨詢問題時回復也非常的耐心和即時,所以我覺得有必要發布一篇文章來介紹下nsq,惠及大眾。
nsq 有三個必要的組建nsqd、nsqlookupd、nsqadmin 其中nsqd 和 nsqlookup是必須部署的 下面我們一一介紹。
nsqd :
負責接收消息,存儲隊列和將消息發送給客戶端,nsqd 可以多機器部署,當你使用客戶端向一個topic發送消息時,可以配置多個nsqd地址,消息會隨機的分配到各個nsqd上,nsqd優先把消息存儲到內存channel中,當內存channel滿了之后,則把消息寫到磁盤文件中。他監聽了兩個tcp端口,一個用來服務客戶端,一個用來提供http的接口 ,nsqd 啟動時置頂下nsqlookupd地址即可:
nsqd –lookupd-tcp-address=127.0.0.1:4160
也可以指定端口 與數據目錄
nsqd –lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address=”0.0.0.0:4155″ –data-path=/data/nsqdata
其他配置項可詳見官網
nsqlookupd:
主要負責服務發現 負責nsqd的心跳、狀態監測,給客戶端、nsqadmin提供nsqd地址與狀態
nsqadmin:
nsqadmin是一個web管理界面 啟動方式如下:
nsqadmin –lookupd-http-address=127.0.0.1:4161
channel詳情頁示例圖如下 ,empty可以清空當前channel的信息,delete刪除當前channel, pause是暫停消息消費。
圖中也有幾個比較重要的參數 depth當前的積壓量,in-flight代表已經投遞還未消費掉的消息,deferred是未消費的定時(延時)消息數,ready count比較重要,go的客戶端是通過設置max-in-flight 除以客戶端連接數得到的,代表一次推給客戶端多少條消息,或者客戶端準備一次性接受多少條消息,謹慎設置其值,因為可能造成服務器壓力,如果消費能力比較弱,rdy建議設置的低一點比如3
Topic 和 Channel
其實nsqd相當于kafka當中的分區,channel和consumers客戶端的多個連接 相當于kafka的消費組,但nsq比kafka使用方式便捷概念上更容易理解
拋開與kafka的對比,nsq的topic 可以設置多個channel,因為有可能有多個業務方需要定值topic的消息,這樣互不影響,
當然一個消息會發送topic下的所有channel,然后會分配到不同客戶端的連接上,如下圖。
這篇文章主要介紹nsq的使用,源碼就不展開講,如果有興趣的同學多的話 過幾天我會再開一篇專門敘述nsq的源碼與分析。
這里提下延時消息:
nsq支持延時消息的投遞,比如我想這條消息5分鐘之后才被投遞出去被客戶端消費,較于普通的消息投遞,多了個毫秒數,默認支持最大的毫秒數為3600000毫秒也就是60分鐘,不過這個值可以在nsqd 啟動的時候 用 -max-req-timeout參數修改最大值。
延時消息可用于以下場景,比如一個訂單超過30分鐘未付款,修改其狀態 或者給客戶發短信提醒,比如之前看到的滴滴打車訂單完成后 一定時間內未評價的可以未其設置默認值,再比如用戶的積分過期,等等場景避免了全表掃描,異步處理,kafka不支持延時消息的投遞,目前知道支持的有rabbitmq rocketmq,但是rabbitmq 有坑,有可能會超時投遞,而rocketmq只有阿里云付費版支持的比較好。
nsq延時消息的實現是用最小堆算法完成,作者繼承實現heap的一系類接口,專門寫了一個pqueque最小堆的優先隊列,在internal/pequeque 目錄可以看到相關實現,pub的時候如果chanMsg.deferred != 0則會調用channel.PutMessageDeferred方法,最終會調用繼承了go heap接口的pqueque.push方法
延時消息的處理 和普通消息一樣都是 nsqd/protocol_v2.go下messagePump 中把消息發送給客戶端 然后在queueScanWorker中分別處理,pop是peekAndShift方法中,拿當前時間 和 deferred[0]對比如果大于 就彈出發送給客戶端 如下代碼:
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false if c.processInFlightQueue(now) { dirty = true } if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: return } } } func (c *Channel) processDeferredQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.deferredMutex.Lock() item, _ := c.deferredPQ.PeekAndShift(t) c.deferredMutex.Unlock() if item == nil { goto exit } dirty = true msg := item.Value.(*Message) _, err := c.popDeferredMessage(msg.ID) if err != nil { goto exit } c.put(msg) } exit: return dirty } func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) { if pq.Len() == 0 { return nil, 0 } item := (*pq)[0] if item.Priority > max { return nil, item.Priority - max } heap.Remove(pq, 0) return item, 0 }
php和go的客戶端的使用
官網客戶端鏈接:Client Libraries php客戶端之前官網有一個5年前比較老的客戶端,已經沒人維護 甚至無法運行,于是我貢獻了一個php72擴展版本 php-nsq,速度塊了近三倍,正在逐步完善,支持各種配置與特性,目前已被官網收納,簡單介紹下使用 順便求下star
php-nsq pub :
$nsqd_addr = array( "127.0.0.1:4150", "127.0.0.1:4154" ); $nsq = new Nsq(); $is_true = $nsq->connect_nsqd($nsqd_addr); for($i = 0; $i < 20; $i++){ $nsq->publish("test", "nihao"); }
php-nsq 延時pub :
參數 僅僅多一個毫秒參數,so easy!
$deferred = new Nsq(); $isTrue = $deferred->connectNsqd($nsqdAddr); for($i = 0; $i < 20; $i++){ $deferred->deferredPublish("test", "message daly", 3000); // 第三值默認范圍 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到 }
php-nsq sub :
拋異常消息可以自動重試,重試時間可以有retry_delay_time設定,多少時間后再次接收被重試的消息
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd tcp addr $nsq = new Nsq(); $config = array( "topic" => "test", "channel" => "struggle", "rdy" => 2, //optional , default 1 "connect_num" => 1, //optional , default 1 "retry_delay_time" => 5000, //optional, default 0 , after 5000 msec, message will be retried ); $nsq->subscribe($nsq_lookupd, $config, function($msg){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp; });
go client pub
package main import ( "github.com/nsqio/go-nsq" ) var producer *nsq.Producer func main() { nsqd := "127.0.0.1:4150" producer, err := nsq.NewProducer(nsqd, nsq.NewConfig()) producer.Publish("test", []byte("nihao")) if err != nil { panic(err) } }
go client sub
package main import ( "fmt" "sync" "github.com/nsqio/go-nsq" ) type NSQHandler struct { } func (this *NSQHandler) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } func testNSQ() { waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config:=nsq.NewConfig() config.MaxInFlight=9 //建立多個連接 for i := 0; i<10; i++ { consumer, err := nsq.NewConsumer("test", "struggle", config) if nil != err { fmt.Println("err", err) return } consumer.AddHandler(&NSQHandler{}) err = consumer.ConnectToNSQD("127.0.0.1:4150") if nil != err { fmt.Println("err", err) return } } select{} }() waiter.Wait() } func main() { testNSQ(); }
同時此篇文章 更新到了自己博客
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28127.html
摘要:因為公司在業務中需要用到消息隊列產品,我選用了基于開源的產品,記錄下我遇到的那些部署中的坑。 因為公司在業務中需要用到消息隊列產品,我選用了基于golang開源的nsq產品,記錄下我遇到的那些部署中的坑。首先安裝nsq,這個沒什么好說的,我是直接在官網下載bin文件,直接部署的,環境是centOS 6.7,安裝在/opt/nsq-0.3.7.linux-amd64.go1.6目錄下;其...
摘要:一條消息除了基本的元數據之外,其余內容為消息體。消息的元數據主要包括了消息在服務端產生時的時間戳,服務端對于該消息的下發次數,消息。作為的消費者,從消費消息后通過進行處理。 在系列文章前面幾篇中,介紹了 NSQ 改造的過程和幾個基礎特性,本文中我們繼續介紹幾個高級特性及其使用場景,這些都是結合有贊業務場景總結提煉出來的重要功能。 NSQ 拓展消息格式的設計 有贊中間件在 NSQ 中引入...
摘要:并且注冊回調函數。在重寫的回調函數中,實現了的訂閱功能消息的處理簡單封裝了重復消息的判斷沒有消費消息的重新投遞引入就是構造方法引入的實例化同時,重寫的方法。所以當執行腳本的時候,也就是啟動了對應的服務。當然更好的是使用協程。 集合 swoole 的框架設計為了減少理解度,我盡量的從源頭開始引入 1. nsq 案例中是使用 swoole 結合一個php 框架實現的是 NSQ 訂閱功能。...
摘要:業務對賬平臺的核心目的,就是及時發現類似問題,并及時修復。這對對賬平臺的吞吐量造成了挑戰。五健康度對賬中心可以拿到業務系統及其所在整個鏈路的數據一致性信息。在分布式環境下,沒有人能回避數據一致性問題,我們對此充滿著敬畏。 一、引子 根據CAP原理,分布式系統無法在保證了可用性(Availability)和分區容忍性(Partition)之后,繼續保證一致性(Consistency)。我...
閱讀 3174·2023-04-25 19:09
閱讀 3885·2021-10-22 09:54
閱讀 1757·2021-09-29 09:35
閱讀 2914·2021-09-08 09:45
閱讀 2255·2021-09-06 15:00
閱讀 2773·2019-08-29 15:32
閱讀 1038·2019-08-28 18:30
閱讀 375·2019-08-26 13:43