Observable.OnSubscribe 的理解

    xiaoxiao2021-08-21  112

    -- “通过OnSubscribe的源码的注释 :意思是 当Observable被订阅(subscribe) OnSubscribe接口的call方法会被执行。

    --subscriber.onNext(""+i) 这里的i转换为String 后作为参数传递为subscribe(new Action1<String>))中的String

    create操作符的基本使用

    顾名思义,Create操作符是用来创建一个Observable的。下面来看一个简单的代码段:

    Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //Emit Data } })

    create方法接收一个参数Observable.OnSubscribe 来看下它的源码:

    /** * Invoked when Observable.subscribe is called. */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity }

    Observable.OnSubscribe 说白了就是一个继承了Action1接口的接口:

    public interface Action1<T> extends Action { void call(T t); } /** * All Action interfaces extend from this. * <p> * Marker interface to allow instanceof checks. */ public interface Action extends Function { } /** * All Func and Action interfaces extend from this. * <p> * Marker interface to allow instanceof checks. */ public interface Function { }

    它们的继承关系如下: Observable.OnSubscribe <- Action1 <- Action <- Function

    create()方法也就是个工厂方法:

    public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }

    通过OnSubscribe的源码的注释 Invoked when Observable.subscribe is called. 意思是 当Observable被订阅(subscribe) OnSubscribe接口的call方法会被执行。

    知道如何创建(create)Observable, 接下来我们看下如何订阅它:

    Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { printLog(tvLogs, "Emit Data:", i + ""); subscriber.onNext("" + i); } } }) .subscribe(new Action1<String>() { @Override public void call(String s) { //showToast(s); printLog(tvLogs, "Consume Data:", s); } });

    当调用了subscribe方法 Observable.OnSubscribe的call方法就会被执行,在Observable.OnSubscribe的call方法中循环了调用了5次subscriber.onNext,在subscribe的Action1回调就会接受5次回调。

    Emit Data:'0' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'1' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'2' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'3' , Thread Name:RxCachedThreadScheduler-1 Emit Data:'4' , Thread Name:RxCachedThreadScheduler-1 Consume Data:'0' , Thread Name:main Consume Data:'1' , Thread Name:main Consume Data:'2' , Thread Name:main Consume Data:'3' , Thread Name:main Consume Data:'4' , Thread Name:main

    从输出的日志可以看到,我们还打印了Thread Name线程的名称,我们可以控制发送数据、消费数据所在的线程。

    .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io())

    subscribeOn 设置Observable的call方法所在的线程 【生产数据】

    observeOn 设置subscribe的call方法所在的线程【消费数据】

    转载请注明原文地址: https://ju.6miu.com/read-676794.html

    最新回复(0)