摘要:前言內(nèi)存數(shù)據(jù)結(jié)構(gòu),類似于的通道,底層基于共享內(nèi)存互斥鎖實(shí)現(xiàn),可實(shí)現(xiàn)用戶態(tài)的高性能內(nèi)存隊(duì)列。是當(dāng)前隊(duì)列占用的內(nèi)存大小,用來指定是否使用共享內(nèi)存是否使用鎖是否使用通知。
前言
內(nèi)存數(shù)據(jù)結(jié)構(gòu) Channel,類似于 Go 的 chan 通道,底層基于 共享內(nèi)存 + Mutex 互斥鎖實(shí)現(xiàn),可實(shí)現(xiàn)用戶態(tài)的高性能內(nèi)存隊(duì)列。Channel 可用于多進(jìn)程環(huán)境下,底層在讀取寫入時會自動加鎖,應(yīng)用層不需要擔(dān)心數(shù)據(jù)同步問題。
channel 在之前的文章中出現(xiàn)過,當(dāng)時用于 manager 和 worker 進(jìn)程之間進(jìn)行通信的重要數(shù)據(jù)結(jié)構(gòu),主要用于 worker 進(jìn)程通知 manager 進(jìn)程重啟相應(yīng) worker 進(jìn)程。
channel 數(shù)據(jù)結(jié)構(gòu)channel 數(shù)據(jù)結(jié)構(gòu)的屬性比較多,head 是隊(duì)列的頭部位置,tail 是隊(duì)列的尾部位置,size 是申請的隊(duì)列內(nèi)存大小,maxlen 是每個隊(duì)列元素的大小,head_tag 和 tail_tag 用于指定隊(duì)列的頭尾是否循環(huán)被重置回頭部。bytes 是當(dāng)前 channel 隊(duì)列占用的內(nèi)存大小,flag 用來指定是否使用共享內(nèi)存、是否使用鎖、是否使用 pipe 通知。mem 是 channel 的內(nèi)存首地址。
typedef struct _swChannel_item { int length; char data[0]; } swChannel_item; typedef struct _swChannel { off_t head; off_t tail; size_t size; char head_tag; char tail_tag; int num; int max_num; /** * Data length, excluding structure */ size_t bytes; int flag; int maxlen; /** * memory point */ void *mem; swLock lock; swPipe notify_fd; } swChannel;channel 隊(duì)列 swChannel_new 創(chuàng)建隊(duì)列
創(chuàng)建隊(duì)列就是根據(jù) flags 來初始化隊(duì)列的各個屬性,值得注意的是 maxlen,當(dāng)申請內(nèi)存的時候會多申請這些內(nèi)存,用來防止內(nèi)存越界。
swChannel* swChannel_new(size_t size, int maxlen, int flags) { assert(size >= maxlen); int ret; void *mem; //use shared memory if (flags & SW_CHAN_SHM) { mem = sw_shm_malloc(size + sizeof(swChannel) + maxlen); } else { mem = sw_malloc(size + sizeof(swChannel) + maxlen); } if (mem == NULL) { swWarn("swChannel_create: malloc(%ld) failed.", size); return NULL; } swChannel *object = mem; mem += sizeof(swChannel); bzero(object, sizeof(swChannel)); //overflow space object->size = size; object->mem = mem; object->maxlen = maxlen; object->flag = flags; //use lock if (flags & SW_CHAN_LOCK) { //init lock if (swMutex_create(&object->lock, 1) < 0) { swWarn("mutex init failed."); return NULL; } } //use notify if (flags & SW_CHAN_NOTIFY) { ret = swPipeNotify_auto(&object->notify_fd, 1, 1); if (ret < 0) { swWarn("notify_fd init failed."); return NULL; } } return object; }swChannel_push 入隊(duì)
入隊(duì)的時候,首先要先加鎖,然后調(diào)用 swChannel_in。
swChannel_in 邏輯很簡單,向隊(duì)列的尾部推送數(shù)據(jù),如果當(dāng)前 channel 尾部被重置,head 還未被重置,就需要先判斷剩余的內(nèi)存是否夠用。
如果當(dāng)前 channel 尾部未被重置,就可以放心的追加元素,因?yàn)?object->size 和真正申請的內(nèi)存之前還有 maxlen 可以富余,不必考慮內(nèi)存越界的問題。
int swChannel_push(swChannel *object, void *in, int data_length) { assert(object->flag & SW_CHAN_LOCK); object->lock.lock(&object->lock); int ret = swChannel_in(object, in, data_length); object->lock.unlock(&object->lock); return ret; } #define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size)) int swChannel_in(swChannel *object, void *in, int data_length) { assert(data_length <= object->maxlen); if (swChannel_full(object)) { return SW_ERR; } swChannel_item *item; int msize = sizeof(item->length) + data_length; if (object->tail < object->head) { //no enough memory space if ((object->head - object->tail) < msize) { return SW_ERR; } item = object->mem + object->tail; object->tail += msize; } else { item = object->mem + object->tail; object->tail += msize; if (object->tail >= object->size) { object->tail = 0; object->tail_tag = 1 - object->tail_tag; } } object->num++; object->bytes += data_length; item->length = data_length; memcpy(item->data, in, data_length); return SW_OK; }swChannel_push 出隊(duì)
swChannel_push 出隊(duì)的邏輯比較簡單,獲取隊(duì)列頭部位置,然后拷貝首部數(shù)據(jù)即可。當(dāng) head 超過 size 值,即可重置 head。
int swChannel_pop(swChannel *object, void *out, int buffer_length) { assert(object->flag & SW_CHAN_LOCK); object->lock.lock(&object->lock); int n = swChannel_out(object, out, buffer_length); object->lock.unlock(&object->lock); return n; } #define swChannel_empty(ch) (ch->num == 0) int swChannel_out(swChannel *object, void *out, int buffer_length) { if (swChannel_empty(object)) { return SW_ERR; } swChannel_item *item = object->mem + object->head; assert(buffer_length >= item->length); memcpy(out, item->data, item->length); object->head += (item->length + sizeof(item->length)); if (object->head >= object->size) { object->head = 0; object->head_tag = 1 - object->head_tag; } object->num--; object->bytes -= item->length; return item->length; }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/30852.html
摘要:消息隊(duì)列的接受消息隊(duì)列的接受是利用函數(shù),其中是消息的類型,該參數(shù)會取出指定類型的消息,如果設(shè)定的是爭搶模式,該值會統(tǒng)一為,否則該值就是消息發(fā)送目的的。環(huán)形隊(duì)列的消息入隊(duì)發(fā)送消息首先要確定環(huán)形隊(duì)列的隊(duì)尾。取模操作可以優(yōu)化 前言 swoole 的底層隊(duì)列有兩種:進(jìn)程間通信 IPC 的消息隊(duì)列 swMsgQueue,與環(huán)形隊(duì)列 swRingQueue。IPC 的消息隊(duì)列用于 task_wor...
摘要:修復(fù)添加超過萬個以上定時器時發(fā)生崩潰的問題增加模塊,下高性能序列化庫修復(fù)監(jiān)聽端口設(shè)置無效的問題等。線程來處理網(wǎng)絡(luò)事件輪詢,讀取數(shù)據(jù)。當(dāng)?shù)娜挝帐殖晒α艘院螅蛇@個線程將連接成功的消息告訴進(jìn)程,再由進(jìn)程轉(zhuǎn)交給進(jìn)程。此時進(jìn)程觸發(fā)事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個PHP擴(kuò)展,提供了PHP語言的異步多線程服務(wù)器...
摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
閱讀 1656·2019-08-30 15:55
閱讀 978·2019-08-30 15:44
閱讀 871·2019-08-30 10:48
閱讀 2041·2019-08-29 13:42
閱讀 3188·2019-08-29 11:16
閱讀 1263·2019-08-29 11:09
閱讀 2059·2019-08-26 11:46
閱讀 619·2019-08-26 11:44