国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RxJava系列六(從微觀角度解讀RxJava源碼)

zero / 3210人閱讀

摘要:而這個就是線程調(diào)度的關(guān)鍵前面的例子中我們通過指定了發(fā)射處理事件以及通知觀察者的一系列操作的執(zhí)行線程,正是通過這個創(chuàng)建了我們前面提到的。總結(jié)這一章以執(zhí)行流程操作符實現(xiàn)以及線程調(diào)度三個方面為切入點剖析了源碼。

轉(zhuǎn)載請注明出處:https://zhuanlan.zhihu.com/p/22338235

RxJava系列1(簡介)

RxJava系列2(基本概念及使用介紹)

RxJava系列3(轉(zhuǎn)換操作符)

RxJava系列4(過濾操作符)

RxJava系列5(組合操作符)

RxJava系列6(從微觀角度解讀RxJava源碼)

RxJava系列7(最佳實踐)

前言

通過前面五個篇幅的介紹,相信大家對RxJava的基本使用以及操作符應(yīng)該有了一定的認識。但是知其然還要知其所以然;所以從這一章開始我們聊聊源碼,分析RxJava的實現(xiàn)原理。本文我們主要從三個方面來分析RxJava的實現(xiàn):

RxJava基本流程分析

操作符原理分析

線程調(diào)度原理分析

本章節(jié)基于RxJava1.1.9版本的源碼

一、RxJava執(zhí)行流程分析

在RxJava系列2(基本概念及使用介紹)中我們介紹過,一個最基本的RxJava調(diào)用是這樣的:

示例A

Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

首先調(diào)用Observable.create()創(chuàng)建一個被觀察者Observable,同時創(chuàng)建一個OnSubscribe作為create()方法的入?yún)ⅲ唤又鴦?chuàng)建一個觀察者Subscriber,然后通過subseribe()實現(xiàn)二者的訂閱關(guān)系。這里涉及到三個關(guān)鍵對象和一個核心的方法:

Observable(被觀察者)

OnSubscribe (從純設(shè)計模式的角度來理解,OnSubscribe.call()可以看做是觀察者模式中被觀察者用來通知觀察者的notifyObservers()方法)

Subscriber (觀察者)

subscribe() (實現(xiàn)觀察者與被觀察者訂閱關(guān)系的方法)

1、Observable.create()源碼分析

首先我們來看看Observable.create()的實現(xiàn):

public static  Observable create(OnSubscribe f) {
    return new Observable(RxJavaHooks.onCreate(f));
}

這里創(chuàng)建了一個被觀察者Observable,同時將RxJavaHooks.onCreate(f)作為構(gòu)造函數(shù)的參數(shù),源碼如下:

protected Observable(OnSubscribe f) {
    this.onSubscribe = f;
}

我們看到源碼中直接將參數(shù)RxJavaHooks.onCreate(f)賦值給了當(dāng)前我們構(gòu)造的被觀察者Observable的成員變量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我們接著往下看:

public static  Observable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) {
    Func1 f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

由于我們并沒調(diào)用RxJavaHooks.initCreate(),所以上面代碼中的onObservableCreate為null;因此RxJavaHooks.onCreate(f)最終返回的就是f,也就是我們在Observable.create()的時候new出來的OnSubscribe。(由于對RxJavaHooks的理解并不影響我們對RxJava執(zhí)行流程的分析,因此在這里我們不做進一步的探討。為了方便理解我們只需要知道RxJavaHooks一系列方法的返回值就是入?yún)⒈旧砭蚈K了,例如這里的RxJavaHooks.onCreate(f)返回的就是f)。

至此我們做下邏輯梳理:Observable.create()方法構(gòu)造了一個被觀察者Observable對象,同時將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe

2、Subscriber源碼分析

接著我們看下觀察者Subscriber的源碼,為了增加可讀性,我去掉了源碼中的注釋和部分代碼。

public abstract class Subscriber implements Observer, Subscription {
    
    private final SubscriptionList subscriptions;//訂閱事件集,所有發(fā)送給當(dāng)前Subscriber的事件都會保存在這里
    
    ...

    protected Subscriber(Subscriber subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }
    
    ...
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Subscriber實現(xiàn)了Subscription接口,從而對外提供isUnsubscribed()unsubscribe()方法。前者用于判斷是否已經(jīng)取消訂閱;后者用于將訂閱事件列表(也就是當(dāng)前觀察者的成員變量subscriptions)中的所有Subscription取消訂閱,并且不再接受觀察者Observable發(fā)送的后續(xù)事件。

3、subscribe()源碼分析

前面我們分析了觀察者和被觀察者相關(guān)的源碼,那么接下來便是整個訂閱流程中最最關(guān)鍵的環(huán)節(jié)了。

public final Subscription subscribe(Subscriber subscriber) {
    return Observable.subscribe(subscriber, this);
}
static  Subscription subscribe(Subscriber subscriber, Observable observable) {

    ...

    subscriber.onStart();
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

subscribe()方法中將傳進來的subscriber包裝成了SafeSubscriberSafeSubscriber其實是subscriber的一個代理,對subscriber的一系列方法做了更加嚴格的安全校驗。保證了onCompleted()onError()只會有一個被執(zhí)行且只執(zhí)行一次,一旦它們其中方法被執(zhí)行過后onNext()就不在執(zhí)行了。

上述代碼中最關(guān)鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。這里的RxJavaHooks和之前提到的一樣,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二個入?yún)?b>observable.onSubscribe,也就是當(dāng)前observable的成員變量onSubscribe。而這個成員變量我們前面提到過,它是我們在Observable.create()的時候new出來的。所以這段代碼可以簡化為onSubscribe.call(subscriber)。這也印證了我在RxJava系列2(基本概念及使用介紹)中說的,onSubscribe.call(subscriber)中的subscriber正是我們在subscribe()方法中new出來的觀察者。

到這里,我們對RxJava的執(zhí)行流程做個總結(jié):首先我們調(diào)用crate()創(chuàng)建一個觀察者,同時創(chuàng)建一個OnSubscribe作為該方法的入?yún)ⅲ唤又{(diào)用subscribe()來訂閱我們自己創(chuàng)建的觀察者Subscriber
一旦調(diào)用subscribe()方法后就會觸發(fā)執(zhí)行OnSubscribe.call()。然后我們就可以在call方法調(diào)用觀察者subscriberonNext(),onCompleted(),onError()

最后我用張圖來總結(jié)下之前的分析結(jié)果:

二、操作符原理分析

之前我們介紹過幾十個操作符,要一一分析它們的源碼顯然不太現(xiàn)實。在這里我拋磚引玉,選取一個相對簡單且常用的map操作符來分析。

我們先來看一個map操作符的簡單應(yīng)用:

示例B

Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
}).map(new Func1() {
    @Override
    public String call(Integer integer) {
        return "This is " + integer;
    }
}).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

為了便于表述,我將上面的代碼做了如下拆解:

Observable observableA = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

Subscriber subscriberOne = new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

Observable observableB = 
        observableA.map(new Func1() {
                @Override
                public String call(Integer integer) {
                    return "This is " + integer;;
                }
            });

observableB.subscribe(subscriberOne);

map()的源碼和上一小節(jié)介紹的create()一樣位于Observable這個類中。

public final  Observable map(Func1 func) {
    return create(new OnSubscribeMap(this, func));
}

通過查看源碼我們發(fā)現(xiàn)調(diào)用map()的時候?qū)嶋H上是創(chuàng)建了一個新的被觀察者Observable,我們姑且稱它為ObservableB;一開始通過Observable.create()創(chuàng)建的Observable我們稱之為ObservableA。在創(chuàng)建ObservableB的時候同時創(chuàng)建了一個OnSubscribeMap,而ObservableA和變換函數(shù)Func1則作為構(gòu)造OnSubscribeMap的參數(shù)。

public final class OnSubscribeMap implements OnSubscribe {

    final Observable source;//ObservableA
    
    final Func1 transformer;//map操作符中的轉(zhuǎn)換函數(shù)Func1。T為轉(zhuǎn)換前的數(shù)據(jù)類型,在上面的例子中為Integer;R為轉(zhuǎn)換后的數(shù)據(jù)類型,在該例中為String。

    public OnSubscribeMap(Observable source, Func1 transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber o) {//結(jié)合第一小節(jié)的分析結(jié)果,我們知道這里的入?yún)其實就是我們自己new的觀察者subscriberOne。
        MapSubscriber parent = new MapSubscriber(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber extends Subscriber {
        
        final Subscriber actual;//這里的actual就是我們在調(diào)用subscribe()時創(chuàng)建的觀察者mSubscriber
        final Func1 mapper;//變換函數(shù)
        boolean done;
        
        public MapSubscriber(Subscriber actual, Func1 mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }
        
        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

OnSubscribeMap實現(xiàn)了OnSubscribe接口,因此OnSubscribeMap就是一個OnSubscribe。在調(diào)用map()的時候創(chuàng)建了一個新的被觀察者ObservableB,然后我們用ObservableB.subscribe(subscriberOne)訂閱了觀察者subscriberOne。結(jié)合我們在第一小節(jié)的分析結(jié)果,所以OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦調(diào)用了ObservableB.subscribe(subscriberOne)就會執(zhí)行OnSubscribeMap.call()

call()方法中,首先通過我們的觀察者o和轉(zhuǎn)換函數(shù)transformer構(gòu)造了一個MapSubscriber,最后調(diào)用了source也就是observableAunsafeSubscribe()方法。即observableA訂閱了一個觀察者MapSubscriber

public final Subscription unsafeSubscribe(Subscriber subscriber) {
    try {
        ...
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上面這段代碼最終執(zhí)行了onSubscribe也就是OnSubscribeMapcall()方法,call()方法中的參數(shù)就是之前在OnSubscribeMap.call()中new出來的MapSubscriber。最后在call()方法中執(zhí)行了我們自己的業(yè)務(wù)代碼:

subscriber.onNext(1);
subscriber.onCompleted();

其實也就是執(zhí)行了MapSubscriberonNext()onCompleted()

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        ...
        return;
    }
    actual.onNext(result);
}

onNext(T t)方法中的的mapper就是變換函數(shù),actual就是我們在調(diào)用subscribe()時創(chuàng)建的觀察者subscriberOne。這個T就是我們例子中的IntegerR就是String。在onNext()中首先調(diào)用變換函數(shù)mapper.call()T轉(zhuǎn)換成R(在我們的例子中就是將Integer類型的1轉(zhuǎn)換成了String類型的“This is 1”);接著調(diào)用subscriberOne.onNext(String result)。同樣在調(diào)用MapSubscriber.onCompleted()時會執(zhí)行subscriberOne.onCompleted()。這樣就完成了一直完成的調(diào)用流程。

我承認太啰嗦了,花費了這么大的篇幅才將map()的轉(zhuǎn)換原理解釋清楚。我也是希望盡量的將每個細節(jié)都呈現(xiàn)出來方便大家理解,如果看我啰嗦了這么久還是沒能理解,請看下面我畫的這張執(zhí)行流程圖。

三、線程調(diào)度原理分析

在前面的文章中我介紹過RxJava可以很方便的通過subscribeOn()observeOn()來指定數(shù)據(jù)流的每一部分運行在哪個線程。其中subscribeOn()指定了處理Observable的全部的過程(包括發(fā)射數(shù)據(jù)和通知)的線程;observeOn()指定了觀察者的onNext(), onError()onCompleted()執(zhí)行的線程。接下來我們就分析分析源碼,看看線程調(diào)度是如何實現(xiàn)的。

在分析源碼前我們先看看一段常見的通過RxJava實現(xiàn)的線程調(diào)度代碼:

示例C

Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});
1、subscribeOn()源碼分析
public final Observable subscribeOn(Scheduler scheduler) {
    ...
    return create(new OperatorSubscribeOn(this, scheduler));
}

通過上面的代碼我們可以看到,subscribeOn()map()一樣是創(chuàng)建了一個新的被觀察者Observable。因此我大致就能猜到subscribeOn()的執(zhí)行流程應(yīng)該和map()差不多,OperatorSubscribeOn肯定也是一個OnSubscribe。那我們接下來就看看OperatorSubscribeOn的源碼:

public final class OperatorSubscribeOn implements OnSubscribe {

    final Scheduler scheduler;//線程調(diào)度器,用來指定訂閱事件發(fā)送、處理等所在的線程
    final Observable source;

    public OperatorSubscribeOn(Observable source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber s = new Subscriber(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn實現(xiàn)了OnSubscribe接口,call()中對Subscriber的處理也和OperatorMapSubscriber的處理類似。首先通過scheduler構(gòu)建了一個Worker;然后用傳進來的subscriber構(gòu)造了一個新的Subscriber s,并將s丟到Worker.schedule()中來處理;最后用原Observable去訂閱觀察者s。而這個Worker就是線程調(diào)度的關(guān)鍵!前面的例子中我們通過subscribeOn(Schedulers.io())指定了Observable發(fā)射處理事件以及通知觀察者的一系列操作的執(zhí)行線程,正是通過這個Schedulers.io()創(chuàng)建了我們前面提到的Worker。所以我們來看看Schedulers.io()的實現(xiàn)。

首先通過Schedulers.io()獲得了ioScheduler并返回,上面的OperatorSubscribeOn通過這個的SchedulercreateWorker()方法創(chuàng)建了我們前面提到的Worker

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

接著我們看看這個ioScheduler是怎么來的,下面的代碼向我們展現(xiàn)了是如何在Schedulers的構(gòu)造函數(shù)中通過RxJavaSchedulersHook.createIoScheduler()來初始化ioScheduler的。

private Schedulers() {

    ...

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    ...
}

最終RxJavaSchedulersHook.createIoScheduler()返回了一個CachedThreadScheduler,并賦值給了ioScheduler

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    ...
    return new CachedThreadScheduler(threadFactory);
}

到這一步既然我們知道了ioScheduler就是一個CachedThreadScheduler,那我們就來看看它的createWorker()的實現(xiàn)。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

上面的代碼向我們赤裸裸的呈現(xiàn)了前面OperatorSubscribeOn中的Worker其實就是EventLoopWorker。我們重點要關(guān)注的是他的scheduleActual()

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    private final CompositeSubscription innerSubscription = new CompositeSubscription();
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    final AtomicBoolean once;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.once = new AtomicBoolean();
        this.threadWorker = pool.get();
    }

    ...

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        ...
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
}

通過對源碼的一步步追蹤,我們知道了前面OperatorSubscribeOn.call()中的inner.schedule()最終會執(zhí)行到ThreadWorkerscheduleActual()方法。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}

scheduleActual()中的ScheduledAction實現(xiàn)了Runnable接口,通過線程池executor最終實現(xiàn)了線程切換。上面便是subscribeOn(Schedulers.io())實現(xiàn)線程切換的全部過程。

2、observeOn()源碼分析

observeOn()切換線程是通過lift來實現(xiàn)的,相比subscribeOn()在實現(xiàn)原理上相對復(fù)雜些。不過本質(zhì)上最終還是創(chuàng)建了一個新的Observable

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return lift(new OperatorObserveOn(scheduler, delayError, bufferSize));
}

public final  Observable lift(final Operator operator) {
    return create(new OnSubscribeLift(onSubscribe, operator));
}

OperatorObserveOn作為OnSubscribeLift構(gòu)造函數(shù)的參數(shù)用來創(chuàng)建了一個新的OnSubscribeLift對象,接下來我們看看OnSubscribeLift的實現(xiàn):

public final class OnSubscribeLift implements OnSubscribe {
    
    final OnSubscribe parent;

    final Operator operator;

    public OnSubscribeLift(OnSubscribe parent, Operator operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber o) {
        try {
            Subscriber st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

OnSubscribeLift繼承自OnSubscribe,通過前面的分析我們知道一旦調(diào)用了subscribe()將觀察者與被觀察綁定后就會觸發(fā)被觀察者所對應(yīng)的OnSubscribecall()方法,所以這里會觸發(fā)OnSubscribeLift.call()。在call()中調(diào)用了OperatorObserveOn.call()并返回了一個新的觀察者Subscriber st,接著調(diào)用了前一級Observable對應(yīng)OnSubscriber.call(st)

我們再看看OperatorObserveOn.call()的實現(xiàn):

public Subscriber call(Subscriber child) {
    ...
    ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}

OperatorObserveOn.call()中創(chuàng)建了一個ObserveOnSubscriber并調(diào)用init()進行了初始化。

static final class ObserveOnSubscriber extends Subscriber implements Action0 {

    ...

    @Override
    public void onNext(final T t) {
        ...
        schedule();
    }

    @Override
    public void onCompleted() {
        ...
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        ...
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        final Queue q = this.queue;
        final Subscriber localChild = this.child;
        final NotificationLite localOn = this.on;
        
        for (;;) {
            long requestAmount = requested.get();
            
            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;
                
                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                localChild.onNext(localOn.getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }
            
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
    
    ...
}

ObserveOnSubscriber繼承自Subscriber,并實現(xiàn)了Action0接口。我們看到ObserveOnSubscriberonNext()onCompleted()onError()都有個schedule(),這個方法就是我們線程調(diào)度的關(guān)鍵;通過schedule()將新觀察者ObserveOnSubscriber發(fā)送給subscriberOne的所有事件都切換到了recursiveScheduler所對應(yīng)的線程,簡單的說就是把subscriberOneonNext()onCompleted()onError()方法丟到了recursiveScheduler對應(yīng)的線程中來執(zhí)行。

那么schedule()又是如何做到這一點的呢?他內(nèi)部調(diào)用了recursiveScheduler.schedule(this)recursiveScheduler其實就是一個Worker,和我們在介紹subscribeOn()時提到的worker一樣,執(zhí)行schedule()實際上最終是創(chuàng)建了一個runable,然后把這個runnable丟到了特定的線程池中去執(zhí)行。在runnablerun()方法中調(diào)用了ObserveOnSubscriber.call(),看上面的代碼大家就會發(fā)現(xiàn)在call()方法中最終調(diào)用了subscriberOneonNext()onCompleted()onError()方法。這便是它實現(xiàn)線程切換的原理。

好了,我們最后再看看示例C對應(yīng)的執(zhí)行流程圖,幫助大家加深理解。

總結(jié)

這一章以執(zhí)行流程操作符實現(xiàn)以及線程調(diào)度三個方面為切入點剖析了RxJava源碼。下一章將站在更宏觀的角度來分析整個RxJava的框架結(jié)構(gòu)、設(shè)計思想等等。敬請期待~~ :)

如果大家喜歡這一系列的文章,歡迎關(guān)注我的知乎專欄和GitHub。

知乎專欄:https://zhuanlan.zhihu.com/baron

GitHub:https://github.com/BaronZ88

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66533.html

相關(guān)文章

  • RxJava系列番外篇:一個RxJava解決復(fù)雜業(yè)務(wù)邏輯的案例

    摘要:之前寫過一系列的文章,也承諾過會盡快有的介紹。所以這次還是給大家分享一個使用解決問題的案例,希望對大家在使用的時候有一點點啟發(fā)。上述這一套復(fù)雜的業(yè)務(wù)邏輯如果使用傳統(tǒng)編碼方式將是極其復(fù)雜的。 之前寫過一系列RxJava1的文章,也承諾過會盡快有RxJava2的介紹。無奈實際項目中還未真正的使用RxJava2,不敢妄動筆墨。所以這次還是給大家分享一個使用RxJava1解決問題的案例,希望對...

    EscapedDog 評論0 收藏0
  • RxJava系列七(最佳實踐)

    摘要:按照計劃這一期是要介紹框架結(jié)構(gòu)和設(shè)計思想的,但是考慮到將在十月底發(fā)布正式版因此決定將框架結(jié)構(gòu)和設(shè)計思想分析放到正式版發(fā)布后再做。后續(xù)我也會有一系列的文章來介紹和的區(qū)別。首選我們需要調(diào)用系統(tǒng)來獲取所有已安裝的,所以在的方法中調(diào)用。 轉(zhuǎn)載請注明出處:[https://zhuanlan.zhihu.com/p/... RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) R...

    Carson 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<