RxAndroid2.0使用概述

    xiaoxiao2021-03-25  76

    RxJava —— WHAT | WHEN | HOW (基于rxjava2)

    1.WHAT

    RxJava本质上是一个实现异步操作的库,是扩展的观察者模式。

    实现异步还有可以使用AsyncTask、Handler等其他线程类,但是RxJava可以做到简洁。

    从SD中读取所有图片并添加到图片管理,因为有IO操作,所以使用线程处理。

    new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageManager.add(bitmap); } }); } } } } }.start();

    而使用RxJava中可以这样写:

    Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageManager.add(bitmap); } });

    虽然代码上是增加了,但是逻辑上采用了链式调用,更加清晰。使用了lamda还可以是这样:

    Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

    RxJava中有四个基本概念:观察者、被观察者/主题、订阅、事件。

    Observable/Observer(不支持backpress)

    Flowable/FlowableSubscriber(支持backpress)

    Single/SingleObserver(只发送一个onSuccess或者onError的通知)

    Completable/CompletableObserver(只发送一个onComplete或者onError的通知)

    Maybe/MaybeObserver(Single与Completable的结合,只发送一个onSuccess或者onComplete或者onError的通知)

    Backpress:当被观察者不停发射数据流,而观察者的响应不及时就会产生MissingBackpressureException,所以需要一定的策略来应对无法这种情况。

    2.WHEN

    常见的使用场景可以是请求网络、io操作等,具体可以参考的业务逻辑,如: - 检查缓存是否失效取然后选择数据源; - 需要多个接口返回数据后更新界面; - 接口的请求参数另一个接口请求返回的数据; - 界面按钮需要防止连续点击的情况; - 响应式的界面; - 复杂的数据变换;

    3. HOW

    3.1 操作符

    creat、just、fromXXX

    Observable.create从零开始创建一个Observable; Observable.just的参数即为将要发射的数据,可传入多个基类相同对象; Observable.fromXXX方法包括:fromArray、fromCallable、fromFuture、fromIterable、fromPublisher;from方法参数类型:数组、Callable接口对象、Future接口对象、Iterable接口对象、Publisher接口对象

    Future,Callable都是Java并发框架的接口,Callable 、Future、ThreadPoolExecutor需要一起研究;Publisher接口是rxjava的基础接口。

    empty、never、error

    empty:创建一个不发射任何数据但是正常终止的Observable,此时会回调onCompleted

    never:创建一个不发射数据也不终止的Observable

    error:创建一个不发射数据以一个错误终止的Observable

    range

    该操作符创建特定整数序列的Observable,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据

    timer

    timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。它在延迟一段给定的时间后发射一个简单的数字0 。

    interval

    该操作符按固定的时间间隔发射一个无限递增的整数序列,它接受一个表示时间间隔的参数和一个表示时间单位的参数。

    repeat

    该操作符是重复的发射某个数据序列,并且可以自己设置重复的次数。

    defer

    直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,该操作符能保证订阅执行时数据源是最新的数据。

    map

    该操作符是对原始Observable发射的每一项数据运用一个函数,然后返回一个发射这些结果的Observable。

    flatMap

    该操作符与map操作符的区别是它将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。

    cast

    该操作符就是做一些强制类型转换操作的。例如,当我们在页面跳转时数据对象往往是序列化的,当我们在新的页面收到数据后就要强制转换为我们想要的类型。底层调用map操作符。

    concatMap

    该操作符是类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。

    switchMap

    当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个。

    filter

    该操作符接收一个Predicate参数,可以在其中通过运用自己的判断条件去判断我们要过滤的数据,当数据通过判断条件后返回true表示发射该项数据,否则就不发射。

    first

    只对Observable发射的第一项数据,或者满足某个条件的第一项数据进行处理,则可以使用First操作符。

    last

    该操作符与first意义相反,若我们只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣时使用该操作符。

    publish、refCount

    Observable 有 Cold 和 Hot 之分:Hot Observable 无论有没有 Observer 订阅,事件都会发出;Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。

    Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。

    使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable,它将Observable 转换成 ConnectableObservable,而ConnectableObservable在被订阅后需要调用connect()才会开始发射数据流;ConnectableObservable的refCount操作符可以将Hot Observable转换成 Cold Observable。

    3.2 源码分析

    订阅

    通过查看源码会看到Observable.just调用了Observable.fromXXX方法,而Observable.fromXXX和Observable.create调用了RxJavaPlugins.onAssembly。 RxJavaPlugins.onAssembly参数:

    ObservableFromCallable<T> extends Observable<T> implements Callable<T> ObservableFromArray<T> extends Observable<T> ObservableFromFuture<T> extends Observable<T> ObservableFromIterable<T> extends Observable<T> ObservableFromPublisher<T> extends Observable<T>

    创建Observable对象的from方法返回的就是ObservableFrom这些对象,ObservableFrom对象继承自Observable,实现了subscribeActual方法。

    Observable.just("").subscribe(new Observer());

    Observable的非静态订阅方法subscribe实际上是调用了subscribeActual(Observer s)

    发射数据

    Cold Observable的subscribeActual(Observer s)方法实际上也是数据流的发送方法。在源码中会发现,在Observer订阅当前Cold Observable之后便立即开始了数据发射方法,以fromArray举例:

    @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); }

    先创建一个FromArrayDisposable对象对需要发射的数据流进行封装,在执行订阅s.onSubscribe(d);后立即开始了d.run();数据流的发射操作。


    ConnectableObservable是Hot Observable,不会主动发射数据流。 Observable的publish操作符在创建ConnectableObservable是实际上是创建了一个ObservablePublish对象,ObservablePublish继承了抽象类ConnectableObservable, ConnectableObservable也是Observable的子类,ObservablePublish在实现subscribeActual(Observer s)方法的时候只进行了Observer对Observable的订阅,而没有立刻发射数据流。同时实现了ConnectableObservable的connect方法,在connect方法中才开始进行数据流的发送。 这也就是之前提到的,需要在使用publish操作符将Cold Observable转为Hot Observable需要调用connect才能发送数据流。


    Hot Observable转为Cold Observable,关键点在ConnectableObservable的refCount操作符会调用RxJavaPlugins.onAssembly()来创建一个继承了Observable的ObservableRefCount对象,ObservableRefCount的构造器接受ConnectableObservable对象,ObservableRefCount在实现subscribeActual(Observer s)方法的时候调用了ConnectableObservable的connect方法,以此来达到Hot Observable转为Cold Observable的目的。


    总结:RxJava各种操作符的实现,实际上是装饰模式的各种妙用

    线程切换

    //常用的线程策略 Schedulers.immediate()//在当前线程运行,默认为此策略; Schedulers.newThread()//每次都创建新的线程执行操作; Schedulers.io()//类似newThread()但是此策略有无限量的线程池,主要用于读写文件、数据库、网络请求等; Schedulers.computation()//用于需要计算的策略,使用线程池,池大小为CPU核心数; Schedulers.trampoline()//将任务加入一个队列,等待执行 AndroidSchedulers.mainThread()//在Android主线程中执行, RxAndroid独有

    使用Rxjava可以非常方便指定订阅者对执行线程。例:

    Flowable<String> flowableJust = Flowable.just("test Just Flowable"); Disposable disposable = flowableJust .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map1 thread = " + Thread.currentThread().getName()); return s + "mp1"; } }) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map2 thread = " + Thread.currentThread().getName()); return s + "mp2"; } }) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.io()) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map3 thread = " + Thread.currentThread().getName()); return s + "mp3"; } }) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map4 thread = " + Thread.currentThread().getName()); return s + "mp4"; } }) .subscribe(new Consumer<String>() { public void accept(String s) throws Exception { System.out.println("onNext Just s = " + Thread.currentThread().getName()); } }, new Consumer<Throwable>() { public void accept(Throwable throwable) throws Exception { System.out.println("onError Just throwable = " + throwable.getMessage()); } }, new Action() { public void run() throws Exception { System.out.println("onComplete Just"); } }, new Consumer<Subscription>() { public void accept(Subscription subscription) throws Exception { subscription.request(Long.MAX_VALUE); System.out.println("onSubscribe Just subscription = " + Thread.currentThread().getName()); } });

    运行结果:

    onSubscribe Just subscription = main map1 thread = RxComputationThreadPool-1 map2 thread = RxComputationThreadPool-1 map3 thread = RxCachedThreadScheduler-1 map4 thread = RxCachedThreadScheduler-1 onNext Just s = RxCachedThreadScheduler-1

    subscribeOn操作符改变调用它之前代码的线程;observeOn操作符改变调用它之后代码的线程。

    除了订阅操作本身在主线程中运行,其他操作都在subscribeOn与observeOn两个操作符指定的线程中运行。

    Schedulers是创建Scheduler的工厂, 提供了若干静态方法用来创建各种Scheduler;

    Scheduler提供创建Workder的接口;

    Worker提供了几种执行任务的接口,用来执行任务, 它底下利用各种类型的线程或者线程池完成任务的执行,它是真正执行任务的地方;

    同一类型的Scheduler只有一个,但是对应的worker是不同的.比如Schedulers.computation, 对应的Scheduler只有一个,但是每次调用createWorker,获得的worker是scheduler里面worker数组中的一个(数组数目和处理器的数目相同)

    转载请注明原文地址: https://ju.6miu.com/read-33085.html

    最新回复(0)