private static Subject> createAggregateSubject() {
final List>> observers = new LinkedList<>();
final Observable.OnSubscribe> onSubscribe = o -> {
o.onStart();
observers.add(o);
};
return new Subject>(onSubscribe) {
private List list = new LinkedList<>();
@Override
public boolean hasObservers() {
return !observers.isEmpty();
}
@Override
public void onCompleted() {
Iterables.removeIf(observers, Subscriber::isUnsubscribed);
for (Subscriber super List> o: observers) {
o.onNext(list);
}
list.clear();
}
@Override
public void onError(Throwable e) {
for (Subscriber super List> o: observers) {
o.onError(e);
}
}
@Override
public void onNext(T t) {
list.add(t);
}
};
}