Background
In my Android app, I used the following typical Listener pattern to monitor updates from models:
public interface Listener {
void onUpdate(List<Item> list);
}
public interface Model {
void addListener(Listener l);
void removeListener(Listener l);
}
Listeners are implemented by several Fragments that expect update notifications coming from a global model object. The updating may be triggered manually by user or periodically by a scheduler.
After learning RxJava, I would like to simplify the model interface and at the same time remove the boilerplate of maintaining listener list in model implementation:
public interface Model {
Observable<List<Item> getItems();
}
Dealing with Different Life Cycles
Note that Listeners (i.e. Fragments) are registered to model during Fragment.onResume
and unregistered during Fragment.onPause
but Observable
are typically designed to be created for each update. That means I need a middleman that can:
- Subscribe to new Observable for each update
- Manage listener list
- Notify listener when source Observable has completed its emission
Subject to Rescue Us
The middleman can be implemented by deriving from Subject
in RxJava:
A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
What I need is a Subject
that can consume T
and emit List<T>
. Therefore Fragments can subscribe to this Subject once and this Subject subscribes to newly created source Observable for each update.
private static <T> Subject<T, List<T>> createAggregateSubject() {
final List<Subscriber<? super List<T>>> observers = new LinkedList<>();
final Observable.OnSubscribe<List<T>> onSubscribe = o -> {
o.onStart();
observers.add(o);
};
return new Subject<T, List<T>>(onSubscribe) {
private List<T> list = new LinkedList<>();
@Override
public boolean hasObservers() {
return !observers.isEmpty();
}
@Override
public void onCompleted() {
Iterables.removeIf(observers, Subscriber::isUnsubscribed);
for (Subscriber<? super List<T>> o: observers) {
o.onNext(list);
}
list.clear();
}
@Override
public void onError(Throwable e) {
for (Subscriber<? super List<T>> o: observers) {
o.onError(e);
}
}
@Override
public void onNext(T t) {
list.add(t);
}
};
}
Reference
To know more about how to use RxJava on Android, you can refer to Learning RxJava for Android by example for more creative usage.