PublishSubject.create()
.subscribe(
new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
Single
.just(
4)
.map(
new Func1
<Integer,
String>() {
@Override
public String call(
Integer integer) {
return String.valueOf(
integer);
}
})
.subscribe(
new SingleSubscriber
<String>() {
@Override
public void onSuccess(
String value) {
}
@Override
public void onError(Throwable error) {
}
});
/综合使用
final PublishSubject<String> mSearchResultsSubject = PublishSubject.create();
sub = mSearchResultsSubject
.debounce(
400, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.map(
new Func1<Object, Object>() {
@Override
public Object
call(Object o) {
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Observer<Object>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
/**
* 如果在doOnSubscribe()之后指定了subscribeOn(),它决定了doOnSubscribe()在哪种线程中执行。
* (1)doOnSubscribe()之前的subscribeOn()不会影响它。
* (2)doOnSubscribe()之后的subscribeOn(),且是最近的才会影响它。
* doOnSubscribe用于在call之前执行一些初始化操作
*/
Observable.create(
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?
super Integer> subscriber) {
subscriber.onNext(
2323);
}
}).subscribeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(
new Action0() {
@Override
public void call() {
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
});
转载请注明原文地址: https://ju.6miu.com/read-13651.html