Observable (被观察者,事件源) Subscribers(观察者)
1.创建一个Observable对象很简单,直接调用Observable.create即可
Observable<String> observable = Observable.create( new OnSubscribe<String>() { public void call(Subscriber<? super String> sub) { sub.onNext("Hello, world!"); } } );Observable.create 先进行创建 new OnSubscribe一个观察者出现<String> call 中给观察者添加数据Hello word
2.创建一个Subscriber来处理Observable对象发出的字符串。
Subscriber<String> sub = new Subscriber<String>() { public void onNext(String text) { Log.d(TAG,text); } public void onError(Throwable arg0) {} public void onCompleted() {} };3.将观察者和被观察者进行关联起来
observable.subscribe(sub);4.可以简写为
Observable.just("Hello world").subscribe(new Action1<String>() { public void call(String text) { Log.d(TAG, text); } });其中just相当于仅仅发送一条消息,然后直接关联观察者…subscribe方法有一个重载版本,接受三个Action1类型的参数,分别对应OnNext,OnComplete, OnError函数。
Observable和Subscriber可以做任何事情 Observable可以是一个数据库查询,Subscriber用来显示查询结果;Observable可以是屏幕上的点击事件,Subscriber用来响应点击事件;Observable可以是一个网络请求,Subscriber用来显示请求结果。
Observable和Subscriber是独立于中间的变换过程的。 在Observable和Subscriber中间可以增减任何数量的map。整个系统是高度可组合的,操作数据是一个很简单的过程。
5.方法介绍
Observable.interval(1,Integer,TimeUtil.SECONDS);每隔一秒发送数据Observable.from(Integer[]);一般存放数组类型,执行结果Observable.just(Integer[],Integer[],...);依次按照顺序执行Observable.range(0,40);指定输出数据的范围map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava 最常用的变换. Observable.filter(new Func1<Object,Boolean>(){ public Boolean call(){ return true;//进行过滤 } }).observeOn(Schedulers.io());在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
如果想要在执行前oncreate等一系列的方法前执行准备操作可以用doOnSubscribe()
默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。下面例子是一个加载图片,进度条的例子
Observable.create(new Observable.OnSubscribe<Bitmap>() { public void call(final Subscriber<? super Bitmap> subscriber) { SystemClock.sleep(5000); OkGo.get(URL).tag(this).execute(new BitmapCallback() { public void onSuccess(Bitmap bitmap, Call call, Response response){ subscriber.onNext(bitmap); subscriber.onCompleted(); } }); } }) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { public void call() { mProBar.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Bitmap>() { public void onCompleted() { mProBar.setVisibility(View.GONE); } public void onError(Throwable e) { } public void onNext(Bitmap bitmap) { mIv.setImageBitmap(bitmap); } }); 如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。