国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之Start

3fuyu / 3213人閱讀

摘要:是緩存區(qū)高水位線,達(dá)到了說(shuō)明緩沖區(qū)即將滿了創(chuàng)建線程函數(shù)用于將監(jiān)控的存放于中向中添加監(jiān)聽(tīng)的文件描述符等待所有的線程開(kāi)啟事件循環(huán)利用創(chuàng)建線程,線程啟動(dòng)函數(shù)是保存監(jiān)聽(tīng)本函數(shù)將用于監(jiān)聽(tīng)的存放到當(dāng)中,并設(shè)置相應(yīng)的屬性

Server 的啟動(dòng)

server 啟動(dòng)之前,swoole 首先要調(diào)用 php_swoole_register_callbackPHP 的回調(diào)函數(shù)注冊(cè)到 server 的對(duì)象函數(shù)中去

之后調(diào)用 php_swoole_server_before_start 創(chuàng)建 swReactorThread 數(shù)組對(duì)象、workers 進(jìn)程池對(duì)象

最后調(diào)用 swServer_start 函數(shù)創(chuàng)建 reactor 線程,workmanager 等進(jìn)程,開(kāi)啟事件循環(huán)

PHP_METHOD(swoole_server, start)
{
    zval *zobject = getThis();
    int ret;

    swServer *serv = swoole_get_object(getThis());
    if (serv->gs->start > 0)
    {
        swoole_php_fatal_error(E_WARNING, "server is running. unable to execute swoole_server->start.");
        RETURN_FALSE;
    }

    php_swoole_register_callback(serv);

    //-------------------------------------------------------------
    serv->onReceive = php_swoole_onReceive;

    php_swoole_server_before_start(serv, zobject TSRMLS_CC);

    ret = swServer_start(serv);
    if (ret < 0)
    {
        swoole_php_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
        RETURN_LONG(ret);
    }
    RETURN_TRUE;
}

注冊(cè) PHP 回調(diào)函數(shù)
void php_swoole_register_callback(swServer *serv)
{
    /*
     * optional callback
     */
    if (php_sw_server_callbacks[SW_SERVER_CB_onStart] != NULL)
    {
        serv->onStart = php_swoole_onStart;
    }
    serv->onShutdown = php_swoole_onShutdown;
    /**
     * require callback, set the master/manager/worker PID
     */
    serv->onWorkerStart = php_swoole_onWorkerStart;

    if (php_sw_server_callbacks[SW_SERVER_CB_onWorkerStop] != NULL)
    {
        serv->onWorkerStop = php_swoole_onWorkerStop;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onWorkerExit] != NULL)
    {
        serv->onWorkerExit = php_swoole_onWorkerExit;
    }
    /**
     * UDP Packet
     */
    if (php_sw_server_callbacks[SW_SERVER_CB_onPacket] != NULL)
    {
        serv->onPacket = php_swoole_onPacket;
    }
    /**
     * Task Worker
     */
    if (php_sw_server_callbacks[SW_SERVER_CB_onTask] != NULL)
    {
        serv->onTask = php_swoole_onTask;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onFinish] != NULL)
    {
        serv->onFinish = php_swoole_onFinish;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onWorkerError] != NULL)
    {
        serv->onWorkerError = php_swoole_onWorkerError;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onManagerStart] != NULL)
    {
        serv->onManagerStart = php_swoole_onManagerStart;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onManagerStop] != NULL)
    {
        serv->onManagerStop = php_swoole_onManagerStop;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onPipeMessage] != NULL)
    {
        serv->onPipeMessage = php_swoole_onPipeMessage;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onBufferFull] != NULL)
    {
        serv->onBufferFull = php_swoole_onBufferFull;
    }
    if (php_sw_server_callbacks[SW_SERVER_CB_onBufferEmpty] != NULL || serv->send_yield)
    {
        serv->onBufferEmpty = php_swoole_onBufferEmpty;
    }
}
創(chuàng)建 reactor 線程池對(duì)象與 work 進(jìn)程池對(duì)象

php_swoole_server_before_start 主要調(diào)用 swServer_create 函數(shù)

swServer_create 函數(shù)主要任務(wù)是 swReactorThread_create 創(chuàng)建 reactor 多線程

void php_swoole_server_before_start(swServer *serv, zval *zobject TSRMLS_DC)
{
    /**
     * create swoole server
     */
    if (swServer_create(serv) < 0)
    {
        swoole_php_fatal_error(E_ERROR, "failed to create the server. Error: %s", sw_error);
        return;
    }

}

int swServer_create(swServer *serv)
{
    if (SwooleG.main_reactor)
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_MUST_CREATED_BEFORE_CLIENT, "The swoole_server must create before client");
        return SW_ERR;
    }

    SwooleG.factory = &serv->factory;
    serv->factory.ptr = serv;
    /**
     * init current time
     */
    swServer_update_time(serv);

#ifdef SW_REACTOR_USE_SESSION
    serv->session_list = sw_shm_calloc(SW_SESSION_LIST_SIZE, sizeof(swSession));
    if (serv->session_list == NULL)
    {
        swError("sw_shm_calloc(%ld) for session_list failed", SW_SESSION_LIST_SIZE * sizeof(swSession));
        return SW_ERR;
    }
#endif

    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        return swReactorProcess_create(serv);
    }
    else
    {
        return swReactorThread_create(serv);
    }
}
swReactorThread_create 創(chuàng)建線程池對(duì)象

函數(shù)首先申請(qǐng)內(nèi)存構(gòu)建 reactor_threads 用于存儲(chǔ)多線程的各種信息,創(chuàng)建 connection_list 保存已建立連接的 socket 信息

利用 swFactoryThread_create 創(chuàng)建 reactor 多線程

int swReactorThread_create(swServer *serv)
{
    int ret = 0;
    /**
     * init reactor thread pool
     */
    serv->reactor_threads = SwooleG.memory_pool->alloc(SwooleG.memory_pool, (serv->reactor_num * sizeof(swReactorThread)));
    if (serv->reactor_threads == NULL)
    {
        swError("calloc[reactor_threads] fail.alloc_size=%d", (int )(serv->reactor_num * sizeof(swReactorThread)));
        return SW_ERR;
    }

    /**
     * alloc the memory for connection_list
     */
    if (serv->factory_mode == SW_MODE_PROCESS)
    {
        serv->connection_list = sw_shm_calloc(serv->max_connection, sizeof(swConnection));
    }
    else
    {
        serv->connection_list = sw_calloc(serv->max_connection, sizeof(swConnection));
    }

    //create factry object
    if (serv->factory_mode == SW_MODE_PROCESS)
    {
        if (serv->worker_num < 1)
        {
            swError("Fatal Error: serv->worker_num < 1");
            return SW_ERR;
        }
        ret = swFactoryProcess_create(&(serv->factory), serv->worker_num);
    }

    if (ret < 0)
    {
        swError("create factory failed");
        return SW_ERR;
    }
    return SW_OK;
}
swFactoryProcess_create 創(chuàng)建進(jìn)程池對(duì)象
int swFactoryProcess_create(swFactory *factory, int worker_num)
{
    swFactoryProcess *object;
    object = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swFactoryProcess));
    if (object == NULL)
    {
        swWarn("[Master] malloc[object] failed");
        return SW_ERR;
    }

    factory->object = object;
    factory->dispatch = swFactoryProcess_dispatch;
    factory->finish = swFactoryProcess_finish;
    factory->start = swFactoryProcess_start;
    factory->notify = swFactoryProcess_notify;
    factory->shutdown = swFactoryProcess_shutdown;
    factory->end = swFactoryProcess_end;

    return SW_OK;
}
swServer_start 函數(shù)

swServer_start 函數(shù)是啟動(dòng)整個(gè) swoole 的關(guān)鍵

swServer_start_check 函數(shù)用于檢查各種回調(diào)函數(shù)已經(jīng)被正確設(shè)置

如果當(dāng)前 swoole 是守護(hù)程序(daemonize),那么要設(shè)置日志輸出目錄,調(diào)用 daemon 函數(shù)設(shè)置自身進(jìn)程會(huì)話

從內(nèi)存池中申請(qǐng)構(gòu)建 worker 對(duì)象,設(shè)置全局共享對(duì)象 event_workers

申請(qǐng) reactor 線程的 buffer_input

如果存在 task_worker 進(jìn)程,那么申請(qǐng) worker 進(jìn)程與 task_worker 進(jìn)程用于通訊的 pipe

如果存在用戶 task 進(jìn)程,要設(shè)置用戶 task 進(jìn)程的 id

factory->start(factory) 啟動(dòng)創(chuàng)建 managerworkertask_workeruser_task_worker 進(jìn)程

swServer_signal_init 進(jìn)行信號(hào)初始化

swServer_start_proxy 創(chuàng)建 reactor 多線程,開(kāi)啟事件循環(huán)

int swServer_start(swServer *serv)
{
    swFactory *factory = &serv->factory;
    int ret;

    ret = swServer_start_check(serv);
    if (ret < 0)
    {
        return SW_ERR;
    }
    if (SwooleG.hooks[SW_GLOBAL_HOOK_BEFORE_SERVER_START])
    {
        swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_START, serv);
    }
    //cann"t start 2 servers at the same time, please use process->exec.
    if (!sw_atomic_cmp_set(&serv->gs->start, 0, 1))
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_ONLY_START_ONE, "must only start one server.");
        return SW_ERR;
    }
    //init loggger
    if (SwooleG.log_file)
    {
        swLog_init(SwooleG.log_file);
    }
    //run as daemon
    if (serv->daemonize > 0)
    {
        /**
         * redirect STDOUT to log file
         */
        if (SwooleG.log_fd > STDOUT_FILENO)
        {
            swoole_redirect_stdout(SwooleG.log_fd);
        }
        /**
         * redirect STDOUT_FILENO/STDERR_FILENO to /dev/null
         */
        else
        {
            SwooleG.null_fd = open("/dev/null", O_WRONLY);
            if (SwooleG.null_fd > 0)
            {
                swoole_redirect_stdout(SwooleG.null_fd);
            }
            else
            {
                swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "open(/dev/null) failed. Error: %s[%d]", strerror(errno), errno);
            }
        }

        if (daemon(0, 1) < 0)
        {
            return SW_ERR;
        }
    }

    //master pid
    serv->gs->master_pid = getpid();
    serv->gs->now = serv->stats->start_time = time(NULL);

    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;

    serv->workers = SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
    if (serv->workers == NULL)
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "gmalloc[server->workers] failed.");
        return SW_ERR;
    }

    /**
     * store to swProcessPool object
     */
    serv->gs->event_workers.workers = serv->workers;
    serv->gs->event_workers.worker_num = serv->worker_num;
    serv->gs->event_workers.use_msgqueue = 0;

    int i;
    for (i = 0; i < serv->worker_num; i++)
    {
        serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
    }

#ifdef SW_USE_RINGBUFFER
    for (i = 0; i < serv->reactor_num; i++)
    {
        serv->reactor_threads[i].buffer_input = swRingBuffer_new(SwooleG.serv->buffer_input_size, 1);
        if (!serv->reactor_threads[i].buffer_input)
        {
            return SW_ERR;
        }
    }
#endif

    /*
     * For swoole_server->taskwait, create notify pipe and result shared memory.
     */
    if (serv->task_worker_num > 0 && serv->worker_num > 0)
    {
        serv->task_result = sw_shm_calloc(serv->worker_num, sizeof(swEventData));
        serv->task_notify = sw_calloc(serv->worker_num, sizeof(swPipe));
        for (i = 0; i < serv->worker_num; i++)
        {
            if (swPipeNotify_auto(&serv->task_notify[i], 1, 0))
            {
                return SW_ERR;
            }
        }
    }

    /**
     * user worker process
     */
    if (serv->user_worker_list)
    {
        swUserWorker_node *user_worker;
        i = 0;
        LL_FOREACH(serv->user_worker_list, user_worker)
        {
            user_worker->worker->id = serv->worker_num + serv->task_worker_num + i;
            i++;
        }
    }

    //factory start
    if (factory->start(factory) < 0)
    {
        return SW_ERR;
    }
    //signal Init
    swServer_signal_init(serv);

    //write PID file
    if (serv->pid_file)
    {
        ret = snprintf(SwooleTG.buffer_stack->str, SwooleTG.buffer_stack->size, "%d", getpid());
        swoole_file_put_contents(serv->pid_file, SwooleTG.buffer_stack->str, ret);
    }
    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        ret = swReactorProcess_start(serv);
    }
    else
    {
        ret = swServer_start_proxy(serv);
    }
    swServer_free(serv);
    serv->gs->start = 0;
    //remove PID file
    if (serv->pid_file)
    {
        unlink(serv->pid_file);
    }
    return SW_OK;
}
daemon

如果想要進(jìn)程 daemon 化,必要的步驟如下:

切換目錄為根目錄

stdinstdoutstderr 重定向到 /dev/null

fork 開(kāi)啟一個(gè)新進(jìn)程

退出父進(jìn)程,在子進(jìn)程中開(kāi)啟一個(gè)新的會(huì)話

int daemon(int nochdir, int noclose)
{
    pid_t pid;

    if (!nochdir && chdir("/") != 0)
    {
        swWarn("chdir() failed. Error: %s[%d]", strerror(errno), errno);
        return -1;
    }

    if (!noclose)
    {
        int fd = open("/dev/null", O_RDWR);
        if (fd < 0)
        {
            swWarn("open() failed. Error: %s[%d]", strerror(errno), errno);
            return -1;
        }

        if (dup2(fd, 0) < 0 || dup2(fd, 1) < 0 || dup2(fd, 2) < 0)
        {
            close(fd);
            swWarn("dup2() failed. Error: %s[%d]", strerror(errno), errno);
            return -1;
        }

        close(fd);
    }

    pid = fork();
    if (pid < 0)
    {
        swWarn("fork() failed. Error: %s[%d]", strerror(errno), errno);
        return -1;
    }
    if (pid > 0)
    {
        _exit(0);
    }
    if (setsid() < 0)
    {
        swWarn("setsid() failed. Error: %s[%d]", strerror(errno), errno);
        return -1;
    }
    return 0;
}
factory->start 開(kāi)啟 managerwork 進(jìn)程

swServer_get_worker 函數(shù)用于從 event_workers

swWorker_create 函數(shù)用于初始化 send_shmlock

swManager_start 函數(shù)用于啟動(dòng) manager 進(jìn)程

static int swFactoryProcess_start(swFactory *factory)
{
    int i;
    swServer *serv = factory->ptr;
    swWorker *worker;

    for (i = 0; i < serv->worker_num; i++)
    {
        worker = swServer_get_worker(serv, i);
        if (swWorker_create(worker) < 0)
        {
            return SW_ERR;
        }
    }

    serv->reactor_pipe_num = serv->worker_num / serv->reactor_num;

    //必須先啟動(dòng)manager進(jìn)程組,否則會(huì)帶線程fork
    if (swManager_start(factory) < 0)
    {
        swWarn("swFactoryProcess_manager_start failed.");
        return SW_ERR;
    }
    //主進(jìn)程需要設(shè)置為直寫(xiě)模式
    factory->finish = swFactory_finish;
    return SW_OK;
}

static sw_inline swWorker* swServer_get_worker(swServer *serv, uint16_t worker_id)
{
    //Event Worker
    if (worker_id < serv->worker_num)
    {
        return &(serv->gs->event_workers.workers[worker_id]);
    }

    //Task Worker
    uint16_t task_worker_max = serv->task_worker_num + serv->worker_num;
    if (worker_id < task_worker_max)
    {
        return &(serv->gs->task_workers.workers[worker_id - serv->worker_num]);
    }

    //User Worker
    uint16_t user_worker_max = task_worker_max + serv->user_worker_num;
    if (worker_id < user_worker_max)
    {
        return &(serv->user_workers[worker_id - task_worker_max]);
    }

    return NULL;
}

int swWorker_create(swWorker *worker)
{
    /**
     * Create shared memory storage
     */
    worker->send_shm = sw_shm_malloc(SwooleG.serv->buffer_output_size);
    if (worker->send_shm == NULL)
    {
        swWarn("malloc for worker->store failed.");
        return SW_ERR;
    }
    swMutex_create(&worker->lock, 1);

    return SW_OK;
}
swManager_start 函數(shù)

首先需要準(zhǔn)備好 pipes 作為 master 進(jìn)程與 worker 進(jìn)行的通訊管道

設(shè)置每個(gè) worker 進(jìn)程的 pipe_master(master 進(jìn)程向 worker 進(jìn)程傳遞消息)、pipe_worker(worker 進(jìn)程向 master 進(jìn)程傳遞消息)

如果存在 task_worker 進(jìn)程,需要調(diào)用 swServer_create_task_worker 函數(shù)創(chuàng)建 serv->gs->task_workers,之后將對(duì)其進(jìn)行初始化

如果存在 user_workers 進(jìn)程,那么就要?jiǎng)?chuàng)建相應(yīng)的 serv->user_workers,并初始化

調(diào)用 fork,啟動(dòng) manager 進(jìn)程

manager 進(jìn)程中,調(diào)用 swServer_close_listen_port 關(guān)閉監(jiān)聽(tīng)的 socket

對(duì)于 task_worker 進(jìn)程,利用 swProcessPool_start 啟動(dòng) task_worker 進(jìn)程

對(duì)于 worker 進(jìn)程,調(diào)用 swManager_spawn_worker 啟動(dòng) worker 進(jìn)程

對(duì)于 user_worker 進(jìn)程,調(diào)用 swManager_spawn_user_worker 啟動(dòng) user_worker 進(jìn)程

調(diào)用 swManager_loop 進(jìn)行事件循環(huán),管理 worker 等進(jìn)程

void swServer_store_pipe_fd(swServer *serv, swPipe *p)
{
    int master_fd = p->getFd(p, SW_PIPE_MASTER);

    serv->connection_list[p->getFd(p, SW_PIPE_WORKER)].object = p;
    serv->connection_list[master_fd].object = p;

    if (master_fd > swServer_get_minfd(serv))
    {
        swServer_set_minfd(serv, master_fd);
    }
}

int swManager_start(swFactory *factory)
{
    swFactoryProcess *object = factory->object;
    int i;
    pid_t pid;
    swServer *serv = factory->ptr;

    object->pipes = sw_calloc(serv->worker_num, sizeof(swPipe));
    if (object->pipes == NULL)
    {
        swError("malloc[worker_pipes] failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }

    //worker進(jìn)程的pipes
    for (i = 0; i < serv->worker_num; i++)
    {
        if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0)
        {
            return SW_ERR;
        }
        serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER);
        serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER);
        serv->workers[i].pipe_object = &object->pipes[i];
        swServer_store_pipe_fd(serv, serv->workers[i].pipe_object);
    }

    if (serv->task_worker_num > 0)
    {
        if (swServer_create_task_worker(serv) < 0)
        {
            return SW_ERR;
        }

        swProcessPool *pool = &serv->gs->task_workers;
        swTaskWorker_init(pool);

        swWorker *worker;
        for (i = 0; i < serv->task_worker_num; i++)
        {
            worker = &pool->workers[i];
            if (swWorker_create(worker) < 0)
            {
                return SW_ERR;
            }
            if (serv->task_ipc_mode == SW_TASK_IPC_UNIXSOCK)
            {
                swServer_store_pipe_fd(SwooleG.serv, worker->pipe_object);
            }
        }
    }

    //User Worker Process
    if (serv->user_worker_num > 0)
    {
        serv->user_workers = SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->user_worker_num * sizeof(swWorker));
        if (serv->user_workers == NULL)
        {
            swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "gmalloc[server->user_workers] failed.");
            return SW_ERR;
        }
        swUserWorker_node *user_worker;
        i = 0;
        LL_FOREACH(serv->user_worker_list, user_worker)
        {
            memcpy(&serv->user_workers[i], user_worker->worker, sizeof(swWorker));
            if (swWorker_create(&serv->user_workers[i]) < 0)
            {
                return SW_ERR;
            }
            i++;
        }
    }

    serv->message_box = swChannel_new(65536, sizeof(swWorkerStopMessage), SW_CHAN_LOCK | SW_CHAN_SHM);
    if (serv->message_box == NULL)
    {
        return SW_ERR;
    }

    pid = fork();
    switch (pid)
    {
    //fork manager process
    case 0:
        //wait master process
        SW_START_SLEEP;
        if (serv->gs->start == 0)
        {
            return SW_OK;
        }
        swServer_close_listen_port(serv);

        /**
         * create task worker process
         */
        if (serv->task_worker_num > 0)
        {
            swProcessPool_start(&serv->gs->task_workers);
        }
        /**
         * create worker process
         */
        for (i = 0; i < serv->worker_num; i++)
        {
            //close(worker_pipes[i].pipes[0]);
            pid = swManager_spawn_worker(factory, i);
            if (pid < 0)
            {
                swError("fork() failed.");
                return SW_ERR;
            }
            else
            {
                serv->workers[i].pid = pid;
            }
        }
        /**
         * create user worker process
         */
        if (serv->user_worker_list)
        {
            swUserWorker_node *user_worker;
            LL_FOREACH(serv->user_worker_list, user_worker)
            {
                /**
                 * store the pipe object
                 */
                if (user_worker->worker->pipe_object)
                {
                    swServer_store_pipe_fd(serv, user_worker->worker->pipe_object);
                }
                swManager_spawn_user_worker(serv, user_worker->worker);
            }
        }

        SwooleG.process_type = SW_PROCESS_MANAGER;
        SwooleG.pid = getpid();
        exit(swManager_loop(factory));
        break;

        //master process
    default:
        serv->gs->manager_pid = pid;
        break;
    case -1:
        swError("fork() failed.");
        return SW_ERR;
    }
    return SW_OK;
}
swManager_spawn_worker 啟動(dòng) worker 進(jìn)程
static pid_t swManager_spawn_worker(swFactory *factory, int worker_id)
{
    pid_t pid;
    int ret;

    pid = fork();

    //fork() failed
    if (pid < 0)
    {
        swWarn("Fork Worker failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    //worker child processor
    else if (pid == 0)
    {
        ret = swWorker_loop(factory, worker_id);
        exit(ret);
    }
    //parent,add to writer
    else
    {
        return pid;
    }
}
swManager_spawn_user_worker 啟動(dòng) user_worker 進(jìn)程
pid_t swManager_spawn_user_worker(swServer *serv, swWorker* worker)
{
    pid_t pid = fork();

    if (pid < 0)
    {
        swWarn("Fork Worker failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    //child
    else if (pid == 0)
    {
        SwooleG.process_type = SW_PROCESS_USERWORKER;
        SwooleWG.worker = worker;
        SwooleWG.id = worker->id;
        worker->pid = getpid();
        //close tcp listen socket
        if (serv->factory_mode == SW_MODE_SINGLE)
        {
            swServer_close_port(serv, SW_TRUE);
        }
        serv->onUserWorkerStart(serv, worker);
        exit(0);
    }
    //parent
    else
    {
        if (worker->pid)
        {
            swHashMap_del_int(serv->user_worker_map, worker->pid);
        }
        worker->pid = pid;
        swHashMap_add_int(serv->user_worker_map, pid, worker);
        return pid;
    }
}
swServer_start_proxy 開(kāi)啟 reactor 多線程

直到這個(gè)時(shí)候,main_reactor 才真正的被創(chuàng)建出來(lái),并進(jìn)行初始化

如果當(dāng)前系統(tǒng)支持 signalfd,那么就要調(diào)用 swSignalfd_setup 函數(shù)對(duì) signalfd 進(jìn)行初始化

對(duì)于 listen_list 里面的 tcp 監(jiān)聽(tīng) socket,需要調(diào)用 swPort_listen 進(jìn)行監(jiān)聽(tīng)

stream_fd 是為了 worker 準(zhǔn)備的,對(duì)于 master 進(jìn)程,直接關(guān)閉即可

swReactorThread_start 函數(shù)用于創(chuàng)建 reactor 線程

如果系統(tǒng)不支持時(shí)間輪算法,那么就要利用 swHeartbeatThread_start 啟動(dòng)一個(gè)進(jìn)程,專(zhuān)門(mén)踢掉空閑的連接

對(duì)于定時(shí)任務(wù),利用 swTimer_init 初始化 SwooleG.timer

設(shè)置 master 主線程的線程特有數(shù)據(jù)

利用 main_reactor->wait 等待新的連接

static int swServer_start_proxy(swServer *serv)
{
    int ret;
    swReactor *main_reactor = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swReactor));

    ret = swReactor_create(main_reactor, SW_REACTOR_MAXEVENTS);
    if (ret < 0)
    {
        swWarn("Reactor create failed");
        return SW_ERR;
    }

    main_reactor->thread = 1;
    main_reactor->socket_list = serv->connection_list;
    main_reactor->disable_accept = 0;
    main_reactor->enable_accept = swServer_enable_accept;

#ifdef HAVE_SIGNALFD
    if (SwooleG.use_signalfd)
    {
        swSignalfd_setup(main_reactor);
    }
#endif

    //set listen socket options
    swListenPort *ls;
    LL_FOREACH(serv->listen_list, ls)
    {
        if (swSocket_is_dgram(ls->type))
        {
            continue;
        }
        if (swPort_listen(ls) < 0)
        {
            return SW_ERR;
        }
    }

    if (serv->stream_fd > 0)
    {
        close(serv->stream_fd);
    }

    /**
     * create reactor thread
     */
    ret = swReactorThread_start(serv, main_reactor);
    if (ret < 0)
    {
        swWarn("ReactorThread start failed");
        return SW_ERR;
    }

#ifndef SW_USE_TIMEWHEEL
    /**
     * heartbeat thread
     */
    if (serv->heartbeat_check_interval >= 1 && serv->heartbeat_check_interval <= serv->heartbeat_idle_time)
    {
        swTrace("hb timer start, time: %d live time:%d", serv->heartbeat_check_interval, serv->heartbeat_idle_time);
        swHeartbeatThread_start(serv);
    }
#endif

    /**
     * master thread loop
     */
    SwooleTG.type = SW_THREAD_MASTER;
    SwooleTG.factory_target_worker = -1;
    SwooleTG.factory_lock_target = 0;
    SwooleTG.id = serv->reactor_num;
    SwooleTG.update_time = 1;

    SwooleG.main_reactor = main_reactor;
    SwooleG.pid = getpid();
    SwooleG.process_type = SW_PROCESS_MASTER;

    /**
     * set a special id
     */
    main_reactor->id = serv->reactor_num;
    main_reactor->ptr = serv;
    main_reactor->setHandle(main_reactor, SW_FD_LISTEN, swServer_master_onAccept);

    if (serv->hooks[SW_SERVER_HOOK_MASTER_START])
    {
        swServer_call_hook(serv, SW_SERVER_HOOK_MASTER_START, serv);
    }

    /**
     * init timer
     */
    if (swTimer_init(1000) < 0)
    {
        return SW_ERR;
    }
    /**
     * 1 second timer, update serv->gs->now
     */
    if (SwooleG.timer.add(&SwooleG.timer, 1000, 1, serv, swServer_master_onTimer) == NULL)
    {
        return SW_ERR;
    }

    if (serv->onStart != NULL)
    {
        serv->onStart(serv);
    }

    return main_reactor->wait(main_reactor, NULL);
}
swPort_listen 開(kāi)啟端口監(jiān)聽(tīng)

tcp_defer_accept :當(dāng)一個(gè)TCP連接有數(shù)據(jù)發(fā)送時(shí)才觸發(fā) accept

tcp_fastopen: 開(kāi)啟 TCP 快速握手特性。此項(xiàng)特性,可以提升 TCP 短連接的響應(yīng)速度,在客戶端完成握手的第三步,發(fā)送 SYN 包時(shí)攜帶數(shù)據(jù)。

open_tcp_keepalive: 在 TCP 中有一個(gè) Keep-Alive 的機(jī)制可以檢測(cè)死連接,應(yīng)用層如果對(duì)于死鏈接周期不敏感或者沒(méi)有實(shí)現(xiàn)心跳機(jī)制,可以使用操作系統(tǒng)提供的 keepalive 機(jī)制來(lái)踢掉死鏈接。

buffer_high_watermark 是緩存區(qū)高水位線,達(dá)到了說(shuō)明緩沖區(qū)即將滿了

int swPort_listen(swListenPort *ls)
{
    int sock = ls->sock;
    int option = 1;

    //listen stream socket
    if (listen(sock, ls->backlog) < 0)
    {
        swWarn("listen(%s:%d, %d) failed. Error: %s[%d]", ls->host, ls->port, ls->backlog, strerror(errno), errno);
        return SW_ERR;
    }

#ifdef TCP_DEFER_ACCEPT
    if (ls->tcp_defer_accept)
    {
        if (setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, (const void*) &ls->tcp_defer_accept, sizeof(int)) < 0)
        {
            swSysError("setsockopt(TCP_DEFER_ACCEPT) failed.");
        }
    }
#endif

#ifdef TCP_FASTOPEN
    if (ls->tcp_fastopen)
    {
        if (setsockopt(sock, IPPROTO_TCP, TCP_FASTOPEN, (const void*) &ls->tcp_fastopen, sizeof(int)) < 0)
        {
            swSysError("setsockopt(TCP_FASTOPEN) failed.");
        }
    }
#endif

#ifdef SO_KEEPALIVE
    if (ls->open_tcp_keepalive == 1)
    {
        if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *) &option, sizeof(option)) < 0)
        {
            swSysError("setsockopt(SO_KEEPALIVE) failed.");
        }
#ifdef TCP_KEEPIDLE
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, (void*) &ls->tcp_keepidle, sizeof(int));
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, (void *) &ls->tcp_keepinterval, sizeof(int));
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, (void *) &ls->tcp_keepcount, sizeof(int));
#endif
    }
#endif

    ls->buffer_high_watermark = ls->socket_buffer_size * 0.8;
    ls->buffer_low_watermark = 0;

    return SW_OK;
}
swReactorThread_start 創(chuàng)建 reactor 線程

swServer_store_listen_socket 函數(shù)用于將監(jiān)控的 socket 存放于 connection_list

main_reactor 中添加監(jiān)聽(tīng)的 socket 文件描述符

pthread_barrier_initpthread_barrier_wait 等待所有的 reactor 線程開(kāi)啟事件循環(huán)

利用 pthread_create 創(chuàng)建 reactor 線程,線程啟動(dòng)函數(shù)是 swReactorThread_loop

int swReactorThread_start(swServer *serv, swReactor *main_reactor_ptr)
{
    swThreadParam *param;
    swReactorThread *thread;
    pthread_t pidt;
    int i;

    swServer_store_listen_socket(serv);

#ifdef HAVE_REUSEPORT
    SwooleG.reuse_port = 0;
#endif

    swListenPort *ls;
    LL_FOREACH(serv->listen_list, ls)
    {
        if (ls->type == SW_SOCK_UDP || ls->type == SW_SOCK_UDP6 || ls->type == SW_SOCK_UNIX_DGRAM)
        {
            continue;
        }
        main_reactor_ptr->add(main_reactor_ptr, ls->sock, SW_FD_LISTEN);
    }

#ifdef HAVE_PTHREAD_BARRIER
    //init thread barrier
    pthread_barrier_init(&serv->barrier, NULL, serv->reactor_num + 1);
#endif

    //create reactor thread
    for (i = 0; i < serv->reactor_num; i++)
    {
        thread = &(serv->reactor_threads[i]);
        param = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swThreadParam));
        if (param == NULL)
        {
            swError("malloc failed");
            return SW_ERR;
        }

        param->object = serv;
        param->pti = i;

        if (pthread_create(&pidt, NULL, (void * (*)(void *)) swReactorThread_loop, (void *) param) < 0)
        {
            swError("pthread_create[tcp_reactor] failed. Error: %s[%d]", strerror(errno), errno);
        }
        thread->thread_id = pidt;
    }
#ifdef HAVE_PTHREAD_BARRIER
    //wait reactor thread
    pthread_barrier_wait(&serv->barrier);
#else
    SW_START_SLEEP;
#endif

    return SW_OK;
}
swServer_store_listen_socket 保存監(jiān)聽(tīng)

本函數(shù)將用于監(jiān)聽(tīng)的 socket 存放到 connection_list 當(dāng)中,并設(shè)置相應(yīng)的 info 屬性;

void swServer_store_listen_socket(swServer *serv)
{
    swListenPort *ls;
    int sockfd;
    LL_FOREACH(serv->listen_list, ls)
    {
        sockfd = ls->sock;
        //save server socket to connection_list
        serv->connection_list[sockfd].fd = sockfd;
        //socket type
        serv->connection_list[sockfd].socket_type = ls->type;
        //save listen_host object
        serv->connection_list[sockfd].object = ls;

        if (swSocket_is_dgram(ls->type))
        {
            if (ls->type == SW_SOCK_UDP)
            {
                serv->connection_list[sockfd].info.addr.inet_v4.sin_port = htons(ls->port);
            }
            else if (ls->type == SW_SOCK_UDP6)
            {
                SwooleG.serv->udp_socket_ipv6 = sockfd;
                serv->connection_list[sockfd].info.addr.inet_v6.sin6_port = htons(ls->port);
            }
        }
        else
        {
            //IPv4
            if (ls->type == SW_SOCK_TCP)
            {
                serv->connection_list[sockfd].info.addr.inet_v4.sin_port = htons(ls->port);
            }
            //IPv6
            else if (ls->type == SW_SOCK_TCP6)
            {
                serv->connection_list[sockfd].info.addr.inet_v6.sin6_port = htons(ls->port);
            }
        }
        if (sockfd >= 0)
        {
            swServer_set_minfd(serv, sockfd);
            swServer_set_maxfd(serv, sockfd);
        }
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/29215.html

相關(guān)文章

  • Swoole 源碼分析——Client模塊Send

    摘要:當(dāng)此時(shí)的套接字不可寫(xiě)的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫(xiě)就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫(xiě)就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說(shuō)明此時(shí)異步客戶端雖然建立了連接,但是還沒(méi)有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說(shuō)了客戶端的連接 connect,對(duì)于同步客戶端來(lái)說(shuō),連接已經(jīng)建立成功;但是對(duì)于異步客戶端來(lái)說(shuō),此時(shí)可能還在進(jìn)行 DNS 的解析,on...

    caozhijian 評(píng)論0 收藏0
  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類(lèi)的指針, event_nu...

    baukh789 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊Signal信號(hào)處理

    摘要:在創(chuàng)建進(jìn)程和線程之間,主線程開(kāi)始進(jìn)行信號(hào)處理函數(shù)的設(shè)置。事件循環(huán)結(jié)束前會(huì)調(diào)用函數(shù),該函數(shù)會(huì)檢查并執(zhí)行相應(yīng)的信號(hào)處理函數(shù)。 前言 信號(hào)處理是網(wǎng)絡(luò)庫(kù)不可或缺的一部分,不論是 ALARM、SIGTERM、SIGUSR1、SIGUSR2、SIGPIPE 等信號(hào)對(duì)程序的控制,還是 reactor、read、write 等操作被信號(hào)中斷的處理,都關(guān)系著整個(gè)框架程序的正常運(yùn)行。 Signal 數(shù)據(jù)...

    Nosee 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊TaskWorker事件循環(huán)

    摘要:函數(shù)事件循環(huán)在事件循環(huán)時(shí),如果使用的是消息隊(duì)列,那么就不斷的調(diào)用從消息隊(duì)列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費(fèi)消息之后,向中發(fā)送空數(shù)據(jù),告知進(jìn)程已消費(fèi),并且關(guān)閉新連接。 swManager_start 創(chuàng)建進(jìn)程流程 task_worker 進(jìn)程的創(chuàng)建可以分為三個(gè)步驟:swServer_create_task_worker 申請(qǐng)所需的內(nèi)存、swTaskWorker_init 初始化...

    用戶83 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊Stream 模式

    摘要:新建可以看到,自動(dòng)采用包長(zhǎng)檢測(cè)的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在,等著寫(xiě)事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。 swReactorThread_dispatch 發(fā)送數(shù)據(jù) reactor 線程會(huì)通過(guò) swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會(huì)調(diào)用 sw...

    wums 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<