分析下这段代码,我们可以通过Observable.create来创建一个Observable被观察者,需要传入一个Observable.OnSubscribe参数,这个参数是干嘛的呢?我们先通过源码来看下这个创建方法的注释
Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to it.
字面上的翻译是:返回一个被观察者,当一个订阅者订阅它,会执行指定的方法,所以知道这个 Observable.OnSubscribe 参数,实现的call方法,里面就是我们指定的要告诉订阅者执行的方法。具体要执行的方法就是 subscriber 执行三遍 onNext() ,然后执行一遍 onCompleted()
这是基本的创建观察者的方式,可以选择Observer或者Subscriber,直接new一个对象,实现接口的方法即可,这个不难。
RxJava按调用的顺序来讲是反着的,像是被观察者订阅了观察者,订阅了订阅者一样。但我们可以这样理解,在创建被观察者的时候,是被观察者主动指定观察者应该执行哪些方法的,所以这边被观察者订阅观察者也是可以接受的。
这里的Action又是什么呢?我们看下RxJava这些调用相关内部的方法源码,简化后:
public final Subscription subscribe(final Action1<? super T> onNext) { Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED; Action0 onCompleted = Actions.empty(); return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted)); } public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) { Action0 onCompleted = Actions.empty(); return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted)); } public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) { return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted)); } public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }通过源码我们可以知道最终他会通过一个onError,一个onCompleted和一个onNext三个Action来构造一个ActionSubscriber对象,这个时候分三种情况:
当我们只传入一个 Action1 onNext 时,这个对象会给我们加一个 ERROR_NOT_IMPLEMENTED 没有实现的 Action1 onError 和一个空的 Actio0 onCompleted ,构造一个ActionSubscriber;当我们传入的是 Action1 onNext 和 Action1 onError 时,它会帮我们再加一个空的 Action0 onCompleted ,构造一个ActionSubscriber;当我们传入的是 Action1 onNext 、Action1 onError 、Action0 onCompleted 时,它刚好构造一个ActionSubscriber细心的同学可能会发现了,onNext对应的是Action1,onError对应的Action1,onCompleted对应的Action0,下面是ActionSubscriber类的源码。
public final class ActionSubscriber<T> extends Subscriber<T> { final Action1<? super T> onNext; final Action1<Throwable> onError; final Action0 onCompleted; public ActionSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) { this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted; } @Override public void onNext(T t) { onNext.call(t); } @Override public void onError(Throwable e) { onError.call(e); } @Override public void onCompleted() { onCompleted.call(); } }通过代码可以知道,ActionSubscriber是继承Subscriber,也是属于一个订阅者。只是通过它我们可以分离出其中的Action。