RxJava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。 它扩展了观察者模式以支持数据/事件序列,并添加运算符,允许您以声明方式组合序列,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。
入门
第一步是将RxJava 2包含到您的项目中,例如,作为Gradle编译依赖关系:
compile "io.reactivex.rxjava2:rxjava:2.x.y"
二是编写Hello World程序:
package rxjava.examples;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果您的平台不支持Java 8 lambdas(尚未),则必须手动创建Consumer的内部类:
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
);
RxJava 2具有几个基类,您可以发现运算符:
io.reactivex.Flowable : 0..N flows, supporting Reactive-Streams and backpressure
io.reactivex.Observable: 0..N flows, no backpressure
io.reactivex.Single: a flow of exactly 1 item or an error
io.reactivex.Completable: a flow without items but only a completion or error signal
io.reactivex.Maybe: a flow with no items, exactly one item or an error
RxJava的一个常见用例是在后台线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误):
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
这种类型的链接方法称为流利的API,类似于构建器模式。
然而,RxJava的反应类型是不可变的;
每个方法调用返回一个新的具有添加行为的Flowable。
为了说明,该示例可以重写如下:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,您可以通过subscribeOn将计算或阻止IO移动到其他线程。
一旦数据准备就绪,您可以确保它们在前台或GUI线程上通过observeOn处理。
RxJava操作符不直接工作线程或执行服务器,而是使用所谓的调度器抽象出统一API后面的并发源。
RxJava 2具有多个可通过Schedulers实用程序类访问的标准调度程序。
这些在所有JVM平台上都可用,但是某些特定平台(如Android)有自己的典型调度器定义:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()。
Thread.sleep(2000);
到底是没有意外。
在RxJava中,默认调度程序在守护线程上运行,这意味着一旦Java主线程退出,它们都会停止,并且后台计算可能永远不会发生。
在这个例子中,休眠一段时间,让我们看看控制台上的流的输出,有时间。
RxJava中的流在本质上是顺序的,分成可以彼此并行运行的处理阶段:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
此示例流在计算调度程序上将1到10的数字进行平方,并在“主”线程(更准确地说是blockingSubscribe的调用程序线程)上使用结果。
然而,对于该流程,lambda v→v * v不并行运行;
它接收相同计算线程上的值1到10。
并行处理数字1到10:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
实际上,RxJava中的并行性意味着运行独立流并将其结果合并回单个流。
运算符flatMap通过首先将从1到10的每个数字映射到其自己的单独的Flowable中,运行它们并且合并计算的平方。
从2.0.5开始,有一个实验运算符parallel()和类型ParallelFlowable,有助于实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
flatMap是一个强大的运算符,在很多情况下都有帮助。
例如,给定返回Flowable的服务,我们想使用第一个服务发出的值调用另一个服务:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
但是,请注意,flatMap不保证任何顺序,内部流的最终结果可能会交织。
还有其他操作符:
concatMap,用于映射和运行一个内部流
concatMap Eager,它“一次”运行所有内部流,但输出流将按照创建内部流的顺序。
转载请注明原文地址: https://ju.6miu.com/read-663638.html