Osheep

时光不回头,当下最重要。

Rxjava2源码分析(三)

概述

书接上文,上节我们分析了Rxjava是如何对被观察线程进行调度的,这节我们来分析下Rxjava是如何对观察者线程进行调度的。还是之前的套路,先看个简单的demo。

简单的例子

    private void doSomeWork() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.i("lx", " subscribe: " + Thread.currentThread().getName());
                e.onNext("a");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
            }
            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError: " + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                Log.i("lx", " onComplete: " + Thread.currentThread().getName());
            }
        });
    }

看看运行结果:

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onNext: main
com.rxjava2.android.samples I/lx:  onComplete: main

从结果可以看出,事件的生产线程运行在RxCachedThreadScheduler-1中,而事件的消费线程则被调度到了main线程中。关键代码是因为这句.observeOn(AndroidSchedulers.mainThread())。 下面我们着重分析下这句代码都做了哪些事情。

AndroidSchedulers.mainThread()

先来看看AndroidSchedulers.mainThread()是什么?贴代码

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

注释已经说的很明白了,是一个在主线程执行任务的scheduler,接着看

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
            
    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) {
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
    }
    

代码很简单,这个AndroidSchedulers.mainThread()想当于new HandlerScheduler(new Handler(Looper.getMainLooper())),原来是利用AndroidHandler来调度到main线程的。

我们再看看HandlerScheduler,它与我们上节分析的IOScheduler类似,都是继承自Scheduler,所以AndroidSchedulers.mainThread()其实就是是创建了一个运行在main thread上的scheduler。
好了,我们再回过头来看observeOn方法。

observeOn

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

重点是这个new ObservableObserveOn,看名字是不是有种似成相识的感觉,还记得上篇的ObservableSubscribeOn吗? 它俩就是亲兄弟,是继承自同一个父类。
重点还是这个方法,我们前文已经提到了,Observable的subscribe方法最终都是调用subscribeActual方法。下面看看这个方法的实现:


    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // scheduler 就是前面提到的 HandlerScheduler,所以进入else分支
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 创建 HandlerWorker
            Scheduler.Worker w = scheduler.createWorker();
            // 调用上游Observable的subscribe,将订阅向上传递
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

从上面代码可以看到使用了ObserveOnObserver类对observer进行装饰,好了,我们再来看看ObserveOnObserver
我们已经知道了,事件源发射的事件,是通过observer的onNext,onError,onComplete发射到下游的。所以看看ObserveOnObserver的这三个方法是如何实现的。
由于篇幅问题,我们只分析onNext方法,onErroronComplete方法有兴趣的同学可以自己分析下。

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        
        // 如果是非异步方式,将上游发射的时间加入到队列
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }
    
    void schedule() {
        // 保证只有唯一任务在运行
        if (getAndIncrement() == 0) {
            // 调用的就是HandlerWorker的schedule方法
            worker.schedule(this);
        }
    }
    
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
        
       

schedule方法将传入的run调度到对应的handle所在的线程来执行,这个例子里就是有main线程来完成。 再回去看看前面传入的run吧。
回到ObserveOnObserver中的run方法:

    @Override
    public void run() {
        // 此例子中代码不会进入这个分支,至于这个drainFused是什么,后面章节再讨论。
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    
    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (;;) {
                boolean d = done;
                T v;

                try {
                    // 从队列中queue中取出事件
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {
                    return;
                }

                if (empty) {
                    break;
                }
                //调用下游observer的onNext将事件v发射出去
                a.onNext(v);
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

至此我们明白了RXjava是如何调度消费者线程了。

消费者线程调度流程概括

Rxjava调度消费者现在的流程,以observeOn(AndroidSchedulers.mainThread())为例。

  1. AndroidSchedulers.mainThread()先创建一个包含handlerScheduler, 这个handler是主线程的handler
  2. observeOn方法创建ObservableObserveOn,它是上游Observable的一个装饰类,其中包含前面创建的SchedulerbufferSize等.
  3. 当订阅方法subscribe被调用后,ObservableObserveOnsubscribeActual方法创建Scheduler.Worker并调用上游的subscribe方法,同时将自身接收的参数’observer’用装饰类ObserveOnObserver装饰后传递给上游。
  4. 当上游调用被ObserveOnObserveronNextonErroronComplete方法时,ObserveOnObserver将上游发送的事件通通加入到队列queue中,然后再调用scheduler将处理事件的方法调度到对应的线程中(本例会调度到main thread)。 处理事件的方法将queue中保存的事件取出来,调用下游原始的observer再发射出去。
  5. 经过以上流程,下游处理事件的消费者线程就运行在了observeOn调度后的thread中。

总结

经过前面两节的分析,我们已经明白了Rxjava是如何对线程进行调度的。

  • Rxjava的subscribe方法是由下游一步步向上游进行传递的。会调用上游的subscribe,直到调用到事件源。
    如:
source.subscribe(xxx);

而上游的source往往是经过装饰后的Observable, Rxjava就是利用ObservableSubscribeOnsubscribe方法调度到了指定线程运行,生产者线程最终会运行在被调度后的线程中。但多次调用subscribeOn方法会怎么样呢? 我们知道因为subscribe方法是由下而上传递的,所以事件源的生产者线程最终都只会运行在第一次执行subscribeOn所调度的线程中,换句话就是多次调用subscribeOn方法,只有第一次有效。

  • Rxjava发射事件是由上而下发射的,上游的onNextonErroronComplete方法会调用下游传入的observer的对应方法。往往下游传递的observer对象也是经过装饰后的observer对象。Rxjava就是利用ObserveOnObserver将执行线程调度后,再调用下游对应的onNextonErroronComplete方法,这样下游消费者就运行再了指定的线程内。 那么多次调用observeOn调度不同的线程会怎么样呢? 因为事件是由上而下发射的,所以每次用observeOn切换完线程后,对下游的事件消费都有效,比如下游的map操作符。最终的事件消费线程运行在最后一个observeOn切换后线程中。
  • 另外通过源码可以看到onSubscribe运行在subscribe的调用线程中,这个就不具体分析了。
点赞