自定义RxBus,RxManager with RxJava2

    xiaoxiao2021-03-25  65

    现在查到的关于RxBus, RxManager的封装大部分是基于RxJava1的,从RxJava1到RxJava2的变化很大,很难平滑地过度,所以自己根据RxJava2重新封装下RxBus和RxManager

    RxBus.java

    public class RxBus { private static RxBus instance; /** * ConcurrentHashMap: 线程安全集合 * Subject 同时充当了Observer和Observable的角色 */ @SuppressWarnings("rawtypes") private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>(); public static synchronized RxBus getInstance() { if(null == instance) { instance = new RxBus(); } return instance; } private RxBus() { } /** * 订阅事件源 * * @param observable * @param consumer * @return */ public RxBus onEvent(Observable<?> observable, Consumer<Object> consumer) { observable.observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { throwable.printStackTrace(); } }); return getInstance(); } /** * 注册事件源 * * @param tag key * @param <T> * @return */ @SuppressWarnings({"rawtypes"}) public <T> Observable<T> register(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if(null == subjectList) { subjectList = new ArrayList<>(); subjectMapper.put(tag, subjectList); } Subject<T> subject = PublishSubject.create(); subjectList.add(subject); // LogUtil.log("register" + tag + " size:" + subjectList.size()); return subject; } /** * 取消整个tag的监听 * * @param tag key */ @SuppressWarnings("rawtypes") public void unregister(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if(null != subjectList) { subjectMapper.remove(tag); } } /** * 取消tag里某个observable的监听 * * @param tag key * @param observable 要删除的observable * @return */ @SuppressWarnings("rawtypes") public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) { if(null == observable) { return getInstance(); } List<Subject> subjectList = subjectMapper.get(tag); if(null != subjectList) { // 从subjectList中删去observable subjectList.remove((Subject<?>) observable); // 若此时subjectList为空则从subjectMapper中删去 if(isEmpty(subjectList)) { subjectMapper.remove(tag); } } return getInstance(); } /** * 触发事件 * * @param content */ public void post(@NonNull Object content) { post(content.getClass().getName(), content); } /** * 触发事件 * * @param tag key * @param content */ @SuppressWarnings({"unchecked", "rawtypes"}) public void post(@NonNull Object tag, @NonNull Object content) { List<Subject> subjectList = subjectMapper.get(tag); if(!isEmpty(subjectList)) { for(Subject subject : subjectList) { subject.onNext(content); } } } /** * 判断集合是否为空 * * @param collection 集合 * @return */ @SuppressWarnings("rawtypes") public static boolean isEmpty(Collection<Subject> collection) { return null == collection || collection.isEmpty(); } } RxManager.java

    public class RxManager { public RxBus mRxBus = RxBus.getInstance(); /** * 管理观察源 */ private Map<String, Observable<?>> mObservableMap = new HashMap<>(); /** * 管理订阅者 */ private CompositeDisposable mCompositeSubscription = new CompositeDisposable(); public void on(String eventName, Consumer<Object> consumer) { // 注册 Observable<?> mObservable = mRxBus.register(eventName); mObservableMap.put(eventName, mObservable); mCompositeSubscription .add(mObservable.observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { throwable.printStackTrace(); } })); } /** * 添加订阅者到mCompositeSubscription * * @param m 要添加的订阅者 */ public void add(Disposable m) { mCompositeSubscription.add(m); } /** * 取消所有注册 */ public void clear() { // 取消订阅 mCompositeSubscription.dispose(); for(Map.Entry<String, Observable<?>> entry : mObservableMap.entrySet()) { // 取消注册 mRxBus.unregister(entry.getKey(), entry.getValue()); } } /** * 触发事件 * * @param tag * @param content */ public void post(Object tag, Object content) { mRxBus.post(tag, content); } }

    转载请注明原文地址: https://ju.6miu.com/read-40264.html

    最新回复(0)