国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Node.js - 阿里Egg的多進程模型和進程間通訊

Cheng_Gang / 895人閱讀

摘要:第二種是主進程創建監聽后發送給感興趣的工作進程,由工作進程負責直接接收連接。繼續看,可以看到它捕獲了事件,并在回調函數里面關閉連接,關閉本身進程,斷開與的通道。參考與引用多進程模型和進程間通訊源碼解析之

前言

最近用Egg作為底層框架開發項目,好奇其多進程模型的管理實現,于是學習了解了一些東西,順便記錄下來。文章如有錯誤, 請輕噴

為什么需要多進程

伴隨科技的發展, 現在的服務器基本上都是多核cpu的了。然而,Node是一個單進程單線程語言(對于開發者來說是單線程,實際上不是)。我們都知道,cpu的調度單位是線程,而基于Node的特性,那么我們每次只能利用一個cpu。這樣不僅僅利用率極低,而且容錯更是不能接受(出錯時會崩潰整個程序)。所以,Node有了cluster來協助我們充分利用服務器的資源。

cluster工作原理
關于cluster的工作原理推薦大家看這篇文章,這里簡單總結一下:

子進程的端口監聽會被hack掉,而是統一由master的內部TCP監聽,所以不會出現多個子進程監聽同一端口而報錯的現象。

請求統一經過master的內部TCP,TCP的請求處理邏輯中,會挑選一個worker進程向其發送一個newconn內部消息,隨消息發送客戶端句柄。(這里的挑選有兩種方式,第一種是除Windows外所有平臺的默認方法循環法,即由主進程負責監聽端口,接收新連接后再將連接循環分發給工作進程。在分發中使用了一些內置技巧防止工作進程任務過載。第二種是主進程創建監聽socket后發送給感興趣的工作進程,由工作進程負責直接接收連接。)

worker進程收到句柄后,創建客戶端實例(net.socket)執行具體的業務邏輯,然后返回。

如圖:

圖引用出處

多進程模型

先看一下Egg官方文檔的進程模型

                +--------+          +-------+
                | Master |<-------->| Agent |
                +--------+          +-------+
                ^   ^    ^
               /    |     
             /      |       
           /        |         
         v          v          v
+----------+   +----------+   +----------+
| Worker 1 |   | Worker 2 |   | Worker 3 |
+----------+   +----------+   +----------+
類型 進程數量 作用 穩定性 是否運行業務代碼
Master 1 進程管理,進程間消息轉發 非常高
Agent 1 后臺運行工作(長連接客戶端) 少量
Worker 一般為cpu核數 執行業務代碼 一般

大致上就是利用Master作為主線程,啟動Agent作為秘書進程協助Worker處理一些公共事務(日志之類),啟動Worker進程執行真正的業務代碼。

多進程的實現 流程相關代碼

首先從Master入手,這里暫時認為Master是最頂級的進程(事實上還有一個parent進程,待會再說)。

/**
 * start egg app
 * @method Egg#startCluster
 * @param {Object} options {@link Master}
 * @param {Function} callback start success callback
 */
exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

先從Master的構造函數看起

constructor(options) {
  super();
  // 初始化參數
  this.options = parseOptions(options);
  // worker進程的管理類 詳情見 Manager及Messenger篇
  this.workerManager = new Manager();
  // messenger類, 詳情見 Manager及Messenger篇
  this.messenger = new Messenger(this);
  // 設置一個ready事件 詳情見get-ready npm包
  ready.mixin(this);
  // 是否為生產環境
  this.isProduction = isProduction();
  this.agentWorkerIndex = 0;
  // 是否關閉
  this.closed = false;
  ...

  接下來看的是ready的回調函數及注冊的各類事件:
  this.ready(() => {
    // 將開始狀態設置為true
    this.isStarted = true;
    const stickyMsg = this.options.sticky ? " with STICKY MODE!" : "";
    this.logger.info("[master] %s started on %s (%sms)%s",
    frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

    // 發送egg-ready至各個進程并觸發相關事件
    const action = "egg-ready";
    this.messenger.send({ action, to: "parent", data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
    this.messenger.send({ action, to: "app", data: this.options });
    this.messenger.send({ action, to: "agent", data: this.options });
    // start check agent and worker status
    this.workerManager.startCheck();
    });
    // 注冊各類事件
    this.on("agent-exit", this.onAgentExit.bind(this));
    this.on("agent-start", this.onAgentStart.bind(this));
    ...
    // 檢查端口并 Fork一個Agent
    detectPort((err, port) => {
      ... 
      this.forkAgentWorker();
    }
  });
}

綜上, 可以看到Master的構造函數主要是初始化和注冊各類相應的事件, 最后運行的是forkAgentWorker函數, 該函數的關鍵代碼可以看到:

const agentWorkerFile = path.join(__dirname, "agent_worker.js");
// 通過child_process執行一個Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);

繼續到agent_worker.js上面看,agent_worker實例化一個agent對象,agent_worker.js有一句關鍵代碼:

agent.ready(() => {
  agent.removeListener("error", startErrorHandler); // 清除錯誤監聽的事件
  process.send({ action: "agent-start", to: "master" }); // 向master發送一個agent-start的動作
});

可以看到, agent_worker.js中的代碼向master發出了一個信息, 動作為agent-start, 再回到Master中, 可以看到其注冊了兩個事件, 分別為once的forkAppWorkers和 on的onAgentStart

this.on("agent-start", this.onAgentStart.bind(this));
this.once("agent-start", this.forkAppWorkers.bind(this));

先看onAgentStart函數, 這個函數相對簡單, 就是一些信息的傳遞:

onAgentStart() {
    this.agentWorker.status = "started";

    // Send egg-ready when agent is started after launched
    if (this.isAllAppWorkerStarted) {
      this.messenger.send({ action: "egg-ready", to: "agent", data: this.options });
    }

    this.messenger.send({ action: "egg-pids", to: "app", data: [ this.agentWorker.pid ] });
    // should send current worker pids when agent restart
    if (this.isStarted) {
      this.messenger.send({ action: "egg-pids", to: "agent", data: this.workerManager.getListeningWorkerIds() });
    }

    this.messenger.send({ action: "agent-start", to: "app" });
    this.logger.info("[master] agent_worker#%s:%s started (%sms)",
      this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
  }

然后會執行forkAppWorkers函數,該函數主要是借助cfork包fork對應的工作進程, 并注冊一系列相關的監聽事件,

...
cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don"t refork in local env
  refork: this.isProduction,
});
...
// 觸發app-start事件
cluster.on("listening", (worker, address) => {
  this.messenger.send({
    action: "app-start",
    data: { workerPid: worker.process.pid, address },
    to: "master",
    from: "app",
  });
});

可以看到forkAppWorkers函數在監聽Listening事件時,會觸發master上的app-start事件。

this.on("app-start", this.onAppStart.bind(this));

...
// master ready回調觸發
if (this.options.sticky) {
  this.startMasterSocketServer(err => {
    if (err) return this.ready(err);
      this.ready(true);
  });
} else {
  this.ready(true);
}

// ready回調 發送egg-ready狀態到各個進程
const action = "egg-ready";
this.messenger.send({ action, to: "parent", data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: "app", data: this.options });
this.messenger.send({ action, to: "agent", data: this.options });

// start check agent and worker status
if (this.isProduction) {
  this.workerManager.startCheck();
}

總結下:

Master.constructor: 先執行Master的構造函數, 里面有個detect函數被執行

Detect: Detect => forkAgentWorker()

forkAgentWorker: 獲取Agent進程, 向master觸發agent-start事件

執行onAgentStart函數, 執行forkAppWorker函數(once)

onAgentStart => 發送各類信息, forkAppWorker => 向master觸發 app-start事件

App-start事件 觸發 onAppStart()方法

onAppStart => 設置ready(true) => 執行ready的回調函數

Ready() = > 發送egg-ready到各個進程并觸發相關事件, 執行startCheck()函數

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
進程守護

根據官方文檔,進程守護主要是依賴于graceful和egg-cluster這兩個庫。

未捕獲異常

關閉異常 Worker 進程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請求。

Master 立刻 fork 一個新的 Worker 進程,保證在線的『工人』總數不變。

異常 Worker 等待一段時間,處理完已經接受的請求后退出。

+---------+                 +---------+
|  Worker |                 |  Master |
+---------+                 +----+----+
     | uncaughtException         |
     +------------+              |
     |            |              |                   +---------+
     | <----------+              |                   |  Worker |
     |                           |                   +----+----+
     |        disconnect         |   fork a new worker    |
     +-------------------------> + ---------------------> |
     |         wait...           |                        |
     |          exit             |                        |
     +-------------------------> |                        |
     |                           |                        |
    die                          |                        |
                                 |                        |
                                 |                        |

由執行的app文件可知, app實際上是繼承于Application類, 該類下面調用了graceful()

onServer(server) {
    ......
    graceful({
      server: [ server ],
      error: (err, throwErrorCount) => {
        ......
      },
    });
    ......
  }

繼續看graceful, 可以看到它捕獲了process.on("uncaughtException")事件, 并在回調函數里面關閉TCP連接, 關閉本身進程, 斷開與masterIPC通道。

process.on("uncaughtException", function (err) {
    ......
    // 對http連接設置 Connection: close響應頭
    servers.forEach(function (server) {
      if (server instanceof http.Server) {
        server.on("request", function (req, res) {
          // Let http server set `Connection: close` header, and close the current request socket.
          req.shouldKeepAlive = false;
          res.shouldKeepAlive = false;
          if (!res._header) {
            res.setHeader("Connection", "close");
          }
        });
      }
    });

    // 設置一個定時函數關閉子進程, 并退出本身進程
    // make sure we close down within `killTimeout` seconds
    var killtimer = setTimeout(function () {
      console.error("[%s] [graceful:worker:%s] kill timeout, exit now.", Date(), process.pid);
      if (process.env.NODE_ENV !== "test") {
        // kill children by SIGKILL before exit
        killChildren(function() {
          // 退出本身進程
          process.exit(1);
        });
      }
    }, killTimeout);

    // But don"t keep the process open just for that!
    // If there is no more io waitting, just let process exit normally.
    if (typeof killtimer.unref === "function") {
      // only worked on node 0.10+
      killtimer.unref();
    }

    var worker = options.worker || cluster.worker;

    // cluster mode
    if (worker) {
      try {
        // 關閉TCP連接
        for (var i = 0; i < servers.length; i++) {
          var server = servers[i];
          server.close();
        }
      } catch (er1) {
        ......
      }

      try {
        // 關閉ICP通道
        worker.disconnect();
      } catch (er2) {
        ......
      }
    }
  });

ok, 關閉了IPC通道后, 我們繼續看cfork文件, 即上面提到的fork worker的包, 里面監聽了子進程的disconnect事件, 他會根據條件判斷是否重新fork一個新的子進程

cluster.on("disconnect", function (worker) {
    ......
    // 存起該pid
    disconnects[worker.process.pid] = utility.logDate();
    if (allow()) {
      // fork一個新的子進程
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
  });

一般來說, 這個時候會繼續等待一會然后就執行了上面說到的定時函數了, 即退出進程

OOM、系統異常
關于這種系統異常, 有時候在子進程中是不能捕獲到的, 我們只能在master中進行處理, 也就是cfork包。

cluster.on("exit", function (worker, code, signal) {
    // 是程序異常的話, 會通過上面提到的uncatughException重新fork一個子進程, 所以這里就不需要了
    var isExpected = !!disconnects[worker.process.pid];
    if (isExpected) {
      delete disconnects[worker.process.pid];
      // worker disconnect first, exit expected
      return;
    }
    // 是master殺死的子進程, 無需fork
    if (worker.disableRefork) {
      // worker is killed by master
      return;
    }

    if (allow()) {
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
    cluster.emit("unexpectedExit", worker, code, signal);
  });
進程間通信(IPC)
上面一直提到各種進程間通信,細心的你可能已經發現 cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉發。
廣播消息: agent => all workers
                  +--------+          +-------+
                  | Master |<---------| Agent |
                  +--------+          +-------+
                 /    |     
                /     |      
               /      |       
              /       |        
             v        v         v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

指定接收方: one worker => another worker
                  +--------+          +-------+
                  | Master |----------| Agent |
                  +--------+          +-------+
                 ^    |
     send to    /     |
    worker 2   /      |
              /       |
             /        v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

master中, 可以看到當agent和app被fork時, 會監聽他們的信息, 同時將信息轉化成一個對象:

agentWorker.on("message", msg => {
  if (typeof msg === "string") msg = { action: msg, data: msg };
  msg.from = "agent";
  this.messenger.send(msg);
});

worker.on("message", msg => {
  if (typeof msg === "string") msg = { action: msg, data: msg };
  msg.from = "app";
  this.messenger.send(msg);
});

可以看到最后調用的是messenger.send, 而messengeer.send就是根據from和to來決定將信息發送到哪里

send(data) {
    if (!data.from) {
      data.from = "master";
    }
    ......

    // app -> master
    // agent -> master
    if (data.to === "master") {
      debug("%s -> master, data: %j", data.from, data);
      // app/agent to master
      this.sendToMaster(data);
      return;
    }

    // master -> parent
    // app -> parent
    // agent -> parent
    if (data.to === "parent") {
      debug("%s -> parent, data: %j", data.from, data);
      this.sendToParent(data);
      return;
    }

    // parent -> master -> app
    // agent -> master -> app
    if (data.to === "app") {
      debug("%s -> %s, data: %j", data.from, data.to, data);
      this.sendToAppWorker(data);
      return;
    }

    // parent -> master -> agent
    // app -> master -> agent,可能不指定 to
    if (data.to === "agent") {
      debug("%s -> %s, data: %j", data.from, data.to, data);
      this.sendToAgentWorker(data);
      return;
    }
  }

master則是直接根據action信息emit對應的注冊事件

sendToMaster(data) {
  this.master.emit(data.action, data.data);
}

而agent和worker則是通過一個sendmessage包, 實際上就是調用下面類似的方法

 // 將信息傳給子進程
 agent.send(data)
 worker.send(data)

最后, 在agent和app都繼承的基礎類EggApplication上, 調用了Messenger類, 該類內部的構造函數如下:

constructor() {
    super();
    ......
    this._onMessage = this._onMessage.bind(this);
    process.on("message", this._onMessage);
  }

_onMessage(message) {
    if (message && is.string(message.action)) {
      // 和master一樣根據action信息emit對應的注冊事件  
      this.emit(message.action, message.data);
    }
  }

總結一下:
思路就是利用事件機制和IPC通道來達到各個進程之間的通信。

其他

學習過程中有遇到一個timeout.unref()的函數, 關于該函數推薦大家參考這個問題的6樓回答

總結

從前端思維轉到后端思維其實還是很吃力的,加上Egg的進程管理實現確實非常厲害, 所以花了很多時間在各種api和思路思考上。

參考與引用

多進程模型和進程間通訊
Egg 源碼解析之 egg-cluster

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/103585.html

相關文章

  • 前端最實用書簽(持續更新)

    摘要:前言一直混跡社區突然發現自己收藏了不少好文但是管理起來有點混亂所以將前端主流技術做了一個書簽整理不求最多最全但求最實用。 前言 一直混跡社區,突然發現自己收藏了不少好文但是管理起來有點混亂; 所以將前端主流技術做了一個書簽整理,不求最多最全,但求最實用。 書簽源碼 書簽導入瀏覽器效果截圖showImg(https://segmentfault.com/img/bVbg41b?w=107...

    sshe 評論0 收藏0
  • 實時聯網游戲后臺服務技術選型與挑戰(網絡接入篇)

    摘要:概述本系列文章將從開發者角度梳理開發實時聯網游戲后臺服務過程中可能面臨的挑戰,并針對性地提供相應解決思路,期望幫助開發者依據自身游戲特點做出合理的技術選型。多路復用避免了讀寫阻塞,減少了上下文切換,提升了利用率和系統吞吐率。 概述:本系列文章將從開發者角度梳理開發實時聯網游戲后臺服務過程中可能面臨的挑戰,并針對性地提供相應解決思路,期望幫助開發者依據自身游戲特點做出合理的技術選型。 關...

    zhisheng 評論0 收藏0
  • PHP進程通信

    摘要:一進程間通信理解間進程通信機制,先了解下進程間有哪些通訊機制歷史發展按照歷史來源主要有兩大塊的管道,,信號的消息隊列,共享內存,信號燈。信號量主要作為進程間,以及進程內部線程之間的通訊手段。主要依賴,兼容擴展實現方式的進程間通信之消息隊列。 PHP間進程如何通信,PHP相關的服務的IPC是實現方式,IPC的思想如何用到項目中。 一、linux進程間通信 理解php間進程通信機制,先了解...

    haobowd 評論0 收藏0
  • 系列3|走進Node.js之多進程模型

    摘要:例如,在方法中,如果需要主從進程之間建立管道,則通過環境變量來告知從進程應該綁定的相關的文件描述符,這個特殊的環境變量后面會被再次涉及到。 文:正龍(滬江網校Web前端工程師)本文原創,轉載請注明作者及出處 之前的文章走進Node.js之HTTP實現分析中,大家已經了解 Node.js 是如何處理 HTTP 請求的,在整個處理過程,它僅僅用到單進程模型。那么如何讓 Web 應用擴展到...

    snowell 評論0 收藏0
  • 從單租戶IaaS到多租戶PaaS——金融級別大數據平臺MaxCompute的多租戶隔離實踐

    摘要:摘要在年云棲大會北京峰會的大數據專場中,來自阿里云的高級技術專家李雪峰帶來了主題為金融級別大數據平臺的多租戶隔離實踐的演講。三是運行隔離機制。針對這一問題,提供了多層隔離嵌套方案以便規避這種潛在的安全風險。 摘要:在2017年云棲大會?北京峰會的大數據專場中,來自阿里云的高級技術專家李雪峰帶來了主題為《金融級別大數據平臺的多租戶隔離實踐》的演講。在分享中,李雪峰首先介紹了基于傳統Iaa...

    beanlam 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<