摘要:層也就是網絡傳輸層,在遠程通信中必然會涉及到傳輸。值為,不等待消息發出,將消息放入隊列,即刻返回。三該類繼承了并且實現接口,是服務器抽象類。八該類是多消息處理器的抽象類。創建線程池設置組件的獲得實例把線程池放到
遠程通訊——Transport層
目標:介紹Transport層的相關設計和邏輯、介紹dubbo-remoting-api中的transport包內的源碼解析。前言
先預警一下,該文篇幅會很長,做好心理準備。Transport層也就是網絡傳輸層,在遠程通信中必然會涉及到傳輸。它在dubbo 的框架設計中也處于倒數第二層,當然最底層是序列化,這個后面介紹。官方文檔對Transport層的解釋是抽象 mina 和 netty 為統一接口,以 Message 為中心,擴展接口為 Channel、Transporter、Client、Server、Codec。那我們現在先來看這個包下面的類圖:
可以看到有四個包繼承了AbstractChannel、AbstractServer、AbstractClient。也就是說現在Transport層是抽象mina、netty以及grizzly為統一接口。看完類圖,再來看看包結構:
下面的講解大致會按照類圖中類的順序往下講,盡量把client、server、channel、codec、dispacher五部分涉及到的內容一起講解。
源碼解析 (一)AbstractPeerpublic abstract class AbstractPeer implements Endpoint, ChannelHandler { private final ChannelHandler handler; private volatile URL url; /** * 是否正在關閉 */ // closing closed means the process is being closed and close is finished private volatile boolean closing; /** * 是否關閉完成 */ private volatile boolean closed; public AbstractPeer(URL url, ChannelHandler handler) { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.url = url; this.handler = handler; } }
該類實現了Endpoint和ChannelHandler兩個接口,要關注的兩個點:
實現ChannelHandler接口并且有在屬性中還有一個handler,下面很多實現方法也是直接調用了handler方法,這種模式叫做裝飾模式,這樣做可以對裝飾對象靈活的增強功能。對裝飾模式不懂的朋友可以google一下。有很多例子介紹。
在該類中有closing和closed屬性,在Endpoint中有很多關于關閉通道的操作,會有關閉中和關閉完成的狀態區分,在該類中就緩存了這兩個屬性來判斷關閉的狀態。
下面我就介紹該類中的send方法,其他方法比較好理解,到時候可以直接看源碼:
@Override public void send(Object message) throws RemotingException { // url中sent的配置項 send(message, url.getParameter(Constants.SENT_KEY, false)); }
該配置項是選擇是否等待消息發出:
sent值為true,等待消息發出,消息發送失敗將拋出異常。
sent值為false,不等待消息發出,將消息放入 IO 隊列,即刻返回。
對該類還有點糊涂的朋友,記住在ChannelHandler接口,該類就做了裝飾模式中裝飾角色,在Endpoint接口,只是維護了通道的正在關閉和關閉完成兩個狀態。
(二)AbstractEndpointpublic abstract class AbstractEndpoint extends AbstractPeer implements Resetable { /** * 日志記錄 */ private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class); /** * 編解碼器 */ private Codec2 codec; /** * 超時時間 */ private int timeout; /** * 連接超時時間 */ private int connectTimeout; public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); this.codec = getChannelCodec(url); // 優先從url配置中取,如果沒有,默認為1s this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 優先從url配置中取,如果沒有,默認為3s this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } /** * 從url中獲得編解碼器的配置,并且返回該實例 * @param url * @return */ protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); // 優先從Codec2的擴展類中找 if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } } }
該類是端點的抽象類,其中封裝了編解碼器以及兩個超時時間。基于dubbo 的SPI機制,獲得相應的編解碼器實現對象,編解碼器優先從Codec2的擴展類中尋找。
下面來看看該類中的reset方法:
@Override public void reset(URL url) { if (isClosed()) { throw new IllegalStateException("Failed to reset parameters " + url + ", cause: Channel closed. channel: " + getLocalAddress()); } try { // 判斷重置的url中有沒有攜帶timeout,有的話重置 if (url.hasParameter(Constants.TIMEOUT_KEY)) { int t = url.getParameter(Constants.TIMEOUT_KEY, 0); if (t > 0) { this.timeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { // 判斷重置的url中有沒有攜帶connect.timeout,有的話重置 if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) { int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0); if (t > 0) { this.connectTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { // 判斷重置的url中有沒有攜帶codec,有的話重置 if (url.hasParameter(Constants.CODEC_KEY)) { this.codec = getChannelCodec(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); } } @Deprecated public void reset(com.alibaba.dubbo.common.Parameters parameters) { reset(getUrl().addParameters(parameters.getParameters())); }
這個方法是Resetable接口中的方法,可以看到以前的reset實現方法都加上了@Deprecated注解,不推薦使用了,因為這種實現方式重置太復雜,需要把所有參數都設置一遍,比如我只想重置一個超時時間,但是其他值不變,如果用以前的reset,我需要在url中把所有值都帶上,就會很多余。現在用新的reset,每次只關心我需要重置的值,只更改為需要重置的值。比如上面的代碼所示,只想修改超時時間,那我就只在url中攜帶超時時間的參數。
(三)AbstractServer該類繼承了AbstractEndpoint并且實現Server接口,是服務器抽象類。重點實現了服務器的公共邏輯,比如發送消息,關閉通道,連接通道,斷開連接等。并且抽象了打開和關閉服務器兩個方法。
1.屬性/** * 服務器線程名稱 */ protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); /** * 線程池 */ ExecutorService executor; /** * 服務地址,也就是本地地址 */ private InetSocketAddress localAddress; /** * 綁定地址 */ private InetSocketAddress bindAddress; /** * 最大可接受的連接數 */ private int accepts; /** * 空閑超時時間,單位是s */ private int idleTimeout = 600; //600 seconds
該類的屬性比較好理解,就是稍微注意一下idleTimeout的單位是s。
2.構造函數public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // 從url中獲得本地地址 localAddress = getUrl().toInetSocketAddress(); // 從url配置中獲得綁定的ip String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); // 從url配置中獲得綁定的端口號 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); // 判斷url中配置anyhost是否為true或者判斷host是否為不可用的本地Host if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // 從url中獲取配置,默認值為0 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); // 從url中獲取配置,默認600s this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 開啟服務器 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } // 獲得線程池 //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
構造函數大部分邏輯就是從url中取配置,存到緩存中,并且做了開啟服務器的操作。具體的看上面的注釋,還是比較清晰的。
3.reset方法@Override public void reset(URL url) { if (url == null) { return; } try { // 重置accepts的值 if (url.hasParameter(Constants.ACCEPTS_KEY)) { int a = url.getParameter(Constants.ACCEPTS_KEY, 0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { // 重置idle.timeout的值 if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) { int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { // 重置線程數配置 if (url.hasParameter(Constants.THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; // 獲得url配置中的線程數 int threads = url.getParameter(Constants.THREADS_KEY, 0); // 獲得線程池允許的最大線程數 int max = threadPoolExecutor.getMaximumPoolSize(); // 返回核心線程數 int core = threadPoolExecutor.getCorePoolSize(); // 設置最大線程數和核心線程數 if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { // 如果設置的線程數比核心線程數少,則直接設置核心線程數 threadPoolExecutor.setCorePoolSize(threads); if (core == max) { // 當核心線程數和最大線程數相等的時候,把最大線程數也重置 threadPoolExecutor.setMaximumPoolSize(threads); } } else { // 當大于核心線程數時,直接設置最大線程數 threadPoolExecutor.setMaximumPoolSize(threads); // 只有當核心線程數和最大線程數相等的時候才設置核心線程數 if (core == max) { threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(), t); } // 重置url super.setUrl(getUrl().addParameters(url.getParameters())); }
該類中的reset方法做了三個值的重置,分別是最大可連接的客戶端數量、空閑超時時間以及線程池的兩個配置參數。其中要注意核心線程數和最大線程數的區別。舉個例子,核心線程數就像是工廠正式工,最大線程數,就是工廠臨時工作量加大,請了一批臨時工,臨時工加正式工的和就是最大線程數,等這批任務結束后,臨時工要辭退的,而正式工會留下。
還有send、close、connected、disconnected等方法比較簡單,如果有興趣,可以到我的GitHub查看,地址文章末尾會給出。
(四)AbstractClient該類是客戶端的抽象類,繼承了AbstractEndpoint類,實現了Client接口,該類中也是做了客戶端公用的重連邏輯,抽象了打開客戶端、關閉客戶端、連接服務器、斷開服務器連接以及獲得通道方法,讓子類去重點關注這幾個方法。
1.屬性/** * 客戶端線程名稱 */ protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); /** * 線程池id */ private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger(); /** * 重連定時任務執行器 */ private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true)); /** * 連接鎖 */ private final Lock connectLock = new ReentrantLock(); /** * 發送消息時,若斷開,是否重連 */ private final boolean send_reconnect; /** * 重連次數 */ private final AtomicInteger reconnect_count = new AtomicInteger(0); /** * 在這之前是否調用重新連接的錯誤日志 */ // Reconnection error log has been called before? private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false); /** * 重連 warning 的間隔.(waring多少次之后,warning一次),也就是錯誤多少次后告警一次錯誤 */ // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test private final int reconnect_warning_period; /** * 關閉超時時間 */ private final long shutdown_timeout; /** * 線程池 */ protected volatile ExecutorService executor; /** * 重連執行任務 */ private volatile ScheduledFuture> reconnectExecutorFuture = null; // the last successed connected time /** * 最后成功連接的時間 */ private long lastConnectedTime = System.currentTimeMillis();
上述屬性大部分跟重連有關,該類最重要的也是封裝了重連的邏輯。
2.構造函數public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // 從url中獲得是否重連的配置,默認為false send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); // 從url中獲得關閉超時時間,默認為900s shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. // 重連的默認值是2s,重連 warning 的間隔默認是1800,當出錯的時候,每隔1800*2=3600s報警一次 reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); try { // 打開客戶端 doOpen(); } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } try { // connect. // 連接服務器 connect(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { if (url.getParameter(Constants.CHECK_KEY, true)) { close(); throw t; } else { logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); } } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } // 從緩存中獲得線程池 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); // 清楚線程池緩存 ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); }
該構造函數中做了一些屬性值的設置,并且做了打開客戶端和連接服務器的操作。
3.wrapChannelHandlerprotected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { // 加入線程名稱 url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); // 設置使用的線程池類型 url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); // 包裝 return ChannelHandlers.wrap(handler, url); }
該方法是包裝通道處理器,設置使用的線程池類型是可緩存線程池。
4.initConnectStatusCheckCommandprivate synchronized void initConnectStatusCheckCommand() { //reconnect=false to close reconnect int reconnect = getReconnectParam(getUrl()); // 有連接頻率的值,并且當前沒有連接任務 if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) { Runnable connectStatusCheckCommand = new Runnable() { @Override public void run() { try { if (!isConnected()) { // 重連 connect(); } else { // 記錄最后一次重連的時間 lastConnectedTime = System.currentTimeMillis(); } } catch (Throwable t) { String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl(); // wait registry sync provider list if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) { // 如果之前沒有打印過重連的誤日志 if (!reconnect_error_log_flag.get()) { reconnect_error_log_flag.set(true); // 打印日志 logger.error(errorMsg, t); return; } } // 如果到達一次重連日志告警周期,則打印告警日志 if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) { logger.warn(errorMsg, t); } } } }; // 開啟重連定時任務 reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); } }
該方法是初始化重連線程,其中做了重連失敗后的告警日志和錯誤日志打印策略。
5.reconnect@Override public void reconnect() throws RemotingException { disconnect(); connect(); }
多帶帶放該方法是因為這是該類關注的重點。實現了客戶端的重連邏輯。
6.其他connect、disconnect、close等方法都是調用了對應的抽象方法,而具體的邏輯需要看具體的子類如何去實現相關的抽象方法,這幾個方法邏輯比較簡單,我不在這里貼出源碼,有興趣可以看我的GitHub,地址文章末尾會給出。
(四)AbstractChannel該類是通道的抽象類,該類里面做的邏輯很簡單,具體的發送消息邏輯在它 的子類中實現。
@Override public void send(Object message, boolean sent) throws RemotingException { // 檢測通道是否關閉 if (isClosed()) { throw new RemotingException(this, "Failed to send message " + (message == null ? "" : message.getClass().getName()) + ":" + message + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress()); } }
可以看到send方法,其中只做了檢測通道是否關閉的狀態檢測,沒有實現具體的發送消息的邏輯。
(五)ChannelHandlerDelegate該類繼承了ChannelHandler,從它的名字可以看出是ChannelHandler的代表,它就是作為裝飾模式中的Component角色,后面講到的AbstractChannelHandlerDelegate作為裝飾模式中的Decorator角色。
public interface ChannelHandlerDelegate extends ChannelHandler { /** * 獲得通道 * @return */ ChannelHandler getHandler(); }(六)AbstractChannelHandlerDelegate
屬性:
protected ChannelHandler handler
該類實現了ChannelHandlerDelegate接口,并且有一個屬性是ChannelHandler,上述已經說到這是裝飾模式中的裝飾角色,其中的所有實現方法都直接調用被裝飾的handler屬性的方法。
(七)DecodeHandler該類為解碼處理器,繼承了AbstractChannelHandlerDelegate,對接收到的消息進行解碼,在父類處理接收消息的功能上疊加了解碼功能。
我們來看看received方法:
@Override public void received(Channel channel, Object message) throws RemotingException { // 如果是Decodeable類型的消息,則對整個消息解碼 if (message instanceof Decodeable) { decode(message); } // 如果是Request請求類型消息,則對請求中對請求數據解碼 if (message instanceof Request) { decode(((Request) message).getData()); } // 如果是Response返回類型的消息,則對返回消息中對結果進行解碼 if (message instanceof Response) { decode(((Response) message).getResult()); } // 繼續將消息委托給handler,繼續處理 handler.received(channel, message); }
可以看到做了三次判斷,根據消息的不同會對消息的不同數據做解碼。可以看到,這里用到裝飾模式后,在處理消息的前面做了解碼的處理,并且還能繼續委托給handler來處理消息,通過組合做到了功能的疊加。
private void decode(Object message) { // 如果消息類型是Decodeable,進一步調用Decodeable的decode來解碼 if (message != null && message instanceof Decodeable) { try { ((Decodeable) message).decode(); if (log.isDebugEnabled()) { log.debug("Decode decodeable message " + message.getClass().getName()); } } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } // ~ end of catch } // ~ end of if } // ~ end of method decode
可以看到這是解析消息的邏輯,當消息是Decodeable類型,還會繼續調用Decodeable的decode方法來進行解析。它的實現類后續會講解到。
(八)MultiMessageHandler該類是多消息處理器的抽象類。同樣繼承了AbstractChannelHandlerDelegate類,我們來看看它的received方法:
@SuppressWarnings("unchecked") @Override public void received(Channel channel, Object message) throws RemotingException { // 當消息為多消息時 循環交給handler處理接收到當消息 if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; for (Object obj : list) { handler.received(channel, obj); } } else { // 如果是單消息,就直接交給handler處理器 handler.received(channel, message); } }
邏輯很簡單,當消息是多消息類型時,也就是一次性接收到多條消息的情況,循環去處理消息,當消息是單消息時候,直接交給handler去處理。
(九)WrappedChannelHandler該類跟AbstractChannelHandlerDelegate的作用類似,都是裝飾模式中的裝飾角色,其中的所有實現方法都直接調用被裝飾的handler屬性的方法,該類是為了添加線程池的功能,它的子類都是去關心哪些消息是需要分發到線程池的,哪些消息直接由I / O線程執行,現在版本有四種場景,也就是它的四個子類,下面我一一描述。
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; // 創建線程池 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); // 設置組件的key String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } // 獲得dataStore實例 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); // 把線程池放到dataStore中緩存 dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }
可以看到構造方法除了屬性的填充以外,線程池是基于dubbo 的SPI Adaptive機制創建的,在dataStore中把線程池加進去, 該線程池就是AbstractClient 或 AbstractServer 從 DataStore 獲得的線程池。
public ExecutorService getExecutorService() { // 首先返回的不是共享線程池,是該類的線程池 ExecutorService cexecutor = executor; // 如果該類的線程池關閉或者為空,則返回的是共享線程池 if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; }
該方法是獲得線程池的實例,不過該類里面有兩個線程池,還加入了一個共享線程池,共享線程池優先級較低。
(十)ExecutionChannelHandler該類繼承了WrappedChannelHandler,也是增強了功能,處理的是接收請求消息時,把請求消息分發到線程池,而除了請求消息以外,其他消息類型都直接通過I / O線程直接執行。
@Override public void received(Channel channel, Object message) throws RemotingException { // 獲得線程池實例 ExecutorService cexecutor = getExecutorService(); // 如果消息是request類型,才會分發到線程池,其他消息,如響應,連接,斷開連接,心跳將由I / O線程直接執行。 if (message instanceof Request) { try { // 把請求消息分發到線程池 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent // this scenario from happening, but a better solution should be considered later. // 當線程池滿了,SERVER_THREADPOOL_EXHAUSTED_ERROR錯誤無法正常返回 // 因此消費者方必須等到超時。這是一種預防的臨時解決方案,所以這里直接返回該錯誤 if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else { // 如果消息不是request類型,則直接處理 handler.received(channel, message); } }
上述就可以都看到對于請求消息的處理,其中有個打補丁的方式是當線程池滿了的時候,消費者只能等待請求超時,所以這里直接返回線程池滿的錯誤。
(十一)AllChannelHandler該類也繼承了WrappedChannelHandler,也是為了增強功能,處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發到線程池。
@Override public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { // 把連接操作分發到線程池處理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { // 把斷開連接操作分發到線程池處理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { // 把所有消息分發到線程池處理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out // 這里處理線程池滿的問題,只有在請求時候會出現。 //復線程池已滿,拒絕調用,不返回,并導致使用者等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { // 把捕獲異常作分發到線程池處理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } }
可以看到,所有操作以及消息都分到到線程池中。并且注意操作不同,傳入的狀態也不同。
(十二)ConnectionOrderedChannelHandler該類也是繼承了WrappedChannelHandler,增強功能,該類是把連接、取消連接以及接收到的消息都分發到線程池,但是不同的是,該類自己創建了一個跟連接相關的線程池,把連接操作和斷開連接操分發到該線程池,而接收到的消息則分發到WrappedChannelHandler的線程池中。來看看具體的實現。
/** * 連接線程池 */ protected final ThreadPoolExecutor connectionExecutor; /** * 連接隊列大小限制 */ private final int queuewarninglimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); // 獲得線程名,默認是Dubbo String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 創建連接線程池 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There"s no place to release connectionExecutor! // 設置工作隊列限制,默認是1000 queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); }
可以屬性中有一個連接線程池,看到在構造函數里創建了該線程池,而queuewarninglimit是用來限制連接線程池的工作隊列長度,比較簡單。來看看連接和斷開連接到邏輯。
@Override public void connected(Channel channel) throws RemotingException { try { // 核對工作隊列長度 checkQueueLength(); // 分發連接操作 connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { try { // 核對工作隊列長度 checkQueueLength(); // 分發斷開連接操作 connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } }
可以看到,這兩個操作都是分發到連接線程池connectionExecutor中,和AllChannelHandle類r中的分發的線程池不是同一個。而ConnectionOrderedChannelHandler的received方法跟AllChannelHandle一樣,我就不貼出來。
(十三)MessageOnlyChannelHandler該類也是繼承了WrappedChannelHandler,是WrappedChannelHandler的最后一個子類,也是增強功能,不過該類只是處理了所有的消息分發到線程池。可以看到源碼,比較簡單:
@Override public void received(Channel channel, Object message) throws RemotingException { // 獲得線程池實例 ExecutorService cexecutor = getExecutorService(); try { // 把消息分發到線程池 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
下面我講講解五種線程池的調度策略,也就是我在《dubbo源碼解析(八)遠程通信——開篇》中提到的Dispatcher接口的五種實現,分別是AllDispatcher、DirectDispatcher、MessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher。
(十四)AllDispatcherpublic class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 線程池調度方法:任何消息以及操作都分發到線程池中 return new AllChannelHandler(handler, url); } }
對照著上述講到的AllChannelHandler,是不是很清晰這種線程池的調度方法。并且該調度方法是默認的調度方法。
(十五)ConnectionOrderedDispatcherpublic class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 線程池調度方法:連接、斷開連接分發到到線程池和其他消息分發到線程池不是同一個 return new ConnectionOrderedChannelHandler(handler, url); } }
對照上述講到的ConnectionOrderedChannelHandler,也很清晰該線程池調度方法。
(十六)DirectDispatcherpublic class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 直接處理消息,不分發到線程池 return handler; } }
該線程池調度方法是不調度線程池,直接執行。
(十七)ExecutionDispatcherpublic class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 線程池調度方法:只有請求消息分發到線程池,其他都直接執行 return new ExecutionChannelHandler(handler, url); } }
對照著上述的ExecutionChannelHandler講解,也可以很清晰的看出該線程池調度策略。
(十八)MessageOnlyDispatcherpublic class MessageOnlyDispatcher implements Dispatcher { public static final String NAME = "message"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 只要是接收到的消息,都分發到線程池 return new MessageOnlyChannelHandler(handler, url); } }
對照著上述講到的MessageOnlyChannelHandler,可以很清晰該線程池調度策略。
(十九)ChannelHandlers該類是通道處理器工廠,會對傳入的handler進行一次包裝,無論是Client還是Server都會做這樣的處理,也就是做了一些功能上的增強,就像上述我說到的裝飾模式中的那些功能。
我們來看看源碼:
public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 調用了多消息處理器,對心跳消息進行了功能加強 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
最關鍵的是這兩個方法,看第二個方法,其實就是包裝了MultiMessageHandler功能,增加了多消息處理的功能,以及對心跳消息做了功能增強。
(二十)AbstractCodec實現 Codec2 接口,,其中實現了一些編解碼的公共邏輯。
1.checkPayloadprotected static void checkPayload(Channel channel, long size) throws IOException { // 默認長度 int payload = Constants.DEFAULT_PAYLOAD; if (channel != null && channel.getUrl() != null) { // 優先從url中獲得消息長度配置,如果沒有則用默認長度 payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD); } // 如果消息長度過長,則報錯 if (payload > 0 && size > payload) { ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel); logger.error(e); throw e; } }
該方法是檢驗消息長度。
2.getSerializationprotected Serialization getSerialization(Channel channel) { return CodecSupport.getSerialization(channel.getUrl()); }
該方法是獲得序列化對象。
3.isClientSideprotected boolean isClientSide(Channel channel) { // 獲得是side對應的value String side = (String) channel.getAttribute(Constants.SIDE_KEY); if ("client".equals(side)) { return true; } else if ("server".equals(side)) { return false; } else { InetSocketAddress address = channel.getRemoteAddress(); URL url = channel.getUrl(); // 判斷url的主機地址是否和遠程地址一樣,如果是,則判斷為client,如果不是,則判斷為server boolean client = url.getPort() == address.getPort() && NetUtils.filterLocalHost(url.getIp()).equals( NetUtils.filterLocalHost(address.getAddress() .getHostAddress())); // 把value設置進去 channel.setAttribute(Constants.SIDE_KEY, client ? "client" : "server"); return client; } }
該方法是判斷是否為客戶端側的通道。
4.isServerSideprotected boolean isServerSide(Channel channel) { return !isClientSide(channel); }
該方法是判斷是否為服務端側的通道。
(二十一)TransportCodec該類是傳輸編解碼器,使用 Serialization 進行序列化/反序列化,直接編解碼。關于序列化為會在后續文章中介紹。
@Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { // 獲得序列化的 ObjectOutput 對象 OutputStream output = new ChannelBufferOutputStream(buffer); ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output); // 寫入 ObjectOutput encodeData(channel, objectOutput, message); objectOutput.flushBuffer(); // 釋放 if (objectOutput instanceof Cleanable) { ((Cleanable) objectOutput).cleanup(); } } @Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 獲得反序列化的 ObjectInput 對象 InputStream input = new ChannelBufferInputStream(buffer); ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input); // 讀取 ObjectInput Object object = decodeData(channel, objectInput); // 釋放 if (objectInput instanceof Cleanable) { ((Cleanable) objectInput).cleanup(); } return object; }
該類關鍵方法就是編碼和解碼,比較好理解,直接進行了序列化和反序列化。
(二十二)CodecAdapter該類是Codec 的適配器,用到了適配器模式,把Codec適配成Codec2。將Codec的編碼和解碼方法都適配成Codec2。比如很多時候都只能用Codec2的編解碼器,但是有的時候需要用Codec,但是不能滿足導致只能加入適配器來完成使用。
@Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024); // 調用舊的編解碼器的編碼 codec.encode(channel, os, message); buffer.writeBytes(os.toByteArray()); } @Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { byte[] bytes = new byte[buffer.readableBytes()]; int savedReaderIndex = buffer.readerIndex(); buffer.readBytes(bytes); UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes); // 調用舊的編解碼器的解碼 Object result = codec.decode(channel, is); buffer.readerIndex(savedReaderIndex + is.position()); return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result; }
可以看到,在編碼和解碼的方法中都調用了codec的方法。
(二十三)ChannelDelegate、ServerDelegate、ClientDelegateChannelDelegate實現類Channel,ServerDelegate實現了Server,ClientDelegate實現了Client,都用到了裝飾模式,都作為裝飾模式中的裝飾角色,所以類中的所有實現方法都調用了屬性的方法。具體代碼就不貼了,朋友們可以自行查看。
(二十四)ChannelHandlerAdapter該類實現了ChannelHandler接口,是通道處理器適配類,該類中所有實現方法都是空的,所有想實現ChannelHandler接口的類可以直接繼承該類,選擇需要實現的方法進行實現,不需要實現ChannelHandler接口中所有方法。
(二十五)ChannelHandlerDispatcher該類是通道處理器調度器,其中緩存了所有通道處理器,有一個通道處理器集合。并且每個操作都會去遍歷該集合,執行相應的操作,例如:
@Override public void connected(Channel channel) { // 遍歷通道處理器集合 for (ChannelHandler listener : channelHandlers) { try { // 連接 listener.connected(channel); } catch (Throwable t) { logger.error(t.getMessage(), t); } } }(二十六)CodecSupport
該類是編解碼工具類,提供查詢 Serialization 的功能。
/** * 序列化對象集合 key為序列化類型編號 */ private static MapID_SERIALIZATION_MAP = new HashMap (); /** * 序列化擴展名集合 key為序列化類型編號 value為序列化擴展名 */ private static Map ID_SERIALIZATIONNAME_MAP = new HashMap (); static { // 利用dubbo 的SPI機制獲得序列化擴展名 Set supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions(); for (String name : supportedExtensions) { // 獲得相應擴展名的序列化實現 Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name); byte idByte = serialization.getContentTypeId(); if (ID_SERIALIZATION_MAP.containsKey(idByte)) { logger.error("Serialization extension " + serialization.getClass().getName() + " has duplicate id to Serialization extension " + ID_SERIALIZATION_MAP.get(idByte).getClass().getName() + ", ignore this Serialization extension"); continue; } // 緩存序列化實現 ID_SERIALIZATION_MAP.put(idByte, serialization); // 緩存序列化編號和擴展名 ID_SERIALIZATIONNAME_MAP.put(idByte, name); } }
可以看到該類中緩存了所有的序列化對象和序列化擴展名。可以從中拿到Serialization。
(二十七)ExceedPayloadLimitException該類是消息長度限制異常。
public class ExceedPayloadLimitException extends IOException { private static final long serialVersionUID = -1112322085391551410L; public ExceedPayloadLimitException(String message) { super(message); } }后記
該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了Transport層的相關設計和邏輯、介紹dubbo-remoting-api中的transport包內的源碼解,其中關鍵的是整個設計都在使用裝飾模式,傳輸層中關鍵的編解碼器以及客戶端、服務的、通道的抽象,還有關鍵的就是線程池的調度方法,熟悉那五種調度方法,對消息的處理。整個傳輸層核心的消息,很多操作圍繞著消息展開。下一篇我會講解交換層exchange部分。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72722.html
摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我...
摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發送請求過程 目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。...
摘要:而編碼器是講應用程序的數據轉化為網絡格式,解碼器則是講網絡格式轉化為應用程序,同時具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經用把適配成了。 遠程通訊——開篇 目標:介紹之后解讀遠程通訊模塊的內容如何編排、介紹dubbo-remoting-api中的包結構設計以及最外層的的源碼解析。 前言 服務治理框架中可以大致分為服務通信和服務管理兩個...
摘要:和斷開,處理措施不一樣,會分別做出重連和關閉通道的操作。取消定時器取消大量已排隊任務,用于回收空間該方法是停止現有心跳,也就是停止定時器,釋放空間。做到異步處理返回結果時能給準確的返回給對應的請求。 遠程通訊——Exchange層 目標:介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解析。 前言 上一篇文章我講的是dubb...
摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...
閱讀 2580·2021-11-24 09:38
閱讀 2612·2019-08-30 15:54
閱讀 926·2019-08-30 15:52
閱讀 1915·2019-08-30 15:44
閱讀 2721·2019-08-30 13:48
閱讀 776·2019-08-29 16:21
閱讀 1006·2019-08-29 14:03
閱讀 2221·2019-08-28 18:15