国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

微信開源mars源碼分析5—底層核心mars分析(續2)

asce1885 / 1706人閱讀

摘要:執行并根據每個連接的狀態決定后續處理,上篇已經講過,不再累述。上面的三段處理完畢后,應該是數組中不再有連接才對,這里的保險處理是對數組再進行檢查。至此跳出,算是整個連接過程完畢了。這里需要逐句分析,首先是。

最近回顧之前的文章,發現最后一篇有些著急了,很多地方沒有敘述清楚。這里先做個銜接吧。
我們還是以長連接為例,從longlink.cc看起。首先是那個線程函數__Run:
/mars-master/mars/stn/src/longlink.cc

void LongLink::__Run() {
    ......
    // 執行連接
    SOCKET sock = __RunConnect(conn_profile);
    
    // 無效的socket,更新描述文件,記錄失敗的時間節點,返回
    if (INVALID_SOCKET == sock) {
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }
    ......
    // 執行讀寫
    __RunReadWrite(sock, errtype, errcode, conn_profile);
}

實際上核心的就2個,連接和讀寫,我們分別看下。
/mars-master/mars/stn/src/longlink.cc

SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
    std::vector ip_items;
    std::vector vecaddr;
    ......
    // 賦值填充ip_items地址端口數組
    netsource_.GetLongLinkItems(ip_items, dns_util_);
    ......
    // 根據ip_items創建socket_address并加入vecaddr中
    for (unsigned int i = 0; i < ip_items.size(); ++i) {
        vecaddr.push_back(socket_address(ip_items[i].str_ip.c_str(), ip_items[i].port).v4tov6_address(isnat64));
    }
    ......
    // 創建觀察者和ComplexConnect連接核心,然后開始執行連接
    LongLinkConnectObserver connect_observer(*this, ip_items);
    ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax);
    SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer);
    
    // 返回socket
    return sock;
}

1.創建2個數組,地址端口item和socket_address;
2.調用netsource_.GetLongLinkItems(ip_items, dns_util_);填充IPPortItem數組;
3.根據填充好的前者數組生成socket_address填充后者數組;
4.創建連接觀察者;
5.開始執行連接;
首先看看netsource_.GetLongLinkItems是如何填充的:
/mars-master/mars/stn/src/net_source.cc

bool NetSource::GetLongLinkItems(std::vector& _ipport_items, DnsUtil& _dns_util) {
    
    ScopedLock lock(sg_ip_mutex);

    if (__GetLonglinkDebugIPPort(_ipport_items)) {
        return true;
    }
    
    lock.unlock();

     std::vector longlink_hosts = NetSource::GetLongLinkHosts();
     if (longlink_hosts.empty()) {
         xerror2("longlink host empty.");
         return false;
     }

     __GetIPPortItems(_ipport_items, longlink_hosts, _dns_util, true);

    return !_ipport_items.empty();
}

可以看到debug的優先,這里增加了調試的ip。再往下就先不貼代碼了,基本上就是之前通過SetLongLink設置進去的sg_longlink_hosts(長連接主機列表),再往上倒騰就是在MarsServiceNative.java的onCreate中通過描述文件profile設置進去的主機列表。也就是說之前早就設置好的主機列表已經存在了。
下面我們仍然要進入到上一篇提到的ComplexConnect::ConnectImpatient這個核心函數中看看。
/mars-master/mars/comm/socket/complexconnect.cc

SOCKET ComplexConnect::ConnectImpatient(const std::vector& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......
    // 根據地址列表,生成ConnectCheckFSM連接列表
    std::vector vecsocketfsm;

    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        xinfo2(TSF"complex.conn %_", _vecaddr[i].url());

        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }
    // 下面就是對這個連接列表的各種操作了
    do {
        ......
        // 生成socketselect的封裝對象,并執行PreSelect前期準備工作
        SocketSelect sel(_breaker);
        sel.PreSelect();
        
        ......
        // 執行連接
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }
        
        ......
        
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(),
                       vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }
        
    } while (true);
}

1.數組中的每個長連接地址依次執行連接;
2.數組中的每個連接分別做后續處理(一個for循環中的三段處理);

我們首先看看vecsocketfsm[i]->PreSelect(sel, group);這句話,是由ConnectCheckFSM的父類TcpClientFSM實現的:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreSelect(SocketSelect& _sel, XLogger& _log) {
    
    switch(status_) {
        case EStart: {
            PreConnectSelect(_sel, _log);
            break;
        }
        case EConnecting: {
            _sel.Write_FD_SET(sock_);
            _sel.Exception_FD_SET(sock_);
            break;
        }
        case EReadWrite: {
            PreReadWriteSelect(_sel, _log);
            break;
        }
        default:
            xassert2(false, "preselect status error");
    }
}

這里是根據這個連接的當前狀態決定前置操作的行為(開始連接、讀寫、連接中)。再往下看就是進行socket的connect。以PreConnectSelect為例,這里生產了socket,并執行了connect,最后將成功連接的socket執行_sel.Write_FD_SET(sock_);保存在了SocketSelect中。
我們來看下代碼:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreConnectSelect(SocketSelect& _sel, XLogger& _log) {
    xassert2(EStart == status_, "%d", status_);
    // 執行虛函數,由子類繼承實現
    _OnCreate();

    xinfo2(TSF"addr:(%_:%_), ", addr_.ip(), addr_.port()) >> _log;

    // 生成socket
    sock_ = socket(addr_.address().sa_family, SOCK_STREAM, IPPROTO_TCP);

    if (sock_ == INVALID_SOCKET) {
        error_ = socket_errno;
        last_status_ = status_;
        status_ = EEnd;
        _OnClose(last_status_, error_, false);
        xerror2(TSF"close socket err:(%_, %_)", error_, socket_strerror(error_)) >> _log;
        return;
    }

    if (::getNetInfo() == kWifi && socket_fix_tcp_mss(sock_) < 0) {
#ifdef ANDROID
        xinfo2(TSF"wifi set tcp mss error:%0", strerror(socket_errno));
#endif
    }
    if (0 != socket_ipv6only(sock_, 0)){
        xwarn2(TSF"set ipv6only failed. error %_",strerror(socket_errno));
    }
    
    if (0 != socket_set_nobio(sock_)) {
        error_ = socket_errno;
        xerror2(TSF"close socket_set_nobio:(%_, %_)", error_, socket_strerror(error_)) >> _log;
    } else {
        xinfo2(TSF"socket:%_, ", sock_) >> _log;
    }

    if (0 != error_) {
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    start_connecttime_ = gettickcount();

    // 執行連接
    int ret = connect(sock_, &(addr_.address()), addr_.address_length());

    if (0 != ret && !IS_NOBLOCK_CONNECT_ERRNO(socket_errno)) {
        end_connecttime_ = ::gettickcount();

        error_ = socket_errno;
        xwarn2(TSF"close connect err:(%_, %_), localip:%_", error_, socket_strerror(error_), socket_address::getsockname(sock_).ip()) >> _log;
    } else {
        xinfo2("connect") >> _log;
        // 記錄socket到SocketSelect中
        _sel.Write_FD_SET(sock_);
        _sel.Exception_FD_SET(sock_);
    }

    last_status_ = status_;

    if (0 != error_)
        status_ = EEnd;
    else
        status_ = EConnecting;

    if (0 == error_) _OnConnect();
}

需要注意的是_OnCreate的調用,實際上是子類實現的,這里也就是ConnectCheckFSM實現的:

virtual void _OnCreate() { if (m_observer) m_observer->OnCreated(m_index, addr_, sock_);}

這里將觀察者與連接對象的生命周期綁在了一起,執行了觀察者的OnCreated。那么觀察者是誰呢?往上看,在LongLink::__RunConnect中生成的LongLinkConnectObserver。當然生命周期的回調并不止OnCreated一個。

回到__RunConnect中,看后續處理(for循環的三段處理)。執行AfterSelect并根據每個連接的狀態決定后續處理,上篇已經講過,不再累述。

那么何時終止這個do while循環呢?當for循環的三段處理完畢后,所有的連接過程都已經處理完畢了:

        // end of loop
        bool all_invalid = true;

        for (unsigned int i = 0; i < vecsocketfsm.size(); ++i) {
            if (NULL != vecsocketfsm[i]) {
                all_invalid = false;
                break;
            }
        }

        if (all_invalid || INVALID_SOCKET != retsocket) break;

最后枚舉一遍連接數組,每個元素檢查是否非空,如果還有非空的,就將all_invalid置為false,那么會繼續走一次do while。上面的三段處理完畢后,應該是數組中不再有連接才對,這里的保險處理是對數組再進行檢查。至此跳出do while,算是整個連接過程完畢了。

可以看到,經過了三段處理后,連接數組中只會命中一個檢測成功的連接,其他的失敗和完成的都會置為null。這里從全局看就是一個地址池的淘汰篩選機制。在三段處理的for循環中清除不合格的連接,挑出第一個找到的合格的連接,然后跳出三段后,立刻檢查整個數組是否已經就剩這一個可用了,如果不是繼續執行do while,又會去執行數組中的每個item的連接過程,再回到三段處理。也就是說所有的數組中的item都會連接一次,然后根據返回的狀態決定是否命中最終的一個socket。這是干嘛呢這么繞?我之前的理解恐怕還不透徹,現在感覺是在找一個穩定的可以讀寫狀態的連接。
第一次進入do while已經連接所有池中的item了,那么在經過了三段處理后淘汰掉不合適的和失敗的,然后再進入do while再次執行vecsocketfsm[i]->PreSelect(sel, group);的時候,已經更新了狀態并執行了不同的調用了,再經過三段處理在新的狀態下再淘汰一批,最后經過整個運轉,留下來的只能是最持久的(穩定的)唯一的一個連接,返回這個。
不得不說,這里確實巧妙,如果我寫并不會比這要好。

我們回來到longlink.cc的線程函數__Run中,當連接處理完畢后,下面繼續執行的是__RunReadWrite。我們來看看:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    // Alarm消息觸發處理綁定在__OnAlarm上
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);
}

首先是2個Alarm,這里要理解就需要看看這個Alarm是個什么東西:
/mars-master/mars/comm/alarm.h

    template
    explicit Alarm(const T& _op, bool _inthread = true)
        : target_(detail::transform(_op))
        , reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))
        , runthread_(boost::bind(&Alarm::__Run, this), "alarm")
        , inthread_(_inthread)
        , seq_(0), status_(kInit)
        , after_(0) , starttime_(0) , endtime_(0)
        , reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))
#ifdef ANDROID
        , wakelock_(NULL)
#endif
    {}

構造函數。這里需要逐句分析,首先是target_(detail::transform(_op))。簡單看是個賦值語句,后面的參數需要看這個:
/mars-master/mars/comm/thread/runnable.h

// base template for no argument functor
template 
struct TransformImplement {
    static Runnable* transform(const T& t) {
        return new RunnableFunctor(t);
    }
};

template 
inline Runnable* transform(const T& t) {
    return TransformImplement::transform(t);
}

1.這里使用的是c++魔板,直接new了一個RunnableFunctor對象,這個對象是個runnable,其實就是將這個傳遞進來的參數t包裝成了一個runnable,在適當的時候調用他的run方法的時候就會調用這個t了。那么帶入到具體的內容中,這個t是_op,就是boost::bind(&LongLink::__OnAlarm, this)。這里又使用了c++的boost庫,做了bind操作,綁定了參數this也就是LongLink與函數體LongLink::__OnAlarm。好了,現在target_是個包裝好的runnable了,在適當的時候可以執行LongLink::__OnAlarm。

2.reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))。首先看MessageQueue::InstallAsyncHandler:
/mars-master/mars/comm/messagequeue/message_queue.cc

MessageHandler_t InstallAsyncHandler(const MessageQueue_t& id) {
    ASSERT(0 != id);
    return InstallMessageHandler(__AsyncInvokeHandler, false, id);
}

MessageHandler_t InstallMessageHandler(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid) {
    ASSERT(bool(_handler));

    ScopedLock lock(sg_messagequeue_map_mutex);
    const MessageQueue_t& id = _messagequeueid;

    if (sg_messagequeue_map.end() == sg_messagequeue_map.find(id)) {
        ASSERT2(false, "%" PRIu64, id);
        return KNullHandler;
    }

    HandlerWrapper* handler = new HandlerWrapper(_handler, _recvbroadcast, _messagequeueid, __MakeSeq());
    sg_messagequeue_map[id].lst_handler.push_back(handler);
    return handler->reg;
}

struct HandlerWrapper {
    HandlerWrapper(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid, unsigned int _seq)
        : handler(_handler), recvbroadcast(_recvbroadcast) {
        reg.seq = _seq;
        reg.queue = _messagequeueid;
    }

    MessageHandler_t reg;
    MessageHandler handler;
    bool recvbroadcast;
};

生成了一個HandlerWrapper,并將其保留在了一個map中,隨后返回了MessageHandler_t,其中保存了_seq與_messagequeueid。這里我的感覺是這個handler就是個類似句柄的東西,保存MessageHandler的一個關聯關系,即消息隊列與seq碼(這里是個自增的靜態變量)。實際上調用者只要有這個MessageHandler_t就可以了。最后將這個MessageHandler_t賦值給了reg_async_。這里又有一個對象ScopeRegister是個MessageHandler_t的包裝對象,里面統一封裝了方法來操作MessageHandler_t。

3.runthread_(boost::bind(&Alarm::__Run, this), "alarm")。一個線程對象,線程函數是Alarm::__Run。沒事什么好解釋的。

4.inthread_(_inthread), seq_(0), status_(kInit), after_(0) , starttime_(0) , endtime_(0)。都是簡單賦值,暫時不去管它。

5.reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))。類似2。

好了,這個Alarm可以看做是個消息處理,在有消息觸發的情況下會調用到具體的函數中,一般是__OnAlarm。

回到__RunReadWrite,往下看。首先是個while的死循環,我們多帶帶摘錄如下:

    while (true) {
        ......
        if (!alarmnoopinterval.IsWaiting()) {
            ......
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                is_noop = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            ......
        }
        
        ......
        // socket處理
        SocketSelect sel(readwritebreak_, true);
        sel.PreSelect();
        sel.Read_FD_SET(_sock);
        sel.Exception_FD_SET(_sock);
        
        ScopedLock lock(mutex_);
        
        if (!lstsenddata_.empty()) sel.Write_FD_SET(_sock);
        
        lock.unlock();
        
        int retsel = sel.Select(10 * 60 * 1000);
        ......
        // 處理發送(寫入)
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            ......
            ssize_t writelen = ::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0);
            ......
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->data.Pos()) OnSend(it->taskid);
                
                if ((size_t)writelen >= it->data.PosLength()) {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, it->data.PosLength(), it->data.PosLength(), it->data.Length()) >> xlog_group;
                    writelen -= it->data.PosLength();
                    if (!it->task_info.empty()) sent_taskids[it->taskid] = it->task_info;
                    LongLinkNWriteData nwrite(it->taskid, it->data.PosLength(), it->cmdid, it->task_info);
                    nsent_datas.push_back(nwrite);
                    
                    it = lstsenddata_.erase(it);
                } else {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, writelen, it->data.PosLength(), it->data.Length()) >> xlog_group;
                    it->data.Seek(writelen, AutoBuffer::ESeekCur);
                    writelen = 0;
                }
            }
            
        }
        
        ......
        // 處理接收(讀取)
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            ......
            while (0 < bufrecv.Length()) {
                uint32_t cmdid = 0;
                uint32_t taskid = Task::kInvalidTaskID;
                size_t packlen = 0;
                AutoBuffer body;
                
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body);
                
                if (LONGLINK_UNPACK_FALSE == unpackret) {
                    xerror2(TSF"task socket recv sock:%0, unpack error dump:%1", _sock, xdump(bufrecv.Ptr(), bufrecv.Length()));
                    _errtype = kEctNetMsgXP;
                    _errcode = kEctNetMsgXPHandleBufferErr;
                    goto End;
                }
                
                xinfo2(TSF"task socket recv sock:%_, pack recv %_ taskid:%_, cmdid:%_, %_, packlen:(%_/%_)", _sock, LONGLINK_UNPACK_CONTINUE == unpackret ? "continue" : "finish", taskid, cmdid, sent_taskids[taskid], LONGLINK_UNPACK_CONTINUE == unpackret ? bufrecv.Length() : packlen, packlen);
                lastrecvtime_.gettickcount();
                
                if (LONGLINK_UNPACK_CONTINUE == unpackret) {
                    OnRecv(taskid, bufrecv.Length(), packlen);
                    break;
                } else {
                    
                    sent_taskids.erase(taskid);
                    
                    bufrecv.Move(-(int)(packlen));
                    
                    if (__NoopResp(cmdid, taskid, body, alarmnooptimeout, _profile)) {
                        xdebug2(TSF"noopresp span:%0", alarmnooptimeout.ElapseTime());
                        is_noop = false;
                    } else {
                        OnResponse(kEctOK, 0, cmdid, taskid, body, _profile);
                    }
                }
            }
        }
    }

// 收尾,整個looper退出
End:
    

從while中的代碼能夠看出,基本上就是上面摘錄的幾塊,如下所示:
1.__NoopReq調用,無數據狀態處理;
2.socket的select處理;
3.處理發送寫入部分;
4.處理接收讀取部分;

這里需要逐個分析了:
1.__NoopReq:
先看代碼,并不長:

bool LongLink::__NoopReq(XLogger& _log, Alarm& _alarm, bool need_active_timeout) {
    AutoBuffer buffer;
    uint32_t req_cmdid = 0;
    bool suc = false;
    
    if (identifychecker_.GetIdentifyBuffer(buffer, req_cmdid)) {
        suc = Send((const unsigned char*)buffer.Ptr(), (int)buffer.Length(), req_cmdid, Task::kLongLinkIdentifyCheckerTaskID);
        identifychecker_.SetSeq(Task::kLongLinkIdentifyCheckerTaskID);
        xinfo2(TSF"start noop synccheck taskid:%0, cmdid:%1, ", Task::kLongLinkIdentifyCheckerTaskID, req_cmdid) >> _log;
    } else {
        AutoBuffer body;
        longlink_noop_req_body(body);
        suc = SendWhenNoData((const unsigned char*) body.Ptr(), body.Length(), longlink_noop_cmdid(), Task::kNoopTaskID);
        xinfo2(TSF"start noop taskid:%0, cmdid:%1, ", Task::kNoopTaskID, longlink_noop_cmdid()) >> _log;
    }
    
    if (suc) {
        _alarm.Cancel();
        _alarm.Start(need_active_timeout ? (2* 1000) : (10 * 1000));
    } else {
        xerror2("send noop fail");
    }
    
    return suc;
}

說實話,這里看的不是很清晰 ,因為之前肯定有忽略的部分,我的猜測是在走了一個發送信令的校驗后,根據返回的值的不同決定是執行send發送數據(使用校驗填充好的buffer),還是走SendWhenNoData發送(自行填充請求體)沒有數據的情況。暫時先往下看一步,看看Send:

bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    lstsenddata_.push_back(LongLinkSendData());

    lstsenddata_.back().cmdid = _cmdid;
    lstsenddata_.back().taskid = _taskid;
    longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data);
    lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart);
    lstsenddata_.back().task_info = _task_info;

    readwritebreak_.Break();
    return true;
}

這里能夠清晰的看到,在使用lstsenddata_這個隊列,來進行發送的請求,實際上就是向隊列中增加一項。那么現在的問題就在于這個發送的數據時怎么來的了。這就需要我們弄懂LongLinkIdentifyChecker這個玩意兒。
/mars-master/mars/stn/src/longlink_identify_checker.cc

bool LongLinkIdentifyChecker::GetIdentifyBuffer(AutoBuffer &_buffer, uint32_t &_cmdid)
{
    if (has_checked_) return false;
    
    hash_code_buffer_.Reset();
    _buffer.Reset();

    IdentifyMode mode = (IdentifyMode)GetLonglinkIdentifyCheckBuffer(_buffer, hash_code_buffer_, (int&)_cmdid);

    switch (mode)
    {
    case kCheckNever:
        {
            has_checked_ = true;
        }
        break;
    case kCheckNext:
        {
            has_checked_ = false;
        }
        break;
    case kCheckNow:
        {
            cmd_id_ = _cmdid;
            return true;
        }
        break;
    default:
        xassert2(false);
    }
    
    return false;
}

調用了GetLonglinkIdentifyCheckBuffer,我們追溯到stn_logic.cc中:

    int  GetLonglinkIdentifyCheckBuffer(AutoBuffer& identify_buffer, AutoBuffer& buffer_hash, int32_t& cmdid) {
        xassert2(sg_callback != NULL);
        return sg_callback->GetLonglinkIdentifyCheckBuffer(identify_buffer, buffer_hash, cmdid);
    }

實際上是對sg_callback這個回調的調用。最終我找到的線索是在MarsServiceNative.java上層的onCreate中設置了回調:

        // set callback
        AppLogic.setCallBack(stub);
        StnLogic.setCallBack(stub);
        SdtLogic.setCallBack(stub);

再接著找到了MarsServiceStub.java中的getLongLinkIdentifyCheckBuffer:

    @Override
    public int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID) {
        // Send identify request buf to server
        // identifyReqBuf.write();

        return ECHECK_NEVER;
    }

返回的是ECHECK_NEVER,沒有填充buffer。也即是說has_checked_ = true,然后返回false。其實看到這一刻我是崩潰的,真心不知道是想干嘛。我們只能這么理解,只要進入__NoopReq其實都是在走SendWhenNoData發送無數據狀態。好吧,我們重新回到__RunReadWrite中看一下。每次在while循環中一上來只要不是alarmnoopinterval正在等待的狀態,那么就走一個發送無數據狀態。看看發送無數據的代碼:

bool LongLink::SendWhenNoData(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid) {
    ScopedLock lock(mutex_);

    if (kConnected != connectstatus_) return false;
    if (!lstsenddata_.empty()) return false;

    return __Send(_pbuf, _len, _cmdid, _taskid, "");
}

其實是檢查lstsenddata_是否有內容,如果沒有才發送。那么好吧,這里整體理解就是每次whie循環開始都會檢查如果發送隊列中沒有數據的時候,發送一個特定的無數據狀態來確認連接。但是這里寫的比較復雜,還需要回調回上層java的代碼中,讓其來控制狀態,從而根據狀態控制流程,只能說考慮的很周全,任何情況在任何節點都可以有處理。吐槽下如果我們自己寫來規劃這部分的時候大多數時候都是最對無數據檢測放在下層,然后直接就發送了,不會讓上層這里進行什么干涉吧。其實這里還有些點沒有詳細的分析很清楚,原諒文章有限,畢竟不能偏離主線太多。

2.socket的select操作。
這里倒沒什么可說的,前面的設置,為后面的sel.Select(10 60 1000)做準備,內部采用poll來運作。

3.發送過程。
先是判斷如果發送隊列里面有內容,執行下面的::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0)。這里注意,參數給定的是隊列的第一個的data,也就是說這里是取出第一個執行發送。
下面就是一個while循環,將發送隊列過了一遍。如果剛才發送的數據大小與待發送的實際數據長度相等,那么認為是發送完了這一個,從隊列中移除這一個,然后下一次while會自動取下一個了。如果沒有;認為是沒發完,位移數據,下次while仍然獲取到這個item,但是數據位移已經變了,因此繼續發送下面的數據。經過這個while之后,所有的發送隊列中的數據都應當發送完畢了。

4.接收過程。
前面沒什么好說的,無非是開辟buffer空間,然后執行recv調用。之后進入一個while循環,條件是讀取的buffer有數據。
首先走一個解包調用,內部走的是__unpack_test,具體內容就不貼了,我簡單看了下,基本上就是解開頭部,頭部的信息標識了本次傳遞的基本信息,包括了版本號等內容,一個結構體,還是比較標準的。這里是嘗試解包,如果本次接收到的大小連頭部都不夠,那肯定返回錯誤,需要繼續接收了。那么從這個能夠看出,每次傳遞的數據都是帶有一個頭部的__STNetMsgXpHeader。這東西里面塞入的內容可以和客戶端的版本,當前這個信令的id等關聯起來。
再下去看到的就是對解包返回值的判斷了,如果一切順利,就走到sent_taskids.erase(taskid);這里需要注意,這個sent_taskids是個發送的taskid的map,這里可以推測發送和接受其實是關聯的,這里接收完畢移除這個保留項。然后走的__NoopResp這個調用。如果返回false表示不是空的信令返回,那么就走OnResponse。這個函數實際上是在LongLinkTaskManager中綁定了longlink_->OnResponse = boost::bind(&LongLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6);綁定在了LongLinkTaskManager::__OnResponse這里。

void LongLinkTaskManager::__OnResponse(ErrCmdType _error_type, int _error_code, uint32_t _cmdid, uint32_t _taskid, AutoBuffer& _body, const ConnectProfile& _connect_profile) {
    copy_wrapper body(_body);
    RETURN_LONKLINK_SYNC2ASYNC_FUNC(boost::bind(&LongLinkTaskManager::__OnResponse, this, _error_type, _error_code, _cmdid, _taskid, body, _connect_profile));

    ......
    
    int err_code = 0;
    int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, body, err_code, Task::kChannelLong);
    
    switch(handle_type){
        case kTaskFailHandleNoError:
        {
            dynamic_timeout_.CgiTaskStatistic(it->task.cgi, (unsigned int)it->transfer_profile.send_data_size + (unsigned int)body->Length(), ::gettickcount() - it->transfer_profile.start_send_time);
            __SingleRespHandle(it, kEctOK, err_code, handle_type, _connect_profile);
            xassert2(fun_notify_network_err_);
            fun_notify_network_err_(__LINE__, kEctOK, err_code, _connect_profile.ip, _connect_profile.port);
        }
            break;
        ......
    }

}

其實就2件事,通過Buf2Resp底層回包返回給上層解析。如果沒有錯誤kTaskFailHandleNoError,會執行__SingleRespHandle:

bool LongLinkTaskManager::__SingleRespHandle(std::list::iterator _it, ErrCmdType _err_type, int _err_code, int _fail_handle, const ConnectProfile& _connect_profile) {
    ......
    int cgi_retcode = fun_callback_(_err_type, _err_code, _fail_handle, _it->task, (unsigned int)(curtime - _it->start_task_time));
    ......
}

這里的關鍵點就這一個,調用回調,回調的綁定在net_core.cc中的NetCore構造里,longlink_task_manager_->fun_callback_ = boost::bind(&NetCore::__CallBack, this, (int)kCallFromLong, _1, _2, _3, _4, _5);,最終執行的是NetCore::__CallBack:

int NetCore::__CallBack(int _from, ErrCmdType _err_type, int _err_code, int _fail_handle, const Task& _task, unsigned int _taskcosttime) {

    if (task_callback_hook_ && 0 == task_callback_hook_(_from, _err_type, _err_code, _fail_handle, _task)) {
        xwarn2(TSF"task_callback_hook let task return. taskid:%_, cgi%_.", _task.taskid, _task.cgi);
        return 0;
    }

    if (kEctOK == _err_type || kTaskFailHandleTaskEnd == _fail_handle)
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    if (kCallFromZombie == _from) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

#ifdef USE_LONG_LINK
    if (!zombie_task_manager_->SaveTask(_task, _taskcosttime))
#endif
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    return 0;
}

看到了吧,走了OnTaskEnd,任務結束。

此文從中間部分開始粗糙了,前面鋪墊的東西后面沒有講到,心不靜的時候分析東西效果確實不大好。總而言之既然堅持寫完了,這里還是留個記錄吧,日后有機會的時候會回顧把這部分完善好。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70720.html

相關文章

  • 微信開源mars源碼分析2—上層samples分析

    摘要:本來是想直接深入到的核心層去看的,但是發現其實上面的部分還有好些沒有分析到,因此回來繼續分析。另外一個,是專用于統計的,我們暫時不去關注。具體的內容我會在后面的核心層分析的時候指出。準備下一篇進行的核心層分析吧。 本來是想直接深入到mars的核心層去看的,但是發現其實上面的samples部分還有好些沒有分析到,因此回來繼續分析。ConversationActivity這個類中實際上還做...

    MyFaith 評論0 收藏0
  • 微信開源mars源碼分析1—上層samples分析

    摘要:微信已經開源了,但是市面上相關的文章較少,即使有也是多在于使用等這些,那么這次我希望能夠從這個直接用于底層通訊的部分進行個分析。首先明確下,微信用了的開源協議庫,來代替和。核心的部分我們先放下,下一篇再深入分析。 微信已經開源了mars,但是市面上相關的文章較少,即使有也是多在于使用xlog等這些,那么這次我希望能夠從stn這個直接用于im底層通訊的部分進行個分析。為了能分析的全面些,...

    caiyongji 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<