RxBus是一个基于RxJava和RxAndroid的事件通讯工具
首先需要导入RxJava,RxAndroid库
compile 'io.reactivex:rxandroid:1.2.1' compile 'io.reactivex:rxjava:1.1.6'RxBus基本代码
public class RxBus { private static volatile RxBus defaultInstance; private Subject<Object,Object> bus; public RxBus(){ bus=new SerializedSubject<>(PublishSubject.create()); } //单例 public static RxBus getInstance(){ if(defaultInstance==null){ synchronized(RxBus.class){ if(defaultInstance==null){ defaultInstance=new RxBus(); } } } return defaultInstance; } //发送一个事件 public void post(Object o){ bus.onNext(o); } public <T> Observable<T> toObservable(Class<T> eventType){ return bus.ofType(eventType); } }使用
1. 发出事件
Student s=new Student(); s.setName("sss"); RxBus.getInstance().post(s);2.消费事件
subscribe = RxBus.getInstance().toObservable(Student.class) //在io线程进行订阅,可以执行一些耗时操作 .subscribeOn(Schedulers.io()) //在主线程进行观察,可做UI更新操作 .observeOn(AndroidSchedulers.mainThread()) //观察的对象 .subscribe(new Action1<Student>() { @Override public void call(Student s) { Toast.makeText(MainActivity.this, s.getName(), Toast.LENGTH_SHORT).show(); } });3.在onDestory中移除订阅
@Override protected void onDestroy() { super.onDestroy(); if(!subscribe.isUnsubscribed()) { subscribe.unsubscribe(); } }与EventBus相同RxBus也可以使用黏性事件 修改过后的支持黏性事件的RxBus
public class RxBus { private static volatile RxBus defaultInstance; private final Map<Class<?>,Object> mStickyEventMap; private Subject<Object,Object> bus; public RxBus(){ bus=new SerializedSubject<>(PublishSubject.create()); mStickyEventMap=new ConcurrentHashMap<>(); } //单例 public static RxBus getInstance(){ if(defaultInstance==null){ synchronized(RxBus.class){ if(defaultInstance==null){ defaultInstance=new RxBus(); } } } return defaultInstance; } /** * 发送一个事件 * @param o */ public void post(Object o){ bus.onNext(o); } public <T> Observable<T> toObservable(Class<T> eventType){ return bus.ofType(eventType); } //sticky事件 /** * 发送sticky事件 * @param event */ public void postSticky(Object event){ synchronized (mStickyEventMap){ mStickyEventMap.put(event.getClass(),event); } post(event); } public <T> Observable<T> toObservableSticky(final Class<T> eventType){ synchronized (mStickyEventMap){ Observable<T> observable = bus.ofType(eventType); final Object event=mStickyEventMap.get(eventType); if(event!=null){ return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { subscriber.onNext(eventType.cast(event)); } })); }else{ return observable; } } } /** * 获取sticky事件 * @param eventType * @param <T> * @return */ public <T> T getStickyEvent(Class<T> eventType){ synchronized(mStickyEventMap){ return eventType.cast(mStickyEventMap.get(eventType)); } } /** * 移除指定的sticky事件 * @param eventType * @param <T> * @return */ public <T> T removeStickyEvent(Class<T> eventType){ synchronized (mStickyEventMap){ return eventType.cast(mStickyEventMap.remove(eventType)); } } /** * 移除所有sticky事件 */ public void removeAllStickyEvents(){ synchronized (mStickyEventMap){ mStickyEventMap.clear(); } } }要发送黏性事件需要使用postSticky()发送事件使用toObservableSticky接受事件,与其他使用与普通事件相同
由于黏性事件保存在RxBus中,当不需要使用时除了要subscribe.unsubscribe()取消订阅还需要 RxBus.getInstance().removeStickyEvent(Class);移除粘性事件
关于RxJava中的异常 当RxJava中发生异常时会调用onError方法,并且结束整个订阅事件,这会导致之后发出的事件不能接受的问题,需要用try…catch…捕捉异常
参考博客: 用RxJava实现事件总线(Event Bus) [深入RxBus]:支持Sticky事件 [深入RxBus]:异常处理