国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RxJava 源碼解析之觀察者模式

Steve_Wang_ / 707人閱讀

摘要:傳統觀察者模式觀察者模式面向的需求是對象觀察者對對象被觀察者的某種變化高度敏感,需要在變化的一瞬間做出反應。如下圖而作為一個工具庫,使用的就是通用形式的觀察者模式。這是為了方便以上就是最基本的一個通過觀察者模式,來響應事件的原理。

了解 RxJava 的應該都知道是一個基于事務驅動的庫,響應式編程的典范。提到事務驅動和響應就不得不說說,設計模式中觀察者模式,已經了解的朋友,可以直接跳過觀察者模式的介紹,直接到 RxJava 源碼中對于觀察者的應用。

觀察者模式

該部分結合自扔物線的 《給 Android 開發者的 RxJava 詳解》, 強烈推薦剛接觸 RxJava 的朋友閱讀。

傳統觀察者模式

觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞里喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子里,警察是觀察者,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是采用注冊( Register )或者稱為訂閱( Subscribe )的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點擊監聽器 OnClickListener 。對設置 OnClickListener 來說, View 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達成訂閱關系。訂閱之后用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已經注冊的 OnClickListener 。采取這樣被動的觀察方式,既省去了反復檢索狀態的資源消耗,也能夠得到最高的反饋速度。當然,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。

OnClickListener 的模式大致如下圖:

如圖所示,通過 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(這一過程沒有在圖上畫出);當用戶點擊時,Button 自動調用 OnClickListeneronClick() 方法。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者、OnClickListener -> 觀察者、setOnClickListener() -> 訂閱,onClick() -> 事件),就由專用的觀察者模式(例如只用于監聽控件點擊)轉變成了通用的觀察者模式。如下圖:

而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式。

RxJava 中觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關系,從而 Observable 可以在需要的時候發出事件來通知 Observer。

與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted()onError()

onCompleted(): 事件隊列完結。RxJava 不僅把每個事件多帶帶處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標志。

onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

在一個正確運行的事件序列中, onCompleted()onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted()onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。并且只要onCompleted()onError() 中有一個調用了,都會中止 onNext() 的調用。

RxJava 的觀察者模式大致如下圖:

基本實現

基于以上觀點, RxJava 的基本實現主要有三點:

創建 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 接口的實現方式:

Observer observer = new Observer() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:

Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。

// Observable.java 源碼

    public final Subscription subscribe(final Observer observer) {
        if (observer instanceof Subscriber) { // 如果是 Subscriber 的子類,直接轉化為 Subscriber
            return subscribe((Subscriber)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        
        return subscribe(new ObserverSubscriber(observer));
    }
// ObserverSubscriber.java

public final class ObserverSubscriber extends Subscriber {
    ...
}

通過源碼可以看到,傳入的 Observer 最終還是會轉化為 Subscriber 來使用。

所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對于使用者來說主要有兩點:

onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法。

// Subscriber.java

    public void onStart() {
        // do nothing by default
    }

unsubscribe(): 這是 Subscriber 所實現的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用后,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之后, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內存泄露的發生。

// Subscriber.java

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    
    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }
創建 Observable

Observable 即被觀察者,它決定什么時候觸發事件以及觸發怎樣的事件。例如 create() 方法

Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

可以看到,這里傳入了一個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于一個計劃表,當 Observable 被訂閱的時候,OnSubscribecall() 方法會自動被調用,事件序列就會依照設定依次觸發(對于上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted()。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

create() 方法是 RxJava 最基本的創造事件序列的方法。基于這個方法, RxJava 還提供了一些方法用來快捷創建事件隊列,例如 just(), from()

訂閱 Subscribe

創建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。代碼形式很簡單:

observable.subscribe(observer);

// 或者:
observable.subscribe(subscriber);

有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關系。這讓人讀起來有點別扭,不過如果把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。

整個過程中對象間的關系如下圖:

源碼層解析 基礎原理
// 例子

Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("Hi");
                subscriber.onNext("Aloha");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("value: " + s);
            }
        });

log 信息

value: Hello
value: Hi
value: Aloha
onCompleted

看到上面代碼,可能會有人跟我一樣不明白, create() 中的 OnSubscribecall()Subscriber 是怎么樣最終就變成了 subscribe() 中的 Subscriber

下面來一下 Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。

static  Subscription subscribe(Subscriber subscriber, Observable observable) {

    ...

    // 可以用于做一些準備工作,例如數據的清零或重置, 默認情況下它的實現為空
    subscriber.onStart();

    if (!(subscriber instanceof SafeSubscriber)) {
        // 強制轉化為 SafeSubscriber 是為了保證 onCompleted 或 onError 調用的時候會中止 onNext 的調用
        subscriber = new SafeSubscriber(subscriber);
    }

    ...
    // // onObservableStart() 默認返回的就是 observable.onSubscribe
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        
    // onObservableReturn() 默認也是返回 subscriber
    return RxJavaHooks.onObservableReturn(subscriber);   
    
    ...
}

通過源碼可以看到,subscriber() 實際就做了 4 件事情

調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。

將傳入的 Subscriber 轉化為 SafeSubscriber, 為了保證 onCompleted 或 onError 調用的時候會中止 onNext 的調用。

// 注意:這不是 SafeSubscriber 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。

public class SafeSubscriber extends Subscriber {

    private final Subscriber actual;

    boolean done; // 通過改標志來保證 onCompleted 或 onError 調用的時候會中止 onNext 的調用

    public SafeSubscriber(Subscriber actual) {
        super(actual);
        this.actual = actual;
    }

    @Override
    public void onCompleted() {
        if (!done) {
            done = true;

            ...
            
            actual.onCompleted();
            
            ...

            unsubscribe(); // 取消訂閱,結束事務
        }
    }
       
    @Override
    public void onError(Throwable e) {
        
        ...

        if (!done) {
            done = true;
            _onError(e);
        }
    }

    @Override
    public void onNext(T t) {
    
        if (!done) { // done 為 true 時,中止傳遞
            actual.onNext(t);
        }
        
    }

    @SuppressWarnings("deprecation")
    protected void _onError(Throwable e) {
        ...

        actual.onError(e);

        ...

        unsubscribe();
 
        ...
    }
}

通過代碼可以看出來,通過 SafeSubscriber 中的布爾變量 done 來做標記保證上文提到的 onCompleted()onError() 二者的互斥性,即在隊列中調用了其中一個,就不應該再調用另一個。并且只要 onCompleted()onError() 中有一個調用了,都會中止 onNext() 的調用。

調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發送的邏輯開始運行。從這也可以看出,在 RxJava 中, Observable 并不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。

將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().

以上就是 RxJava 最基本的一個通過觀察者模式,來響應事件的原理。下面來看看 RxJava 中一些基本操作符的實現原理又是怎樣的。

為了能更好的理解源碼,需要對 RxJava 有基本的使用基礎,對 RxJava 不太熟悉的朋友請先一步到《給 Android 開發者的 RxJava 詳解》

進階
        Observable.interval(1, TimeUnit.SECONDS)
                .map(new Func1() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                })
                .subscribe(new Action1() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("value: " + aLong);
                    }
                });

log 信息

value: 0
value: 5
value: 10
...

上面的列子會每秒生成一個從 0 依次遞增的整數,然后通過 map() 變換操作符后,變成了 5 的倍數的一個整數列。

第一次看到該例子時,就喜歡上了 RxJava,這種鏈式函數的交互模式真的很簡潔,終于可以從回調地獄里逃出來了。喜歡的同時不免也會想 RxJava 是如何實現的。這種鏈式的函數流可以算是建造者模式的一種變形,只不過省去了中間 Builder 而直接返回當前對象來實現。 更讓我興奮的是內部這些操作符的實現原理。

上文也已經說過了在 RxJava 中, Observable 并不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。 所以對于上面一段的代碼我們要從 subscribe() 往前屢,首先看一下 map() 這個函數的內部實現。

    public final  Observable map(Func1 func) {
        // 新建了一個 Observable 并使用新的 OnSubscribeMap 來封裝傳入的數據
        return unsafeCreate(new OnSubscribeMap(this, func));
    }

不用說大家也猜到了 OnSubscribeMapOnSubscribe 的子類

// 注意:這不是 OnSubscribeMap 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。

public final class OnSubscribeMap implements OnSubscribe {

    final Observable source;

    final Func1 transformer;

    public OnSubscribeMap(Observable source, Func1 transformer) {
        this.source = source; // 列子中經過 Observable.interval() 函數生成的 Observable
        this.transformer = transformer;
    }

    // 傳入的 o 就是例子中 `subscribe()` 出入的 Subscribe 
    // 具體結合 Observable.subscribe() 源碼來理解
    @Override
    public void call(final Subscriber o) {
        
        // 對傳入的 Subscriber 進行再次封裝成 MapSubscriber
        // 具體 Observable.map() 的邏輯是在 MapSubscriber 中
        MapSubscriber parent = new MapSubscriber(o, transformer); 
        o.add(parent); // 加入到 SubscriptionList 中,為之后取消訂閱
        
        // Observable.interval() 返回的 Observable 進行訂閱(關鍵點)
        source.unsafeSubscribe(parent); 
    }

    ...
}

可以看到 call() 方法的邏輯很簡單,只是將例子中 Observable.subscribe() 傳入的 Subscriber 進行封裝后,再將上流傳入的 Observable 進行訂閱

// 注意:這不是 MapSubscriber 的源碼
// 而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。
static final class MapSubscriber extends Subscriber {

    final Subscriber actual;

    final Func1 mapper;

    public MapSubscriber(Subscriber actual, Func1 mapper) {
        this.actual = actual; // Observable.subscribe() 傳入的 Subscriber
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        R result;

        ...

        result = mapper.call(t); // 數據進行了變換

        ...

        actual.onNext(result); // 往下流傳
    }

    ...
}

通過以上就完成了 map() 對數據的變換,這里最終的就是理解 OnSubscribeMapcall()source.unsafeSubscribe(parent); source 指的是例子中 Observable.interval() 生成的對象。

再來看一下 RxJava 中對 Observable.interval() 的實現

    public static Observable interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        return unsafeCreate(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler));
    }

可以看出 interval()map() 一樣都是通過生成新的 Observable 并向 Observable 中傳入與之對應的 OnSubscribe 的子類來完成具體操作。

// 注意:這不是 OnSubscribeTimerPeriodically 的源碼
// 而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。

public final class OnSubscribeTimerPeriodically implements OnSubscribe {
    final long initialDelay;
    final long period;
    final TimeUnit unit;
    final Scheduler scheduler;

    public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    // 傳入的 Subscriber 為上文提到的 OnSubscribeMap.call() 方法中 source.unsafeSubscribe(parent);
    @Override
    public void call(final Subscriber child) {
        final Worker worker = scheduler.createWorker();
        child.add(worker);
        worker.schedulePeriodically(new Action0() {
            long counter;
            @Override
            public void call() {
                ...

                child.onNext(counter++);

                ...
            }

        }, initialDelay, period, unit);
    }
}

以上就是 RxJava 整體的邏輯結構,可以看到 RxJava 將觀察者模式發揮的淋漓盡致。整體邏輯的處理有點像遞歸函數的原理。而 map() 則像一種代理機制,通過事件攔截和處理實現事件序列的變換。

總結: 精簡掉細節的話,也可以這么說:在 Observable 執行了各種操作符( map, interval 等)之后 方法之后,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,并在處理后發送給 Subscriber。

參考

給 Android 開發者的 RxJava 詳解

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66911.html

相關文章

  • 「碼個蛋」2017年200篇精選干貨集合

    摘要:讓你收獲滿滿碼個蛋從年月日推送第篇文章一年過去了已累積推文近篇文章,本文為年度精選,共計篇,按照類別整理便于讀者主題閱讀。本篇文章是今年的最后一篇技術文章,為了讓大家在家也能好好學習,特此花了幾個小時整理了這些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 讓你收獲滿滿! 碼個蛋從2017年02月20...

    wangtdgoodluck 評論0 收藏0
  • RxJava2.x源碼解析(一):訂閱流程

    摘要:現在網上已經有大量的源碼分析文章,各種技術的都有。你完全可以寫成下面的鏈式風格方法會最先被執行同樣,為了便于理解,我會借用流里面經常用到的水流進行類比。該子類的命名是有規律可言的。現在網上已經有大量的源碼分析文章,各種技術的都有。但我覺得很多文章對初學者并不友好,讓人讀起來云里霧里的,比源碼還源碼。究其原因,是根本沒有從學習者的角度去分析。在自己完成了源碼閱讀之后,卻忘記了自己是如何一步步提...

    harryhappy 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<