RxJava学习之转换型操作符
标签(空格分隔): RX系列
转换型操作符
下面展示了可用于Observable发射的数据执行变换操作的各种操作符
map()—对序列的每一项都应用一个函数来变换Observable发射的数据序列flatMap()、concatMap()、flatMapIterable()—将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的ObservableswitchMap()—将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据scan()—对Observable发射的每一项数据应用一个函数,然后按照顺序依次发射每一个值groupyBy()—将Observable分拆为Observable集合,将原始的Observable发射的数据按照key分组,每一个Observable发射一组不同的数据buffer()—它顶起从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射window()—定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项cast()—在发射之前强制将Observable发射的所有哦数据转换为指定数据类型
map操作符
对Observable发射的每一项数据应用一个函数,执行变换操作 Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
RxJava将这个操作符实现为map函数。这个操作符默认不在任何特定的调度器上执行。
Map操作符的源码
/**
* Returns an Observable that applies a specified function to each item emitted by the source Observable and
* emits the results of these function applications.
*
<p>
*
<img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="">
*
<dl>
*
<dt><b>Scheduler:
</b></dt>
*
<dd>{@code map} does not operate by default on a particular {@link Scheduler}.
</dd>
*
</dl>
*
* @param func
* a function to apply to each item emitted by the Observable
* @return an Observable that emits the items from the source Observable, transformed by the specified
* function
* @see
<a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map
</a>
*/
public final
<R> Observable
<R> map(Func1
<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public interface Func1<T, R> extends Function {
R call(T t);
}
map例子
static List<Student> studentList =
new ArrayList<Student>(){
{
add(
new Student(
"FAALLDA",
28));
add(
new Student(
"小弟弟",
23));
add(
new Student(
"妻子的孜孜",
25));
}
};
/**
* Map
*
* 通过使用map中的方法对Observable中发射出来的所有数据进行变换
*
* test1()方法是得到多个Student对象中的name,保存到nameList中
* 注意:接口Func1包装的是有返回值的方法。
*
*/
private static void test1(){
List<String> nameList =
new ArrayList<>();
Observable.from(studentList)
.map(
new Func1<Student, String>() {
@Override
public String
call(Student student) {
return student.name;
}
})
.subscribe(
new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println(
"onCompleted nameList.size() = " + nameList.size());
}
@Override
public void onNext(String value) {
System.out.println(
"onSuccess value = " + value);
nameList.add(value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
}
Map操作符连续使用
/**
* Map操作符连续使用
*/
private static void test(){
Observable.from(studentList)
.map(
new Func1<Student, Integer>() {
@Override
public Integer
call(Student student) {
return student.age;
}
})
.map(
new Func1<Integer, String>() {
@Override
public String
call(Integer t) {
return String.valueOf(t+
10);
}
})
.subscribe(
new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println(
"onCompleted ");
}
@Override
public void onNext(String value) {
System.out.println(
"onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
}
Flatmap操作符
FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。
注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
/**
* FlatMap操作符
* FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
*/
private static void test4(){
List<String> nameList =
new ArrayList<>();
Observable.from(studentList)
.flatMap(
new Func1<Student, Observable<Entity>>() {
@Override
public Observable<Entity>
call(Student student) {
Course course = couseMap.get(student.name);
Entity entity =
new Entity(course, student);
return Observable.just(entity);
}
})
.subscribe(
new Subscriber<Entity>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Entity entity) {
System.out.println(
"onSuccess entity = " + entity);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
}
concatMap操作符
还有一个concatMap操作符,它类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
/**
* ConcatMap操作符
* 类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
*
*/
private static void test5(){
Observable.from(studentList)
.concatMap(
new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course>
call(Student t) {
Course course = couseMap.get(t.name);
return Observable.just(course);
}
})
.subscribe(
new Subscriber<Course>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Course course) {
System.out.println(
"onSuccess course = " + course);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
}
concatMap和flatMap的区别
/**
* flatMap与ConcatMap操作符比较
* 区别:
* 无序:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
* 有序:ConcatMap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。
*
* 说明:在同步线程中,FlatMap和ConcactMap的执行结果是一样的(结果是有序的),
* 只有在异步线程中,FlatMap结果可能是无序的,而ConcactMap始终能保持有序的结果。
*
* concatMap与flatMap操作符的比较
*/
private static void test(){
List<Integer> numbers = Arrays.asList(
2,
3,
4,
5,
6,
7,
8,
9,
10);
Observable.from(numbers)
.flatMap(
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer>
call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
}
})
.subscribe(
new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println(
"flatMap onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
System.out.println(
"----------------------------");
Observable.from(numbers)
.concatMap(
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer>
call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
}
})
.subscribe(
new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println(
"concatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
}
switchMap操作符
它和flatMap很像,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个
/**
* switchMap
* 解释:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
* 用法与FlatMap几乎一样,区别是SwitchMap操作符只会发射[emit]最近的Observables。
*
* 当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
*
* 应用场景:http://blog.csdn.net/jdsjlzx/article/details/51730162
*
* 逻辑推演:
* A --> 取消空的,没有可以取消的
* B--> A1被取消
* C--> B1被取消
* D--> C1被取消
* E--> D1被取消
* 最终输出E1
*/
private static void test7(){
Observable.just(
"A",
"B",
"C",
"D",
"E")
.switchMap(
new Func1<String, Observable<String>>() {
@Override
public Observable<String>
call(String s) {
return Observable.just(s+
"1").subscribeOn(Schedulers.newThread());
}
})
.subscribe(
new Observer<String>() {
@Override
public void onCompleted() {
System.out.println(
"switchMap onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(
"switchMap onError :" + e);
}
@Override
public void onNext(String s) {
System.out.println(
"switchMap Next :" + s);
}
});
try {
Thread.sleep(
1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
switchMap与flatmap的区别
/**
* switchMap与flatmap的区别
*
* 说明:在同步线程中,switchMap发射[emit]所有的Observables,
* 在异步线程中,switchMap只会发射[emit]最近的Observables。
*
*/
private static void test8(){
ExecutorService service = Executors.newFixedThreadPool(
10);
List<Integer> numbers = Arrays.asList(
2,
3,
4,
5,
6,
7,
8,
9,
10);
Observable.from(numbers)
.flatMap(
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer>
call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
}
})
.subscribe(
new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println(
"flatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
System.out.println(
"----------------------------------");
Observable.from(numbers)
.switchMap(
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer>
call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
}
})
.subscribe(
new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println(
"switchMap2 onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println(
"onError error = " + error);
}
});
service.shutdown();
}