RxJava操作符(6)-通用

    xiaoxiao2025-03-30  4

    Delay

    Delay操作符是延时数据的发射。

    原理图如下:

    Delay操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    DelaySubscription

    DelaySubscription操作符是延时订阅Observable。

    原理图如下:

    DelaySubscription操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3) .delaySubscription(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    DoOnEach

    DoOnEach操作符为Observable设置一个回调,每当Observable发射数据时,都会触发这个回调,类似于观察者模式。

    原理图如下:

    DoOnEach操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = createObservable(5).doOnEach(new Action1<Notification<? super Integer>>() { @Override public void call(Notification<? super Integer> notification) { String message = "value = " + notification.getValue() + " kind = " + notification.getKind(); displayLogcat(message); } }); } private Observable createObservable(final int index) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < index; i++) { subscriber.onNext(i); if (i == index - 1) { subscriber.onError(new Throwable("error")); } } } }); } 运行代码,结果如下:

    DoNext

    doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。

    原理图如下:

    DoNext操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = createObservable(5).doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { displayLogcat("integer = " + integer); } }); } private Observable createObservable(final int index) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < index; i++) { subscriber.onNext(i); if (i == index - 1) { subscriber.onError(new Throwable("error")); } } } }); } 运行代码,结果如下:

    DoOnSubscribe

    doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。

    原理图如下:

    DoOnSubscribe操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).doOnSubscribe(new Action0() { @Override public void call() { displayLogcat("doOnSubscribe"); } }); } 运行代码,结果如下:

    DoOnUnSubscribe

    doOnUnsubscribe操作符注册一个动作,当观察者取消订阅它生成的Observable它就会被调用。

    原理图如下:

    DoOnUnSubscribe操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).doOnUnsubscribe(new Action0() { @Override public void call() { displayLogcat("doOnUnsubscribe"); } }); } 运行代码,结果如下:

    DoOnCompleted

    doOnCompleted操作符注册一个动作,当它产生的Observable正常终止调用onCompleted时会被调用。

    原理图如下:

    DoOnCompleted操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).doOnCompleted(new Action0() { @Override public void call() { displayLogcat("doOnCompleted"); } }); } 运行代码,结果如下:

    DoOnError

    doOnError操作符注册一个动作,当它产生的Observable异常终止调用onError时会被调用。

    原理图如下:

    DoOnError操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = createObservable(5).doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { displayLogcat("doOnError throwable = " + throwable); } }); } private Observable createObservable(final int index) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < index; i++) { subscriber.onNext(i); if (i == index - 1) { subscriber.onError(new Throwable("error")); } } } }); } 运行代码,结果如下:

    DoOnTerminate

    doOnTerminate操作符注册一个动作,当它产生的Observable终止之前会被调用,无论是正常还是异常终止。

    原理图如下:

    DoOnTerminate操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).doOnTerminate(new Action0() { @Override public void call() { displayLogcat("doOnTerminate"); } }); } 运行代码,结果如下:

    FinallyDo

    finallyDo操作符注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还是异常终止。

    原理图如下:

    FinallyDo操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).finallyDo(new Action0() { @Override public void call() { displayLogcat("finallyDo"); } }); } 运行代码,结果如下:

    Materialize

    Materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。

    原理图如下:

    Materialize操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(1, 2, 3).materialize(); } 运行代码,结果如下:

    Dematerialize

    Dematerialize操作符跟Materialize操作符正好相反。

    原理图如下:

    Dematerialize操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.just(Notification.createOnNext(1), Notification.createOnNext(2), Notification.createOnNext(3), Notification.<Integer>createOnCompleted()).dematerialize(); } 运行代码,结果如下:

    SubscribeOn

    SubscribeOn操作符用来决定Observable在哪个线程上运行。

    原理图如下:

    SubscribeOn操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    ObserverOn

    ObserverOn操作符用来决定观察者运行在哪个线程上。

    原理图如下:

    ObserverOn操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.timer(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    TimeInterval

    TimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔。

    原理图如下:

    TimeInterval操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).timeInterval().observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    Timestamp

    TimeStamp操作符会对数据封装为Timestamped对象,该对象包含发射出去的数据和发射该数据时的时间戳。

    原理图如下:

    Timestamp操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).timestamp().observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    Timeout

    TimeOut操作符是如果Observable是timeout指定的时间内,没发射出数据,timeout则会终止这次Observable的发射,并自己发射一个数据;反之发射Observable的数据。

    原理图如下:

    Timeout操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(123); } }).timeout(3, TimeUnit.SECONDS, Observable.just(456)) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); } 运行代码,结果如下:

    Using

    Using操作符是创建一个资源,让Observable引用这个资源,在Observable用完之后对资源进行回收。

    原理图如下:

    Using操作符使用如下:

    @Override protected void createObservable() { super.createObservable(); mObservable = Observable.using(new Func0<Integer>() { @Override public Integer call() { return 1; } }, new Func1<Integer, Observable<Integer>>() { @Override public Observable call(Integer o) { displayLogcat("Func1 call o = " + o); return Observable.just(o); } }, new Action1<Integer>() { @Override public void call(Integer o) { displayLogcat("Action1 call o = " + o); unsubscribe(); } }); } 运行代码,结果如下: 接下来就是鸣谢了,非常感谢以下两位博主,相关链接如下: https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Observable-Utility-Operators.html http://mushuichuan.com/2015/12/11/rxjava-operator-6/ github地址
    转载请注明原文地址: https://ju.6miu.com/read-1297539.html
    最新回复(0)