--subscriber.onNext(""+i) 这里的i转换为String 后作为参数传递为subscribe(new Action1<String>))中的String
顾名思义,Create操作符是用来创建一个Observable的。下面来看一个简单的代码段:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //Emit Data } })create方法接收一个参数Observable.OnSubscribe 来看下它的源码:
/** * Invoked when Observable.subscribe is called. */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity }Observable.OnSubscribe 说白了就是一个继承了Action1接口的接口:
public interface Action1<T> extends Action { void call(T t); } /** * All Action interfaces extend from this. * <p> * Marker interface to allow instanceof checks. */ public interface Action extends Function { } /** * All Func and Action interfaces extend from this. * <p> * Marker interface to allow instanceof checks. */ public interface Function { }它们的继承关系如下: Observable.OnSubscribe <- Action1 <- Action <- Function
create()方法也就是个工厂方法:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }通过OnSubscribe的源码的注释 Invoked when Observable.subscribe is called. 意思是 当Observable被订阅(subscribe) OnSubscribe接口的call方法会被执行。
知道如何创建(create)Observable, 接下来我们看下如何订阅它:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { printLog(tvLogs, "Emit Data:", i + ""); subscriber.onNext("" + i); } } }) .subscribe(new Action1<String>() { @Override public void call(String s) { //showToast(s); printLog(tvLogs, "Consume Data:", s); } });当调用了subscribe方法 Observable.OnSubscribe的call方法就会被执行,在Observable.OnSubscribe的call方法中循环了调用了5次subscriber.onNext,在subscribe的Action1回调就会接受5次回调。
Emit Data:'0' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'1' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'2' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'3' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'4' , Thread Name:RxCachedThreadScheduler-1 Consume Data:'0' , Thread Name:main Consume Data:'1' , Thread Name:main Consume Data:'2' , Thread Name:main Consume Data:'3' , Thread Name:main Consume Data:'4' , Thread Name:main从输出的日志可以看到,我们还打印了Thread Name线程的名称,我们可以控制发送数据、消费数据所在的线程。
.observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io())subscribeOn 设置Observable的call方法所在的线程 【生产数据】
observeOn 设置subscribe的call方法所在的线程【消费数据】