Background

In my Android app, I used the following typical Listener pattern to monitor updates from models:

public interface Listener {
    void onUpdate(List<Item> list);
}

public interface Model {
    void addListener(Listener l);
    void removeListener(Listener l);
}

The listeners are implemented by several Fragments that expect update notifications from a global model object. An update may be triggered manually by the user or periodically by a scheduler.
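For context, here is a minimal sketch of the bookkeeping a model carries under this pattern. InMemoryModel, its refresh method, and the fields of Item are hypothetical names made up for illustration; the real model in the app may look quite different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerModelSketch {
    // Stand-in for the app's Item type (hypothetical field).
    static class Item {
        final String name;
        Item(String name) { this.name = name; }
    }

    interface Listener {
        void onUpdate(List<Item> list);
    }

    interface Model {
        void addListener(Listener l);
        void removeListener(Listener l);
    }

    // Hypothetical implementation showing the listener-list boilerplate.
    static class InMemoryModel implements Model {
        // CopyOnWriteArrayList lets a listener unregister itself during a callback.
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        @Override public void addListener(Listener l) { listeners.add(l); }
        @Override public void removeListener(Listener l) { listeners.remove(l); }

        // Called by a user-triggered or scheduled refresh (assumed entry point).
        void refresh(List<Item> fresh) {
            for (Listener l : listeners) {
                l.onUpdate(fresh);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryModel model = new InMemoryModel();
        List<Integer> sizes = new ArrayList<>();
        Listener fragment = list -> sizes.add(list.size());

        model.addListener(fragment);                                  // Fragment.onResume
        model.refresh(Arrays.asList(new Item("a"), new Item("b")));   // delivered
        model.removeListener(fragment);                               // Fragment.onPause
        model.refresh(Arrays.asList(new Item("c")));                  // not delivered

        System.out.println(sizes); // prints [2]
    }
}
```

Every model that follows this pattern has to repeat the add, remove, and notify loop.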

After learning RxJava, I wanted to simplify the model interface and, at the same time, remove the boilerplate of maintaining a listener list in the model implementation:

public interface Model {
    Observable<List<Item>> getItems();
}

Dealing with Different Life Cycles

Note that the listeners (i.e. Fragments) are registered with the model in Fragment.onResume and unregistered in Fragment.onPause, whereas an Observable is typically created anew for each update. That means I need a middleman that can:

  • Subscribe to a new Observable for each update
  • Manage the listener list
  • Notify the listeners when the source Observable has completed its emission
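These three responsibilities can be sketched without any RxJava types. The UpdateRelay class and its method names below are hypothetical, chosen only to illustrate the flow: listeners stay registered across updates, items from each short-lived source are buffered, and the listeners are notified once that source completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class UpdateRelaySketch {
    // Hypothetical middleman between long-lived listeners and per-update sources.
    static class UpdateRelay<T> {
        private final List<Consumer<List<T>>> listeners = new CopyOnWriteArrayList<>();
        private final List<T> buffer = new ArrayList<>();

        void register(Consumer<List<T>> listener) { listeners.add(listener); }
        void unregister(Consumer<List<T>> listener) { listeners.remove(listener); }

        // Called by the current source for every item it emits.
        void onNext(T item) { buffer.add(item); }

        // Called when the current source finishes: deliver a snapshot and
        // reset the buffer so the next source starts from an empty list.
        void onCompleted() {
            List<T> snapshot = new ArrayList<>(buffer);
            buffer.clear();
            for (Consumer<List<T>> listener : listeners) {
                listener.accept(snapshot);
            }
        }
    }

    public static void main(String[] args) {
        UpdateRelay<String> relay = new UpdateRelay<>();
        List<List<String>> seen = new ArrayList<>();
        Consumer<List<String>> fragment = seen::add;

        relay.register(fragment);      // Fragment.onResume

        relay.onNext("a");             // first update
        relay.onNext("b");
        relay.onCompleted();

        relay.onNext("c");             // second update, same listener
        relay.onCompleted();

        relay.unregister(fragment);    // Fragment.onPause
        relay.onNext("d");             // third update is not delivered
        relay.onCompleted();

        System.out.println(seen); // prints [[a, b], [c]]
    }
}
```

The snapshot copy matters: handing listeners the buffer itself and then clearing it would empty the list they just received.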

Subject to the Rescue

The middleman can be implemented by deriving from Subject in RxJava:

A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

What I need is a Subject that consumes T and emits List<T>. Fragments can then subscribe to this Subject once, while the Subject itself subscribes to a newly created source Observable for each update.

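// Written against the RxJava 1.x API (rx.Observable, rx.Subscriber, rx.subjects.Subject);
// Iterables comes from Guava.
// Not thread-safe: assumes subscriptions and emissions happen on a single thread.
private static <T> Subject<T, List<T>> createAggregateSubject() {
    // Downstream subscribers (the Fragments); they outlive any single update.
    final List<Subscriber<? super List<T>>> observers = new LinkedList<>();
    // No explicit o.onStart() is needed: Observable.subscribe() calls it
    // before invoking OnSubscribe.
    final Observable.OnSubscribe<List<T>> onSubscribe = observers::add;
    return new Subject<T, List<T>>(onSubscribe) {
        // Items collected from the current source Observable.
        private final List<T> list = new LinkedList<>();

        @Override
        public boolean hasObservers() {
            return !observers.isEmpty();
        }

        @Override
        public void onCompleted() {
            // Drop Fragments that unsubscribed since the last update.
            Iterables.removeIf(observers, Subscriber::isUnsubscribed);

            // Emit a copy so that clearing the buffer below does not
            // empty the list the subscribers have just received.
            final List<T> snapshot = new LinkedList<>(list);
            list.clear();
            for (Subscriber<? super List<T>> o: observers) {
                o.onNext(snapshot);
            }
        }

        @Override
        public void onError(Throwable e) {
            // Discard partial results so they do not leak into the next update.
            list.clear();
            for (Subscriber<? super List<T>> o: observers) {
                if (!o.isUnsubscribed()) {
                    o.onError(e);
                }
            }
        }

        @Override
        public void onNext(T t) {
            // Buffer only; subscribers are notified in onCompleted().
            list.add(t);
        }
    };
}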

Reference

To learn more about using RxJava on Android, see Learning RxJava for Android by example, which shows more creative usages.