RxJava: Reactive Extensions for the JVM

    xiaoxiao2021-03-26  33

    RxJava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。 它扩展了观察者模式以支持数据/事件序列,并添加运算符,允许您以声明方式组合序列,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。

    入门

    第一步是将RxJava 2包含到您的项目中,例如,作为Gradle编译依赖关系: compile "io.reactivex.rxjava2:rxjava:2.x.y" 二是编写Hello World程序: package rxjava.examples; import io.reactivex.*; public class HelloWorld { public static void main(String[] args) { Flowable.just("Hello world").subscribe(System.out::println); } } 如果您的平台不支持Java 8 lambdas(尚未),则必须手动创建Consumer的内部类: Flowable.just("Hello world") .subscribe(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } ); RxJava 2具有几个基类,您可以发现运算符: io.reactivex.Flowable : 0..N flows, supporting Reactive-Streams and backpressure io.reactivex.Observable: 0..N flows, no backpressure io.reactivex.Single: a flow of exactly 1 item or an error io.reactivex.Completable: a flow without items but only a completion or error signal io.reactivex.Maybe: a flow with no items, exactly one item or an error RxJava的一个常见用例是在后台线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误): Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done"; }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace); Thread.sleep(2000); // <--- wait for the flow to finish 这种类型的链接方法称为流利的API,类似于构建器模式。 然而,RxJava的反应类型是不可变的; 每个方法调用返回一个新的具有添加行为的Flowable。 为了说明,该示例可以重写如下: Flowable<String> source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done"; }); Flowable<String> runBackground = source.subscribeOn(Schedulers.io()); Flowable<String> showForeground = runBackground.observeOn(Schedulers.single()); showForeground.subscribe(System.out::println, Throwable::printStackTrace); Thread.sleep(2000); 通常,您可以通过subscribeOn将计算或阻止IO移动到其他线程。 一旦数据准备就绪,您可以确保它们在前台或GUI线程上通过observeOn处理。 RxJava操作符不直接工作线程或执行服务器,而是使用所谓的调度器抽象出统一API后面的并发源。 RxJava 2具有多个可通过Schedulers实用程序类访问的标准调度程序。 这些在所有JVM平台上都可用,但是某些特定平台(如Android)有自己的典型调度器定义:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()。 Thread.sleep(2000); 到底是没有意外。 在RxJava中,默认调度程序在守护线程上运行,这意味着一旦Java主线程退出,它们都会停止,并且后台计算可能永远不会发生。 在这个例子中,休眠一段时间,让我们看看控制台上的流的输出,有时间。 RxJava中的流在本质上是顺序的,分成可以彼此并行运行的处理阶段: Flowable.range(1, 10) .observeOn(Schedulers.computation()) .map(v -> v * v) .blockingSubscribe(System.out::println); 此示例流在计算调度程序上将1到10的数字进行平方,并在“主”线程(更准确地说是blockingSubscribe的调用程序线程)上使用结果。 然而,对于该流程,lambda v→v * v不并行运行; 它接收相同计算线程上的值1到10。 并行处理数字1到10: Flowable.range(1, 10) .flatMap(v -> Flowable.just(v) .subscribeOn(Schedulers.computation()) .map(w -> w * w) ) .blockingSubscribe(System.out::println); 实际上,RxJava中的并行性意味着运行独立流并将其结果合并回单个流。 运算符flatMap通过首先将从1到10的每个数字映射到其自己的单独的Flowable中,运行它们并且合并计算的平方。 从2.0.5开始,有一个实验运算符parallel()和类型ParallelFlowable,有助于实现相同的并行处理模式: Flowable.range(1, 10) .parallel() .runOn(Schedulers.computation()) .map(v -> v * v) .sequential() .blockingSubscribe(System.out::println); flatMap是一个强大的运算符,在很多情况下都有帮助。 例如,给定返回Flowable的服务,我们想使用第一个服务发出的值调用另一个服务: Flowable<Inventory> inventorySource = warehouse.getInventoryAsync(); inventorySource.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId()) .map(demand -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand)); ) .subscribe(); 但是,请注意,flatMap不保证任何顺序,内部流的最终结果可能会交织。 还有其他操作符: concatMap,用于映射和运行一个内部流 concatMap Eager,它“一次”运行所有内部流,但输出流将按照创建内部流的顺序。
    转载请注明原文地址: https://ju.6miu.com/read-663638.html

    最新回复(0)