OnComplete никогда не вызывал с toSortedList () и groupBy ()

В настоящее время я использую библиотеку Android-ReactiveLocation ( Github ). LastKnownLocationObservable ( Code ) работает по назначению. Я использую flatMap для извлечения соседних станций из db и (из-за сферы) Я создаю модель из данных. Поэтому у меня есть список элементов, и я создаю новый Observable в flatMap с Observable.from(data) .

Затем я хочу сортировать местоположения, фильтровать их и группировать.

 .toSortedList() .flatMap { Observable.from(it) } .filter { it.distance <= (maxDistance.toDouble() * 1000) } .groupBy { //Group the stations in categories if (it.distance <= maxDistance && it.favorite) { "nearbyFavorite" } else if (it.favorite) { "outOfReachFavorite" } else { "nearby" } } 

Однако onComplete никогда не вызывается, когда я подписываюсь на Observable. Наблюдаемый просто toSortedList() на toSortedList() .

Подписка:

 .subscribe(object: Subscriber<GroupedObservable<String, NearbyLocationItem>>() { override fun onNext(p0: GroupedObservable<String, NearbyLocationItem>?) { val locationItems = ArrayList<NearbyLocationItem>() p0.subscribe { loc -> locationItems.add(loc) } locations.put(p0.key, locationItems) } override fun onCompleted() { Log.d(javaClass.simpleName, "Never called") } override fun onError(p0: Throwable?) { } }