摘要:對(duì)象的創(chuàng)建在中,最為高效的機(jī)制就是。該數(shù)據(jù)結(jié)構(gòu)中是的,用于在函數(shù)接受就緒的事件。為了能夠更為簡(jiǎn)便在調(diào)用后獲取的類型,并不會(huì)僅僅向函數(shù)添加,而是會(huì)添加類型,該數(shù)據(jù)結(jié)構(gòu)中包含文件描述符和文件類型。
Epoll 對(duì)象的創(chuàng)建
在 linux 中,最為高效的 reactor 機(jī)制就是 epoll。swReactor 的 object 會(huì)存儲(chǔ) epoll 的對(duì)象 swReactorEpoll_s。該數(shù)據(jù)結(jié)構(gòu)中 epfd 是 epoll 的 id,events 用于在 epoll_wait 函數(shù)接受就緒的事件。
該函數(shù)最重要的是 epoll_create,該函數(shù)會(huì)創(chuàng)建 epoll 對(duì)象
typedef struct swReactorEpoll_s swReactorEpoll; struct swReactorEpoll_s { int epfd; struct epoll_event *events; }; int swReactorEpoll_create(swReactor *reactor, int max_event_num) { //create reactor object swReactorEpoll *reactor_object = sw_malloc(sizeof(swReactorEpoll)); if (reactor_object == NULL) { swWarn("malloc[0] failed."); return SW_ERR; } bzero(reactor_object, sizeof(swReactorEpoll)); reactor->object = reactor_object; reactor->max_event_num = max_event_num; reactor_object->events = sw_calloc(max_event_num, sizeof(struct epoll_event)); if (reactor_object->events == NULL) { swWarn("malloc[1] failed."); sw_free(reactor_object); return SW_ERR; } //epoll create reactor_object->epfd = epoll_create(512); if (reactor_object->epfd < 0) { swWarn("epoll_create failed. Error: %s[%d]", strerror(errno), errno); sw_free(reactor_object); return SW_ERR; } //binding method reactor->add = swReactorEpoll_add; reactor->set = swReactorEpoll_set; reactor->del = swReactorEpoll_del; reactor->wait = swReactorEpoll_wait; reactor->free = swReactorEpoll_free; return SW_OK; }Epoll 添加監(jiān)聽(tīng)
swReactorEpoll_event_set 函數(shù)用于轉(zhuǎn)化可讀(SW_EVENT_READ)、可寫(xiě)(SW_EVENT_WRITE )的狀態(tài)為 epoll 函數(shù)可用的 EPOLLIN、EPOLLOUT、EPOLLERR
static sw_inline int swReactorEpoll_event_set(int fdtype) { uint32_t flag = 0; if (swReactor_event_read(fdtype)) { flag |= EPOLLIN; } if (swReactor_event_write(fdtype)) { flag |= EPOLLOUT; } if (swReactor_event_error(fdtype)) { //flag |= (EPOLLRDHUP); flag |= (EPOLLRDHUP | EPOLLHUP | EPOLLERR); } return flag; }
swReactorEpoll_add 函數(shù)用于為 reactor 添加新的文件描述符進(jìn)行監(jiān)控
添加 fd 最為重要的的是利用 epoll_ctl 函數(shù)的 EPOLL_CTL_ADD 命令。為了能夠更為簡(jiǎn)便在調(diào)用 epoll_wait 后獲取 fd 的類型,并不會(huì)僅僅向 epoll_ctl 函數(shù)添加 fd,而是會(huì)添加 swFd 類型,該數(shù)據(jù)結(jié)構(gòu)中包含文件描述符和文件類型。
swReactor_add 函數(shù)用于更新 reactor->socket_list 的 fdtype 與 events
最后需要自增 event_num 的數(shù)值
typedef struct _swFd { uint32_t fd; uint32_t fdtype; } swFd; static int swReactorEpoll_add(swReactor *reactor, int fd, int fdtype) { swReactorEpoll *object = reactor->object; struct epoll_event e; swFd fd_; bzero(&e, sizeof(struct epoll_event)); fd_.fd = fd; fd_.fdtype = swReactor_fdtype(fdtype); e.events = swReactorEpoll_event_set(fdtype); swReactor_add(reactor, fd, fdtype); memcpy(&(e.data.u64), &fd_, sizeof(fd_)); if (epoll_ctl(object->epfd, EPOLL_CTL_ADD, fd, &e) < 0) { swSysError("add events[fd=%d#%d, type=%d, events=%d] failed.", fd, reactor->id, fd_.fdtype, e.events); swReactor_del(reactor, fd); return SW_ERR; } swTraceLog(SW_TRACE_EVENT, "add event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype)); reactor->event_num++; return SW_OK; } static sw_inline void swReactor_add(swReactor *reactor, int fd, int type) { swConnection *socket = swReactor_get(reactor, fd); socket->fdtype = swReactor_fdtype(type); socket->events = swReactor_events(type); socket->removed = 0; }Epoll 修改監(jiān)聽(tīng)
修改監(jiān)聽(tīng)主要調(diào)用 epoll_ctl 的 EPOLL_CTL_MOD 命令
static int swReactorEpoll_set(swReactor *reactor, int fd, int fdtype) { swReactorEpoll *object = reactor->object; swFd fd_; struct epoll_event e; int ret; bzero(&e, sizeof(struct epoll_event)); e.events = swReactorEpoll_event_set(fdtype); if (e.events & EPOLLOUT) { assert(fd > 2); } fd_.fd = fd; fd_.fdtype = swReactor_fdtype(fdtype); memcpy(&(e.data.u64), &fd_, sizeof(fd_)); ret = epoll_ctl(object->epfd, EPOLL_CTL_MOD, fd, &e); if (ret < 0) { swSysError("reactor#%d->set(fd=%d|type=%d|events=%d) failed.", reactor->id, fd, fd_.fdtype, e.events); return SW_ERR; } swTraceLog(SW_TRACE_EVENT, "set event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype)); //execute parent method swReactor_set(reactor, fd, fdtype); return SW_OK; }Epoll 刪除監(jiān)聽(tīng)
修改監(jiān)聽(tīng)主要調(diào)用 epoll_ctl 的 EPOLL_CTL_DEL 命令
最后需要更新 event_num
static int swReactorEpoll_del(swReactor *reactor, int fd) { swReactorEpoll *object = reactor->object; if (epoll_ctl(object->epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { swSysError("epoll remove fd[%d#%d] failed.", fd, reactor->id); return SW_ERR; } swTraceLog(SW_TRACE_REACTOR, "remove event[reactor_id=%d|fd=%d]", reactor->id, fd); reactor->event_num = reactor->event_num <= 0 ? 0 : reactor->event_num - 1; swReactor_del(reactor, fd); return SW_OK; }Epoll 監(jiān)聽(tīng)等待就緒
swReactorEpoll_wait 是 reactor 的核心,該函數(shù)最重要的就是調(diào)用 epoll_wait
首先需要通過(guò) timeo 參數(shù)設(shè)置 msec,利用 object->events 設(shè)置 events
epoll_wait 函數(shù)返回之后,如果 n<0,那么需要先檢查 erron,如果是 EINTR,那么說(shuō)明有信號(hào)觸發(fā),此時(shí)需要進(jìn)行信號(hào)的回調(diào)函數(shù),然后再繼續(xù)事件循環(huán)。如果不是 EINTR,那么就要返回錯(cuò)誤,結(jié)束事件循環(huán)
如果 n == 0,一般是由于 epoll_wait 已超時(shí),此時(shí)需要調(diào)用超時(shí)回調(diào)函數(shù)
如果 n > 0,那么就要從 events 中取出已經(jīng)就緒的 swFd 對(duì)象,并利用該對(duì)象的值初始化 event
接下來(lái)就要檢查 events[i].events 的值,來(lái)判斷具體是讀就緒、寫(xiě)就緒還是發(fā)生了錯(cuò)誤,值得注意的是 EPOLLRDHUP 事件,此事件代表著對(duì)端斷開(kāi)連接,這個(gè)是 linux 自從 2.6.17 的新特性
利用 swReactor_getHandle 函數(shù)取出對(duì)應(yīng)的文件描述符類型的事件回調(diào)函數(shù)
事件循環(huán)的最后調(diào)用 onFinish 函數(shù)
如果設(shè)置了 once,說(shuō)明此 reactor 只會(huì)循環(huán)一次,立即退出;否則,繼續(xù)事件循環(huán)
typedef struct _swEvent { int fd; int16_t from_id; uint8_t type; swConnection *socket; } swEvent; static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { swEvent event; swReactorEpoll *object = reactor->object; swReactor_handle handle; int i, n, ret, msec; int reactor_id = reactor->id; int epoll_fd = object->epfd; int max_event_num = reactor->max_event_num; struct epoll_event *events = object->events; if (reactor->timeout_msec == 0) { if (timeo == NULL) { reactor->timeout_msec = -1; } else { reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000; } } reactor->start = 1; while (reactor->running > 0) { if (reactor->onBegin != NULL) { reactor->onBegin(reactor); } msec = reactor->timeout_msec; n = epoll_wait(epoll_fd, events, max_event_num, msec); if (n < 0) { if (swReactor_error(reactor) < 0) { swWarn("[Reactor#%d] epoll_wait failed. Error: %s[%d]", reactor_id, strerror(errno), errno); return SW_ERR; } else { continue; } } else if (n == 0) { if (reactor->onTimeout != NULL) { reactor->onTimeout(reactor); } continue; } for (i = 0; i < n; i++) { event.fd = events[i].data.u64; event.from_id = reactor_id; event.type = events[i].data.u64 >> 32; event.socket = swReactor_get(reactor, event.fd); //read if ((events[i].events & EPOLLIN) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type); ret = handle(reactor, &event); if (ret < 0) { swSysError("EPOLLIN handle failed. fd=%d.", event.fd); } } //write if ((events[i].events & EPOLLOUT) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type); ret = handle(reactor, &event); if (ret < 0) { swSysError("EPOLLOUT handle failed. fd=%d.", event.fd); } } //error #ifndef NO_EPOLLRDHUP if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed) #else if ((events[i].events & (EPOLLERR | EPOLLHUP)) && !event.socket->removed) #endif { //ignore ERR and HUP, because event is already processed at IN and OUT handler. if ((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT)) { continue; } handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type); ret = handle(reactor, &event); if (ret < 0) { swSysError("EPOLLERR handle failed. fd=%d.", event.fd); } } } if (reactor->onFinish != NULL) { reactor->onFinish(reactor); } if (reactor->once) { break; } } return 0; } static sw_inline int swReactor_error(swReactor *reactor) { switch (errno) { case EINTR: if (reactor->singal_no) { swSignal_callback(reactor->singal_no); reactor->singal_no = 0; } return SW_OK; } return SW_ERR; } static sw_inline swReactor_handle swReactor_getHandle(swReactor *reactor, int event_type, int fdtype) { if (event_type == SW_EVENT_WRITE) { return (reactor->write_handle[fdtype] != NULL) ? reactor->write_handle[fdtype] : reactor->handle[SW_FD_WRITE]; } else if (event_type == SW_EVENT_ERROR) { return (reactor->error_handle[fdtype] != NULL) ? reactor->error_handle[fdtype] : reactor->handle[SW_FD_CLOSE]; } return reactor->handle[fdtype]; }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/29217.html
摘要:當(dāng)此時(shí)的套接字不可寫(xiě)的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫(xiě)就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫(xiě)就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說(shuō)明此時(shí)異步客戶端雖然建立了連接,但是還沒(méi)有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說(shuō)了客戶端的連接 connect,對(duì)于同步客戶端來(lái)說(shuō),連接已經(jīng)建立成功;但是對(duì)于異步客戶端來(lái)說(shuō),此時(shí)可能還在進(jìn)行 DNS 的解析,on...
摘要:新建可以看到,自動(dòng)采用包長(zhǎng)檢測(cè)的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在,等著寫(xiě)事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。 swReactorThread_dispatch 發(fā)送數(shù)據(jù) reactor 線程會(huì)通過(guò) swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會(huì)調(diào)用 sw...
前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類的指針, event_nu...
摘要:并沒(méi)有使用命名管道。的創(chuàng)建創(chuàng)建匿名管道就是調(diào)用函數(shù),程序自動(dòng)設(shè)置管道為非阻塞式。函數(shù)同樣的獲取管道文件描述符根據(jù)來(lái)決定。模塊負(fù)責(zé)為進(jìn)程創(chuàng)建與。當(dāng)線程啟動(dòng)的時(shí)候,會(huì)將加入的監(jiān)控當(dāng)中。 前言 管道是進(jìn)程間通信 IPC 的最基礎(chǔ)的方式,管道有兩種類型:命名管道和匿名管道,匿名管道專門用于具有血緣關(guān)系的進(jìn)程之間,完成數(shù)據(jù)傳遞,命名管道可以用于任何兩個(gè)進(jìn)程之間。swoole 中的管道都是匿名管道...
摘要:是緩存區(qū)高水位線,達(dá)到了說(shuō)明緩沖區(qū)即將滿了創(chuàng)建線程函數(shù)用于將監(jiān)控的存放于中向中添加監(jiān)聽(tīng)的文件描述符等待所有的線程開(kāi)啟事件循環(huán)利用創(chuàng)建線程,線程啟動(dòng)函數(shù)是保存監(jiān)聽(tīng)本函數(shù)將用于監(jiān)聽(tīng)的存放到當(dāng)中,并設(shè)置相應(yīng)的屬性 Server 的啟動(dòng) 在 server 啟動(dòng)之前,swoole 首先要調(diào)用 php_swoole_register_callback 將 PHP 的回調(diào)函數(shù)注冊(cè)到 server...
閱讀 3975·2021-11-16 11:44
閱讀 5221·2021-10-09 09:54
閱讀 2035·2019-08-30 15:44
閱讀 1686·2019-08-29 17:22
閱讀 2760·2019-08-29 14:11
閱讀 3397·2019-08-26 13:25
閱讀 2329·2019-08-26 11:55
閱讀 1600·2019-08-26 10:37