Slient Blog

RxJava RxBus进阶(Sticky事件,异常拦截)

2017-04-09

感谢yokey 本文只是摘要笔记

在使用之前写的RxBus的时候会出现一些问题

  • 需要RxBus支持Sticky功能
  • 在对每个事件的处理时候发生异常,后续的的事件都接受不了

什么是Sticky事件

在Android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者



比如说之前的RxBus,我想在Activity1中post一条消息,希望在Activity3中去处理它,然而我们的订阅事件发生在Activity3中,他只能处理订阅关系发生之后的Observer,这个时候就不能体现它像EventBus一样的事件总线的作用

使用Map实现Sticky

Map初始化

1
2
3
4
5
6
private final Map<Class<?>, Object> mStickyEventMap;
private RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
mStickyEventMap = new ConcurrentHashMap<>();
}

在我们postSticky(Event)时,存入Map中

1
2
3
4
5
6
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}

订阅时toObservableSticky(Class eventType),先从Map中寻找是否包含该类型的事件,如果没有,则说明没有Sticky事件要发送,直接订阅Subject(此时作为被观察者Observable);如果有,则说明有Sticky事件需要发送,订阅merge(Subject 和 Sticky事件)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = mBus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}

注意

在使用Sticky特性时,在不需要某Sticky事件时, 通过removeStickyEvent(Class eventType)移除它,最保险的做法是:在主Activity的onDestroy里removeAllStickyEvents()。
因为我们的RxBus是个单例静态对象,再正常退出app时,该对象依然会存在于JVM,除非进程被杀死,这样的话导致StickyMap里的数据依然存在,为了避免该问题,需要在app退出时,清理StickyMap。

1
2
3
4
5
6
7
// 主Activity(一般是栈底Activity)
@Override
protected void onDestroy() {
super.onDestroy();
// 移除所有Sticky事件
RxBus.getDefault().removeAllStickyEvents();
}

总的代码RxBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class RxBus {
private final Subject<Object, Object> _bus;
private final Map<Class<?>, Object> mStickyEventMap;
private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
mStickyEventMap = new ConcurrentHashMap<>();
}
private static class RxBusHolder {
private static final RxBus INSTANCE = new RxBus();
}
public static RxBus getInstance() {
return RxBusHolder.INSTANCE;
}
public void post(Object o) {
_bus.onNext(o);
}
/**
*
* @param eventType
* @param <T>根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* @return
*/
public <T> Observable<T> toObserverable(Class<T> eventType) {
return _bus.ofType(eventType);
}
/**
* 发送一个新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*/
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = _bus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
//如果之前发生了postSticky事件,则会在订阅事件发生后,发送这个事件
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
//这里是一个冷热结合的Observable
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
//如果之前并没有post事件的发生,和普通toObservable一样
return observable;
}
}
}
/**
* 根据eventType获取Sticky事件
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.get(eventType));
}
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
}

异常处理

在使用RxBus过程中,你会发现你订阅了某个事件后,在后续接收到该事件时,如果处理的过程中发生了异常,你会发现后续的事件再也接收不到了,除非你重新订阅!

原因在于RxJava的事件序列机制,一个订阅事件是以onCompleted()或者onError()作为结束的,即:一旦订阅者的onCompleted()或onError()被调用,订阅者和被订阅者的订阅关系就解除了。

这里说下RxJava的异常传递机制:onError()在Observable序列传递过程中出现任何异常时被调用,然后终止Observable事件序列的传递,以此通知所有的订阅者发生了一个不可恢复的错误,即:异常总会传递到订阅者。

这本是RxJava的一个优点,反而在事件总线的场景下,成了让人头疼的问题!

所以我们的RxBus的订阅者在处理订阅事件时,一旦发生了异常,而又没Catch,那么最终都会调用到onError(),而一旦走到onError(),就意味着这个订阅者和该Subject解除了订阅关系,因此再也收不到后续发出的事件了~ 囧

我们用最传统的方式解决 –捕捉异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
xBus.getDefault().toObservableSticky(EventSticky.class) // 建议在Sticky时,在操作符内主动try,catch
.map(new Func1<EventSticky, EventSticky>() {
@Override
public EventSticky call(EventSticky eventSticky) {
try {
// 变换操作
} catch (Exception e) {
e.printStackTrace();
}
return eventSticky;
}
})
.subscribe(new Action1<EventSticky>() {
@Override
public void call(EventSticky eventSticky) {
try {
// 处理接收的事件
} catch (Exception e) {
e.printStackTrace();
}
}
});
Tags: Android
使用微信添加

若你觉得我的文章对你有帮助,请添加我为好友

扫描二维码,分享此文章