国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(二十)遠程調用——Filter

cheukyin / 1145人閱讀

摘要:源碼分析一該過濾器是對記錄日志的過濾器,它所做的工作就是把引用服務或者暴露服務的調用鏈信息寫入到文件中。然后返回,再清空,這樣是因為后面的調用鏈中的附加值對前面的調用鏈是不可見的。

遠程調用——Filter
目標:介紹dubbo-rpc-api中的各種filter過濾器的實現邏輯。
前言

本文會介紹在dubbo中的過濾器,先來看看下面的圖:

可以看到紅色圈圈不服,在服務發現和服務引用中都會進行一些過濾器過濾。具體有哪些過濾器,就看下面的介紹。

源碼分析 (一)AccessLogFilter

該過濾器是對記錄日志的過濾器,它所做的工作就是把引用服務或者暴露服務的調用鏈信息寫入到文件中。日志消息先被放入日志集合,然后加入到日志隊列,然后被放入到寫入文件到任務中,最后進入文件。

1.屬性
private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);

/**
 * 日志訪問名稱,默認的日志訪問名稱
 */
private static final String ACCESS_LOG_KEY = "dubbo.accesslog";

/**
 * 日期格式
 */
private static final String FILE_DATE_FORMAT = "yyyyMMdd";

private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

/**
 * 日志隊列大小
 */
private static final int LOG_MAX_BUFFER = 5000;

/**
 * 日志輸出的頻率
 */
private static final long LOG_OUTPUT_INTERVAL = 5000;

/**
 * 日志隊列 key為訪問日志的名稱,value為該日志名稱對應的日志集合
 */
private final ConcurrentMap> logQueue = new ConcurrentHashMap>();

/**
 * 日志線程池
 */
private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Dubbo-Access-Log", true));

/**
 * 日志記錄任務
 */
private volatile ScheduledFuture logFuture = null;

按照我上面講到日志流向,日志先進入到是日志隊列中的日志集合,再進入logQueue,在進入logFuture,最后落地到文件。

2.init
private void init() {
    // synchronized是一個重操作消耗性能,所有加上判空
    if (logFuture == null) {
        synchronized (logScheduled) {
            // 為了不重復初始化
            if (logFuture == null) {
                // 創建日志記錄任務
                logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(), LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
    }
}

該方法是初始化方法,就創建了日志記錄任務。

3.log
private void log(String accesslog, String logmessage) {
    init();
    Set logSet = logQueue.get(accesslog);
    if (logSet == null) {
        logQueue.putIfAbsent(accesslog, new ConcurrentHashSet());
        logSet = logQueue.get(accesslog);
    }
    if (logSet.size() < LOG_MAX_BUFFER) {
        logSet.add(logmessage);
    }
}

該方法是增加日志信息到日志集合中。

4.invoke
@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    try {
        // 獲得日志名稱
        String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);
        if (ConfigUtils.isNotEmpty(accesslog)) {
            // 獲得rpc上下文
            RpcContext context = RpcContext.getContext();
            // 獲得調用的接口名稱
            String serviceName = invoker.getInterface().getName();
            // 獲得版本號
            String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
            // 獲得組,是消費者側還是生產者側
            String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
            StringBuilder sn = new StringBuilder();
            sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ").append(context.getRemoteHost()).append(":").append(context.getRemotePort())
                    .append(" -> ").append(context.getLocalHost()).append(":").append(context.getLocalPort())
                    .append(" - ");
            // 拼接組
            if (null != group && group.length() > 0) {
                sn.append(group).append("/");
            }
            // 拼接服務名稱
            sn.append(serviceName);
            // 拼接版本號
            if (null != version && version.length() > 0) {
                sn.append(":").append(version);
            }
            sn.append(" ");
            // 拼接方法名
            sn.append(inv.getMethodName());
            sn.append("(");
            // 拼接參數類型
            Class[] types = inv.getParameterTypes();
            // 拼接參數類型
            if (types != null && types.length > 0) {
                boolean first = true;
                for (Class type : types) {
                    if (first) {
                        first = false;
                    } else {
                        sn.append(",");
                    }
                    sn.append(type.getName());
                }
            }
            sn.append(") ");
            // 拼接參數
            Object[] args = inv.getArguments();
            if (args != null && args.length > 0) {
                sn.append(JSON.toJSONString(args));
            }
            String msg = sn.toString();
            // 如果用默認的日志訪問名稱
            if (ConfigUtils.isDefault(accesslog)) {
                LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + invoker.getInterface().getName()).info(msg);
            } else {
                // 把日志加入集合
                log(accesslog, msg);
            }
        }
    } catch (Throwable t) {
        logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")", t);
    }
    // 調用下一個調用鏈
    return invoker.invoke(inv);
}

該方法是最重要的方法,從拼接了日志信息,把日志加入到集合,并且調用下一個調用鏈。

4.LogTask
private class LogTask implements Runnable {
    @Override
    public void run() {
        try {
            if (logQueue != null && logQueue.size() > 0) {
                // 遍歷日志隊列
                for (Map.Entry> entry : logQueue.entrySet()) {
                    try {
                        // 獲得日志名稱
                        String accesslog = entry.getKey();
                        // 獲得日志集合
                        Set logSet = entry.getValue();
                        // 如果文件不存在則創建文件
                        File file = new File(accesslog);
                        File dir = file.getParentFile();
                        if (null != dir && !dir.exists()) {
                            dir.mkdirs();
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("Append log to " + accesslog);
                        }
                        if (file.exists()) {
                            // 獲得現在的時間
                            String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date());
                            // 獲得文件最后一次修改的時間
                            String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file.lastModified()));
                            // 如果文件最后一次修改的時間不等于現在的時間
                            if (!now.equals(last)) {
                                // 獲得重新生成文件名稱
                                File archive = new File(file.getAbsolutePath() + "." + last);
                                // 因為都是file的絕對路徑,所以沒有進行移動文件,而是修改文件名
                                file.renameTo(archive);
                            }
                        }
                        // 把日志集合中的日志寫入到文件
                        FileWriter writer = new FileWriter(file, true);
                        try {
                            for (Iterator iterator = logSet.iterator();
                                 iterator.hasNext();
                                 iterator.remove()) {
                                writer.write(iterator.next());
                                writer.write("
");
                            }
                            writer.flush();
                        } finally {
                            writer.close();
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}

該內部類實現了Runnable,是把日志消息落地到文件到線程。

(二)ActiveLimitFilter

該類時對于每個服務的每個方法的最大可并行調用數量限制的過濾器,它是在服務消費者側的過濾。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得url對象
    URL url = invoker.getUrl();
    // 獲得方法名稱
    String methodName = invocation.getMethodName();
    // 獲得并發調用數(單個服務的單個方法),默認為0
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    // 通過方法名來獲得對應的狀態
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        // 獲得該方法調用的超時次數
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        // 獲得系統時間
        long start = System.currentTimeMillis();
        long remain = timeout;
        // 獲得該方法的調用數量
        int active = count.getActive();
        // 如果活躍數量大于等于最大的并發調用數量
        if (active >= max) {
            synchronized (count) {
                // 當活躍數量大于等于最大的并發調用數量時一直循環
                while ((active = count.getActive()) >= max) {
                    try {
                        // 等待超時時間
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    // 獲得累計時間
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    // 如果累計時間大于超時時間,則拋出異常
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                + invoker.getInterface().getName() + ", method: "
                                + invocation.getMethodName() + ", elapsed: " + elapsed
                                + ", timeout: " + timeout + ". concurrent invokes: " + active
                                + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        // 獲得系統時間作為開始時間
        long begin = System.currentTimeMillis();
        // 開始計數
        RpcStatus.beginCount(url, methodName);
        try {
            // 調用后面的調用鏈,如果沒有拋出異常,則算成功
            Result result = invoker.invoke(invocation);
            // 結束計數,記錄時間
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if (max > 0) {
            synchronized (count) {
                // 喚醒count
                count.notify();
            }
        }
    }
}

該類只有這一個方法。該過濾器是用來限制調用數量,先進行調用數量的檢測,如果沒有到達最大的調用數量,則先調用后面的調用鏈,如果在后面的調用鏈失敗,則記錄相關時間,如果成功也記錄相關時間和調用次數。

(三)ClassLoaderFilter

該過濾器是做類加載器切換的。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得當前的類加載器
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    // 設置invoker攜帶的服務的類加載器
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        // 調用下面的調用鏈
        return invoker.invoke(invocation);
    } finally {
        // 最后切換回原來的類加載器
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

可以看到先切換成當前的線程鎖攜帶的類加載器,然后調用結束后,再切換回原先的類加載器。

(四)CompatibleFilter

該過濾器是做兼容性的過濾器。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 調用下一個調用鏈
    Result result = invoker.invoke(invocation);
    // 如果方法前面沒有$或者結果沒有異常
    if (!invocation.getMethodName().startsWith("$") && !result.hasException()) {
        Object value = result.getValue();
        if (value != null) {
            try {
                // 獲得方法
                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                // 獲得返回的數據類型
                Class type = method.getReturnType();
                Object newValue;
                // 序列化方法
                String serialization = invoker.getUrl().getParameter(Constants.SERIALIZATION_KEY);
                // 如果是json或者fastjson形式
                if ("json".equals(serialization)
                        || "fastjson".equals(serialization)) {
                    // 獲得方法的泛型返回值類型
                    Type gtype = method.getGenericReturnType();
                    // 把數據結果進行類型轉化
                    newValue = PojoUtils.realize(value, type, gtype);
                    // 如果value不是type類型
                } else if (!type.isInstance(value)) {
                    // 如果是pojo,則,轉化為type類型,如果不是,則進行兼容類型轉化。
                    newValue = PojoUtils.isPojo(type)
                            ? PojoUtils.realize(value, type)
                            : CompatibleTypeUtils.compatibleTypeConvert(value, type);

                } else {
                    newValue = value;
                }
                // 重新設置RpcResult的result
                if (newValue != value) {
                    result = new RpcResult(newValue);
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    return result;
}

可以看到對于調用鏈的返回結果,如果返回值類型和返回值不一樣的時候,就需要做兼容類型的轉化。重新把結果放入RpcResult,返回。

(五)ConsumerContextFilter

該過濾器做的是在當前的RpcContext中記錄本地調用的一次狀態信息。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 設置rpc上下文
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    // 如果該會話域是rpc會話域
    if (invocation instanceof RpcInvocation) {
        // 設置實體域
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        // 調用下個調用鏈
        RpcResult result = (RpcResult) invoker.invoke(invocation);
        // 設置附加值
        RpcContext.getServerContext().setAttachments(result.getAttachments());
        return result;
    } finally {
        // 情況附加值
        RpcContext.getContext().clearAttachments();
    }
}

可以看到RpcContext記錄了一次調用狀態信息,然后先調用后面的調用鏈,再回來把附加值設置到RpcContext中。然后返回RpcContext,再清空,這樣是因為后面的調用鏈中的附加值對前面的調用鏈是不可見的。

(六)ContextFilter

該過濾器做的是初始化rpc上下文。

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        // 獲得會話域的附加值
        Map attachments = invocation.getAttachments();
        // 刪除異步屬性以避免傳遞給以下調用鏈
        if (attachments != null) {
            attachments = new HashMap(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }
        // 在rpc上下文添加上一個調用鏈的信息
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

        // mreged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            // 把會話域中的附加值全部加入RpcContext中
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }

        // 如果會話域屬于rpc的會話域,則設置實體域
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            // 調用下一個調用鏈
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            // pass attachments to result 把附加值加入到RpcResult
            result.addAttachments(RpcContext.getServerContext().getAttachments());
            return result;
        } finally {
            // 移除本地的上下文
            RpcContext.removeContext();
            // 清空附加值
            RpcContext.getServerContext().clearAttachments();
        }
    }

在《 dubbo源碼解析(十九)遠程調用——開篇》中我已經介紹了RpcContext的作用,角色。該過濾器就是做了初始化RpcContext的作用。

(七)DeprecatedFilter

該過濾器的作用是調用了廢棄的方法時打印錯誤日志。

private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedFilter.class);

/**
 * 日志集合
 */
private static final Set logged = new ConcurrentHashSet();

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得key 服務+方法
    String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
    // 如果集合中沒有該key
    if (!logged.contains(key)) {
        // 則加入集合
        logged.add(key);
        // 如果該服務方法是廢棄的,則打印錯誤日志
        if (invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.DEPRECATED_KEY, false)) {
            LOGGER.error("The service method " + invoker.getInterface().getName() + "." + getMethodSignature(invocation) + " is DEPRECATED! Declare from " + invoker.getUrl());
        }
    }
    // 調用下一個調用鏈
    return invoker.invoke(invocation);
}

/**
 * 獲得方法定義
 * @param invocation
 * @return
 */
private String getMethodSignature(Invocation invocation) {
    // 方法名
    StringBuilder buf = new StringBuilder(invocation.getMethodName());
    buf.append("(");
    // 參數類型
    Class[] types = invocation.getParameterTypes();
    // 拼接參數
    if (types != null && types.length > 0) {
        boolean first = true;
        for (Class type : types) {
            if (first) {
                first = false;
            } else {
                buf.append(", ");
            }
            buf.append(type.getSimpleName());
        }
    }
    buf.append(")");
    return buf.toString();
}

該過濾器比較簡單。

(八)EchoFilter

該過濾器是處理回聲測試的方法。

@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果調用的方法是回聲測試的方法 則直接返回結果,否則 調用下一個調用鏈
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
        return new RpcResult(inv.getArguments()[0]);
    return invoker.invoke(inv);
}

如果調用的方法是回聲測試的方法 則直接返回結果,否則 調用下一個調用鏈。

(九)ExceptionFilter

該過濾器是作用是對異常的處理。

private final Logger logger;

public ExceptionFilter() {
    this(LoggerFactory.getLogger(ExceptionFilter.class));
}

public ExceptionFilter(Logger logger) {
    this.logger = logger;
}

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    try {
        // 調用下一個調用鏈,返回結果
        Result result = invoker.invoke(invocation);
        // 如果結果有異常,并且該服務不是一個泛化調用
        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                // 獲得異常
                Throwable exception = result.getException();

                // directly throw if it"s checked exception
                // 如果這是一個checked的異常,則直接返回異常,也就是接口上聲明的Unchecked的異常
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                // 如果已經在接口方法上聲明了該異常,則直接返回
                try {
                    // 獲得方法
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    // 獲得異常類型
                    Class[] exceptionClassses = method.getExceptionTypes();
                    for (Class exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }

                // for the exception not found in method"s signature, print ERROR message in server"s log.
                // 打印錯誤 該異常沒有在方法上申明
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                // 如果異常類和接口類在同一個jar包里面,則拋出異常
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // directly throw if it"s JDK exception
                // 如果是jdk中定義的異常,則直接拋出
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // directly throw if it"s dubbo exception
                // 如果 是dubbo的異常,則直接拋出
                if (exception instanceof RpcException) {
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                // 如果不是以上的異常,則包裝成為RuntimeException并且拋出
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        return result;
    } catch (RuntimeException e) {
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}

可以看到除了接口上聲明的Unchecked的異常和有定義的異常外,都會包裝成RuntimeException來返回,為了防止客戶端反序列化失敗。

(十)ExecuteLimitFilter

該過濾器是限制最大可并行執行請求數,該過濾器是服務提供者側,而上述講到的ActiveLimitFilter是在消費者側的限制。

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        // 獲得url對象
        URL url = invoker.getUrl();
        // 方法名稱
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        // 如果該方法設置了executes并且值大于0
        if (max > 0) {
            // 獲得該方法對應的RpcStatus
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            // 獲得信號量
            executesLimit = count.getSemaphore(max);
            // 如果不能獲得許可,則拋出異常
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than  limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 計數加1
        RpcStatus.beginCount(url, methodName);
        try {
            // 調用下一個調用鏈
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            // 計數減1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

為什么這里需要用到信號量來控制,可以看一下以下鏈接的介紹:http://manzhizhen.iteye.com/b...

(十一)GenericFilter

該過濾器就是對于泛化調用的請求和結果進行反序列化和序列化的操作,它是服務提供者側的。

@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果是泛化調用
    if (inv.getMethodName().equals(Constants.$INVOKE)
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !invoker.getInterface().equals(GenericService.class)) {
        // 獲得請求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 獲得請求參數類型
        String[] types = (String[]) inv.getArguments()[1];
        // 獲得請求參數
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 獲得方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 獲得該方法的參數類型
            Class[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 獲得附加值
            String generic = inv.getAttachment(Constants.GENERIC_KEY);

            // 如果附加值為空,在用上下文攜帶的附加值
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
            }

            // 如果附加值還是為空或者是默認的泛化序列化類型
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進行類型轉化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try {
                            UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            }
            // 調用下一個調用鏈
            Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
            if (result.hasException()
                    && !(result.getException() instanceof GenericException)) {
                return new RpcResult(new GenericException(result.getException()));
            }
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用nativejava方式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(result.getValue());
                    return new RpcResult(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException("Serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 使用JavaBean方式序列化返回結果
                return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
            } else {
                // 直接轉化為pojo類型然后返回
                return new RpcResult(PojoUtils.generalize(result.getValue()));
            }
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    // 調用下一個調用鏈
    return invoker.invoke(inv);
}
(十二)GenericImplFilter

該過濾器也是對于泛化調用的序列化檢查和處理,它是消費者側的過濾器。

private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

/**
 * 參數集合
 */
private static final Class[] GENERIC_PARAMETER_TYPES = new Class[]{String.class, String[].class, Object[].class};

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得泛化的值
    String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
    // 如果該值是nativejava或者bean或者true,并且不是一個返回調用
    if (ProtocolUtils.isGeneric(generic)
            && !Constants.$INVOKE.equals(invocation.getMethodName())
            && invocation instanceof RpcInvocation) {
        RpcInvocation invocation2 = (RpcInvocation) invocation;
        // 獲得方法名稱
        String methodName = invocation2.getMethodName();
        // 獲得參數類型集合
        Class[] parameterTypes = invocation2.getParameterTypes();
        // 獲得參數集合
        Object[] arguments = invocation2.getArguments();

        // 把參數類型的名稱放入集合
        String[] types = new String[parameterTypes.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            types[i] = ReflectUtils.getName(parameterTypes[i]);
        }

        Object[] args;
        // 對參數集合進行序列化
        if (ProtocolUtils.isBeanGenericSerialization(generic)) {
            args = new Object[arguments.length];
            for (int i = 0; i < arguments.length; i++) {
                args[i] = JavaBeanSerializeUtil.serialize(arguments[i], JavaBeanAccessor.METHOD);
            }
        } else {
            args = PojoUtils.generalize(arguments);
        }

        // 重新把序列化的參數放入
        invocation2.setMethodName(Constants.$INVOKE);
        invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
        invocation2.setArguments(new Object[]{methodName, types, args});
        // 調用下一個調用鏈
        Result result = invoker.invoke(invocation2);

        if (!result.hasException()) {
            Object value = result.getValue();
            try {
                Method method = invoker.getInterface().getMethod(methodName, parameterTypes);
                if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                    if (value == null) {
                        return new RpcResult(value);
                    } else if (value instanceof JavaBeanDescriptor) {
                        // 用javabean方式反序列化
                        return new RpcResult(JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) value));
                    } else {
                        throw new RpcException(
                                "The type of result value is " +
                                        value.getClass().getName() +
                                        " other than " +
                                        JavaBeanDescriptor.class.getName() +
                                        ", and the result is " +
                                        value);
                    }
                } else {
                    // 直接轉化為pojo類型
                    return new RpcResult(PojoUtils.realize(value, method.getReturnType(), method.getGenericReturnType()));
                }
            } catch (NoSuchMethodException e) {
                throw new RpcException(e.getMessage(), e);
            }
            // 如果調用鏈中有異常拋出,并且是GenericException類型的異常
        } else if (result.getException() instanceof GenericException) {
            GenericException exception = (GenericException) result.getException();
            try {
                // 獲得異常類名
                String className = exception.getExceptionClass();
                Class clazz = ReflectUtils.forName(className);
                Throwable targetException = null;
                Throwable lastException = null;
                try {
                    targetException = (Throwable) clazz.newInstance();
                } catch (Throwable e) {
                    lastException = e;
                    for (Constructor constructor : clazz.getConstructors()) {
                        try {
                            targetException = (Throwable) constructor.newInstance(new Object[constructor.getParameterTypes().length]);
                            break;
                        } catch (Throwable e1) {
                            lastException = e1;
                        }
                    }
                }
                if (targetException != null) {
                    try {
                        Field field = Throwable.class.getDeclaredField("detailMessage");
                        if (!field.isAccessible()) {
                            field.setAccessible(true);
                        }
                        field.set(targetException, exception.getExceptionMessage());
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                    result = new RpcResult(targetException);
                } else if (lastException != null) {
                    throw lastException;
                }
            } catch (Throwable e) {
                throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e);
            }
        }
        return result;
    }

    // 如果是泛化調用
    if (invocation.getMethodName().equals(Constants.$INVOKE)
            && invocation.getArguments() != null
            && invocation.getArguments().length == 3
            && ProtocolUtils.isGeneric(generic)) {

        Object[] args = (Object[]) invocation.getArguments()[2];
        if (ProtocolUtils.isJavaGenericSerialization(generic)) {

            for (Object arg : args) {
                // 如果調用消息不是字節數組類型,則拋出異常
                if (!(byte[].class == arg.getClass())) {
                    error(generic, byte[].class.getName(), arg.getClass().getName());
                }
            }
        } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
            for (Object arg : args) {
                if (!(arg instanceof JavaBeanDescriptor)) {
                    error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
                }
            }
        }

        // 設置附加值
        ((RpcInvocation) invocation).setAttachment(
                Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
    }
    return invoker.invoke(invocation);
}

/**
 * 拋出錯誤異常
 * @param generic
 * @param expected
 * @param actual
 * @throws RpcException
 */
private void error(String generic, String expected, String actual) throws RpcException {
    throw new RpcException(
            "Generic serialization [" +
                    generic +
                    "] only support message type " +
                    expected +
                    " and your message type is " +
                    actual);
}
(十三)TimeoutFilter

該過濾器是當服務調用超時的時候,記錄告警日志。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得開始時間
    long start = System.currentTimeMillis();
    // 調用下一個調用鏈
    Result result = invoker.invoke(invocation);
    // 獲得調用使用的時間
    long elapsed = System.currentTimeMillis() - start;
    // 如果服務調用超時,則打印告警日志
    if (invoker.getUrl() != null
            && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
            "timeout", Integer.MAX_VALUE)) {
        if (logger.isWarnEnabled()) {
            logger.warn("invoke time out. method: " + invocation.getMethodName()
                    + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                    + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
        }
    }
    return result;
}
(十四)TokenFilter

該過濾器提供了token的驗證功能,關于token的介紹可以查看官方文檔。

@Override
public Result invoke(Invoker invoker, Invocation inv)
        throws RpcException {
    // 獲得token值
    String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
    if (ConfigUtils.isNotEmpty(token)) {
        // 獲得服務類型
        Class serviceType = invoker.getInterface();
        // 獲得附加值
        Map attachments = inv.getAttachments();
        String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
        // 如果令牌不一樣,則拋出異常
        if (!token.equals(remoteToken)) {
            throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
        }
    }
    // 調用下一個調用鏈
    return invoker.invoke(inv);
}
(十五)TpsLimitFilter

該過濾器的作用是對TPS限流。

/**
 * TPS 限制器對象
 */
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {

    // 如果限流器不允許,則拋出異常
    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
        throw new RpcException(
                "Failed to invoke service " +
                        invoker.getInterface().getName() +
                        "." +
                        invocation.getMethodName() +
                        " because exceed max service tps.");
    }

    // 調用下一個調用鏈
    return invoker.invoke(invocation);
}

其中關鍵是TPS 限制器對象,請看下面的分析。

(十六)TPSLimiter
public interface TPSLimiter {

    /**
     * judge if the current invocation is allowed by TPS rule
     * 是否允許通過
     * @param url        url
     * @param invocation invocation
     * @return true allow the current invocation, otherwise, return false
     */
    boolean isAllowable(URL url, Invocation invocation);

}

該接口是tps限流器的接口,只定義了一個是否允許通過的方法。

(十七)StatItem

該類是統計的數據結構。

class StatItem {

    /**
     * 服務名
     */
    private String name;

    /**
     * 最后一次重置的時間
     */
    private long lastResetTime;

    /**
     * 周期
     */
    private long interval;

    /**
     * 剩余多少流量
     */
    private AtomicInteger token;

    /**
     * 限制大小
     */
    private int rate;

    StatItem(String name, int rate, long interval) {
        this.name = name;
        this.rate = rate;
        this.interval = interval;
        this.lastResetTime = System.currentTimeMillis();
        this.token = new AtomicInteger(rate);
    }

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
        // 如果限制的時間大于最后一次時間加上周期,則重置
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        // 直到有流量
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        // 返回flag
        return flag;
    }

    long getLastResetTime() {
        return lastResetTime;
    }

    int getToken() {
        return token.get();
    }

    @Override
    public String toString() {
        return new StringBuilder(32).append("StatItem ")
                .append("[name=").append(name).append(", ")
                .append("rate = ").append(rate).append(", ")
                .append("interval = ").append(interval).append("]")
                .toString();
    }

}

可以看到該類中記錄了一些訪問的流量,并且設置了周期重置機制。

(十八)DefaultTPSLimiter

該類實現了TPSLimiter,是默認的tps限流器實現。

public class DefaultTPSLimiter implements TPSLimiter {

    /**
     * 統計項集合
     */
    private final ConcurrentMap stats
            = new ConcurrentHashMap();

    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        // 獲得tps限制大小,默認-1,不限制
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        // 獲得限流周期
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        // 如果限制
        if (rate > 0) {
            // 從集合中獲得統計項
            StatItem statItem = stats.get(serviceKey);
            // 如果為空,則新建
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            // 返回是否允許
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                // 移除該服務的統計項
                stats.remove(serviceKey);
            }
        }

        return true;
    }

}

是否允許的邏輯還是調用了統計項中的isAllowable方法。

本文介紹了很多的過濾器,哪些過濾器是在服務引用的,哪些服務器是服務暴露的,可以查看相應源碼過濾器的實現上的注解,

例如ActiveLimitFilter上:

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)

可以看到group為consumer組的,也就是服務消費者側的,則是服務引用過程中的的過濾器。

例如ExecuteLimitFilter上:

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)

可以看到group為provider組的,也就是服務消費者側的,則是服務暴露過程中的的過濾器。

后記
該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了在服務引用和服務暴露中的各種filter過濾器。接下來我將開始對rpc模塊的監聽器進行講解。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72871.html

相關文章

  • dubbo源碼解析(四十八)異步化改造

    摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析(四十六)消費端發送請求過程

    摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發送請求過程 目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。...

    fish 評論0 收藏0
  • dubbo源碼解析(四十七)服務端處理請求過程

    摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我...

    yzzz 評論0 收藏0
  • dubbo源碼解析二十四)遠程調用——dubbo協議

    摘要:遠程調用協議目標介紹遠程調用中跟協議相關的設計和實現,介紹的源碼。二該類繼承了,是協議中獨有的服務暴露者。八該類也是對的裝飾,其中增強了調用次數多功能。 遠程調用——dubbo協議 目標:介紹遠程調用中跟dubbo協議相關的設計和實現,介紹dubbo-rpc-dubbo的源碼。 前言 Dubbo 缺省協議采用單一長連接和 NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者...

    rickchen 評論0 收藏0
  • dubbo源碼解析二十二)遠程調用——Protocol

    摘要:七該類也實現了,也是裝飾了接口,但是它是在服務引用和暴露過程中加上了監聽器的功能。如果是注冊中心,則暴露該創建一個暴露者監聽器包裝類對象該方法是在服務暴露上做了監聽器功能的增強,也就是加上了監聽器。 遠程調用——Protocol 目標:介紹遠程調用中協議的設計和實現,介紹dubbo-rpc-api中的各種protocol包的源碼,是重點內容。 前言 在遠程調用中協議是非常重要的一層,看...

    孫淑建 評論0 收藏0
  • dubbo源碼解析二十七)遠程調用——injvm本地調用

    摘要:遠程調用本地調用目標介紹本地調用的設計和實現,介紹的源碼。前言是一個遠程調用的框架,但是它沒有理由不支持本地調用,本文就要講解關于本地調用的實現。服務暴露者集合取消暴露調用父類的取消暴露方法從集合中移除二該類繼承了類,是本地調用的實現。 遠程調用——injvm本地調用 目標:介紹injvm本地調用的設計和實現,介紹dubbo-rpc-injvm的源碼。 前言 dubbo是一個遠程調用的...

    sean 評論0 收藏0

發表評論

0條評論

cheukyin

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<