国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析——cluster

seal_de / 2174人閱讀

摘要:簡單來說就是應對出錯情況采取的策略。由于重試,重試次數過多時,帶來時延。通常用于實時性要求較高的讀操作,但需要浪費更多服務資源。通常用于通知所有提供者更新緩存或日志等本地資源信息。

我們再來回顧一下官網的對于集群容錯的架構設計圖

Cluster概述

將 Directory 中的多個 Invoker 偽裝成一個 Invoker(偽裝過程用到loadBalance),對上層透明,偽裝過程包含了容錯邏輯,調用失敗后,重試另一個。簡單來說,就是應對出錯情況采取的策略。看看這個接口:

該接口有9個實現類,換個角度來說,就是有9中應對策略,本文介紹幾個比較常用的策略

FailoverCluster

失敗自動切換,當調用遠程服務失敗時,自動選擇其他服務進行調用。可以通過retries設置重試次數。由于重試,重試次數過多時,帶來時延。

/**
 * Failover
 * 當invoker調用失敗,打印錯誤日志,并且重試其他invoker
 * 重試將導致時延
 */
public class FailoverClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
        // 局部引用
        List> copyinvokers = invokers;

        // 參數校驗
        checkInvokers(copyinvokers, invocation);

        // 獲取方法名稱
        String methodName = RpcUtils.getMethodName(invocation);

        // 獲取重試次數
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            // 最少要調用1次
            len = 1;
        }

        // 局部引用
        RpcException le = null;
        // 已經調用過的invoker列表
        List> invoked = new ArrayList>(copyinvokers.size());
        // 調用失敗的invoker地址
        Set providers = new HashSet(len);

        // i < len 作為循環條件,說明len是多少就循環多少次(len等于 重試次數 + 1)
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 檢查invoker是否被銷毀
                checkWhetherDestroyed();
                // 重新選擇invoker(在重試之前,需要重新選擇,以避免候選invoker的改變)
                copyinvokers = list(invocation);
                // 參數檢查
                checkInvokers(copyinvokers, invocation);
            }

            /*
             * 這一步就是進入loadBalance負載均衡
             * 因為上述步驟可能篩選出invoker數量大于1,所以再次經過loadBalance的篩選(同時避免獲取到已經調用過的invoker)
             */
            Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);

            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 遠程方法調用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }

                // 正常執行,直接返回結果。否則,如果還有重試次數,則繼續重試
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }

        // 能到這里,說明都失敗了,providers保存失敗的invoker地址
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}
MergeableCluster

這個主要用在分組聚合中,我們來看一下官網的介紹

按組合并返回結果 ,比如菜單服務,接口一樣,但有多種實現,用group區分,現在消費方需從每種group中調用一次返回結果,合并結果返回,這樣就可以實現聚合菜單項。

下面補充一下使用方法(網上基本沒有使用方法的教程,樓主才疏學淺,花了幾個小時才摸索出來):
(1)consumer側,提供合并merge方法
這里有幾個步驟:
a、在resources目錄下,新建META-INF及dubbo,新建文本com.alibaba.dubbo.rpc.cluster.Merger
b、映射自定義的merger名稱到相應的實現類,如:
myMerger=com.patty.dubbo.consumer.service.MyMerger
c、實現合并函數,需要實現Merger接口,如下:

public class MyMerger implements Merger {

    @Override
    public ModelResult merge(ModelResult... items) {

        ModelResult result = new ModelResult();
        for (ModelResult item : items) {
            // 進行數據合并操作
            result.setResult((String)result.getResult() + (String) item.getResult());
        }

        return result;
    }
}

(2)將reference的cluster屬性設置為"mergeable",group設置為“*”,并且設置合并方法,如下:


        
    

(3)同一套代碼,分別利用不同的group,把服務發布到注冊中心上面。例如:/group1/com.huangyuan.demoService 及 /group2/com.huangyuan.demoService

(3)接下來就可以直接使用了,這邊測試得到結果:(這里合并只是簡單連接字符串)

接下來再看下源碼:

public Result invoke(final Invocation invocation) throws RpcException {
        // 獲取Directory中的invoker
        List> invokers = directory.list(invocation);

        // 獲取合并方法的名稱
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) {
            for (final Invoker invoker : invokers) {
                // 如果沒有合并方法,只調動其中一個分組
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }

        // 獲取返回值類型
        Class returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        Map> results = new HashMap>();
        for (final Invoker invoker : invokers) {
            Future future = executor.submit(new Callable() {
                @Override
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            // 保留future(未真正執行遠程調用)
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        // 結果列表
        List resultList = new ArrayList(results.size());

        // 超時時間
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

        //
        for (Map.Entry> entry : results.entrySet()) {
            Future future = entry.getValue();
            try {
                // 執行遠程調用
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                    " failed: " + r.getException().getMessage(), 
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }

        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            // 只有一個結果,直接返回了
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        if (merger.startsWith(".")) {
            /*
             * 配置的方法名稱,以"."開頭
             * 這種方式,入參固定只有一個,沒有達到合并的效果,不建議使用
             */
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                        returnType.getClass().getName() + " ]");
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {
            /*
             * 建議使用Merger擴展的方式
             */
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List rets = new ArrayList(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }

PS:其實合并方法還有另外一個使用方式,使用".方法名稱",并且合并方法只能寫在結果類中,這種方式有一個很大的弊端,就是源碼中入參固定只有一個,所以達不到合并效果,故不推薦使用。


        
    
AvailableCluster
public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public  Invoker join(Directory directory) throws RpcException {

        return new AbstractClusterInvoker(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker invoker : invokers) {
                    if (invoker.isAvailable()) {
                        // 僅僅執行可只用的invoker
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };

    }

}

遍歷所有的Invokers判斷invoker.isAvalible,只要一個有為true直接調用返回,否則就拋出異常.

ForkingCluster

引用官網的介紹

并行調用多個服務器,只要一個成功即返回。通常用于實時性要求較高的讀操作,但需要浪費更多服務資源。可通過 forks="2" 來設置最大并行數。

我們來看看源碼的實現

FailfastCluster

快速失敗

Failfast可以理解為只發起一次調用,若失敗則立即報錯

通常用于非冪等寫操作

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 成功直接往下執行
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 失敗拋出異常,不做別的處理
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
BroadcastCluster

廣播調用

廣播調用所有提供者,逐個調用,任意一臺報錯則報錯。通常用于通知所有提供者更新緩存或日志等本地資源信息。

public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        for (Invoker invoker : invokers) {
            try {
                // 循環調用invoker
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }
FailbackClusterInvoker

失敗自動重試

當失敗了,記錄失敗的請求,按照一定的間隔定時重試

特別適用于通知服務

這個相對比較復雜,先了解一些基礎概念

Delayed

延遲接口,用于標記在給定延遲之后應該被作用的對象

ScheduledFuture

實現Delayed、Future接口,能夠獲取未來調度的結果

演示一些上面ScheduledFuture的用法

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author huangy on 2018/11/12
 */
public class ScheduledFutureTest {

    // 延遲調用,獲取未來調度結果的對象
    private volatile ScheduledFuture retryFuture;

    // 指定時間間隔 重發執行一次
    private static final long RETRY_FAILED_PERIOD = 1 * 1000;

    // ScheduledExecutorService的主要作用就是可以將定時任務與線程池功能結合使用
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

    public void func() {
        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                System.out.println("retry");
            }
            // 延遲第一次執行的時間    每次的延遲           時間單位(現在填的是毫秒)
        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        new ScheduledFutureTest().func();
    }
}

結果如下:

其實看完這個例子,再看failbackCluster就挺簡單了

public class FailbackClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    // 5s 重發一次
    private static final long RETRY_FAILED_PERIOD = 5 * 1000;

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
            new NamedInternalThreadFactory("failback-cluster-timer", true));

    // 保存需要重新執行的invoker
    private final ConcurrentMap> failed = new ConcurrentHashMap>();

    // 延遲調用,獲取未來調度結果的對象
    private volatile ScheduledFuture retryFuture;

    public FailbackClusterInvoker(Directory directory) {
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker router) {
        if (retryFuture == null) {
            // 避免同時調度
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        @Override
                        public void run() {
                            // collect retry statistics
                            try {
                                // 隔一段時間重新執行
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        // 沒有需要重新執行的invoker
        if (failed.size() == 0) {
            return;
        }

        // 逐個調用之前失敗的invoker
        for (Map.Entry> entry : new HashMap>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            // 正常執行,則直接返回
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            // 記錄失敗的請求
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }

}
FailsafeCluster

調用實例失敗后,如果有報錯,則忽略掉異常,返回一個正常的空結果

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72098.html

相關文章

  • dubbo源碼解析——消費過程

    摘要:上一篇源碼解析概要篇中我們了解到中的一些概念及消費端總體調用過程。由于在生成代理實例的時候,在構造函數中賦值了,因此可以只用該進行方法的調用。 上一篇 dubbo源碼解析——概要篇中我們了解到dubbo中的一些概念及消費端總體調用過程。本文中,將進入消費端源碼解析(具體邏輯會放到代碼的注釋中)。本文先是對消費過程的總體代碼邏輯理一遍,個別需要細講的點,后面會專門的文章進行解析。...

    darkbug 評論0 收藏0
  • dubbo源碼解析(一)Hello,Dubbo

    摘要:英文全名為,也叫遠程過程調用,其實就是一個計算機通信協議,它是一種通過網絡從遠程計算機程序上請求服務而不需要了解底層網絡技術的協議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...

    evin2016 評論0 收藏0
  • dubbo源碼解析(四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網絡通訊層收到一個請求后,會找到對應的實例,并調用它所對應的實例,從而真正調用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習。總結起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現在我們在哪,我們下一...

    Meathill 評論0 收藏0
  • dubbo源碼解析(三十五)集群——cluster

    摘要:失敗安全,出現異常時,直接忽略。失敗自動恢復,在調用失敗后,返回一個空結果給服務提供者。源碼分析一該類實現了接口,是集群的抽象類。 集群——cluster 目標:介紹dubbo中集群容錯的幾種模式,介紹dubbo-cluster下support包的源碼。 前言 集群容錯還是很好理解的,就是當你調用失敗的時候所作出的措施。先來看看有哪些模式: showImg(https://segmen...

    gself 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<