Delay
Delay操作符是延时数据的发射。
原理图如下:
Delay操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
DelaySubscription
DelaySubscription操作符是延时订阅Observable。
原理图如下:
DelaySubscription操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3)
.delaySubscription(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
DoOnEach
DoOnEach操作符为Observable设置一个回调,每当Observable发射数据时,都会触发这个回调,类似于观察者模式。
原理图如下:
DoOnEach操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = createObservable(5).doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> notification) {
String message = "value = " + notification.getValue() + " kind = " + notification.getKind();
displayLogcat(message);
}
});
}
private Observable createObservable(final int index) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < index; i++) {
subscriber.onNext(i);
if (i == index - 1) {
subscriber.onError(new Throwable("error"));
}
}
}
});
}
运行代码,结果如下:
DoNext
doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。
原理图如下:
DoNext操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = createObservable(5).doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
displayLogcat("integer = " + integer);
}
});
}
private Observable createObservable(final int index) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < index; i++) {
subscriber.onNext(i);
if (i == index - 1) {
subscriber.onError(new Throwable("error"));
}
}
}
});
}
运行代码,结果如下:
DoOnSubscribe
doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。
原理图如下:
DoOnSubscribe操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).doOnSubscribe(new Action0() {
@Override
public void call() {
displayLogcat("doOnSubscribe");
}
});
}
运行代码,结果如下:
DoOnUnSubscribe
doOnUnsubscribe操作符注册一个动作,当观察者取消订阅它生成的Observable它就会被调用。
原理图如下:
DoOnUnSubscribe操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).doOnUnsubscribe(new Action0() {
@Override
public void call() {
displayLogcat("doOnUnsubscribe");
}
});
}
运行代码,结果如下:
DoOnCompleted
doOnCompleted操作符注册一个动作,当它产生的Observable正常终止调用onCompleted时会被调用。
原理图如下:
DoOnCompleted操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).doOnCompleted(new Action0() {
@Override
public void call() {
displayLogcat("doOnCompleted");
}
});
}
运行代码,结果如下:
DoOnError
doOnError操作符注册一个动作,当它产生的Observable异常终止调用onError时会被调用。
原理图如下:
DoOnError操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = createObservable(5).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
displayLogcat("doOnError throwable = " + throwable);
}
});
}
private Observable createObservable(final int index) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < index; i++) {
subscriber.onNext(i);
if (i == index - 1) {
subscriber.onError(new Throwable("error"));
}
}
}
});
}
运行代码,结果如下:
DoOnTerminate
doOnTerminate操作符注册一个动作,当它产生的Observable终止之前会被调用,无论是正常还是异常终止。
原理图如下:
DoOnTerminate操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).doOnTerminate(new Action0() {
@Override
public void call() {
displayLogcat("doOnTerminate");
}
});
}
运行代码,结果如下:
FinallyDo
finallyDo操作符注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还是异常终止。
原理图如下:
FinallyDo操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).finallyDo(new Action0() {
@Override
public void call() {
displayLogcat("finallyDo");
}
});
}
运行代码,结果如下:
Materialize
Materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。
原理图如下:
Materialize操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(1, 2, 3).materialize();
}
运行代码,结果如下:
Dematerialize
Dematerialize操作符跟Materialize操作符正好相反。
原理图如下:
Dematerialize操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.just(Notification.createOnNext(1),
Notification.createOnNext(2),
Notification.createOnNext(3),
Notification.<Integer>createOnCompleted()).dematerialize();
}
运行代码,结果如下:
SubscribeOn
SubscribeOn操作符用来决定Observable在哪个线程上运行。
原理图如下:
SubscribeOn操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
ObserverOn
ObserverOn操作符用来决定观察者运行在哪个线程上。
原理图如下:
ObserverOn操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.timer(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
TimeInterval
TimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔。
原理图如下:
TimeInterval操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).timeInterval().observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
Timestamp
TimeStamp操作符会对数据封装为Timestamped对象,该对象包含发射出去的数据和发射该数据时的时间戳。
原理图如下:
Timestamp操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).timestamp().observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
Timeout
TimeOut操作符是如果Observable是timeout指定的时间内,没发射出数据,timeout则会终止这次Observable的发射,并自己发射一个数据;反之发射Observable的数据。
原理图如下:
Timeout操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(123);
}
}).timeout(3, TimeUnit.SECONDS, Observable.just(456))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
运行代码,结果如下:
Using
Using操作符是创建一个资源,让Observable引用这个资源,在Observable用完之后对资源进行回收。
原理图如下:
Using操作符使用如下:
@Override
protected void createObservable() {
super.createObservable();
mObservable = Observable.using(new Func0<Integer>() {
@Override
public Integer call() {
return 1;
}
}, new Func1<Integer, Observable<Integer>>() {
@Override
public Observable call(Integer o) {
displayLogcat("Func1 call o = " + o);
return Observable.just(o);
}
}, new Action1<Integer>() {
@Override
public void call(Integer o) {
displayLogcat("Action1 call o = " + o);
unsubscribe();
}
});
}
运行代码,结果如下:
接下来就是鸣谢了,非常感谢以下两位博主,相关链接如下:
https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Observable-Utility-Operators.html
http://mushuichuan.com/2015/12/11/rxjava-operator-6/
github地址