private static <T> Subject<T, List<T>> createAggregateSubject() {
final List<Subscriber<? super List<T>>> observers = new LinkedList<>();
final Observable.OnSubscribe<List<T>> onSubscribe = o -> {
o.onStart();
observers.add(o);
};
return new Subject<T, List<T>>(onSubscribe) {
private List<T> list = new LinkedList<>();
@Override
public boolean hasObservers() {
return !observers.isEmpty();
}
@Override
public void onCompleted() {
Iterables.removeIf(observers, Subscriber::isUnsubscribed);
for (Subscriber<? super List<T>> o: observers) {
o.onNext(list);
}
list.clear();
}
@Override
public void onError(Throwable e) {
for (Subscriber<? super List<T>> o: observers) {
o.onError(e);
}
}
@Override
public void onNext(T t) {
list.add(t);
}
};
}