Как я могу условно добавить асинхронную операцию в середине потока RxJava?

Вот упрощенная версия того, что я пытаюсь сделать (используя Kotlin и RxJava)

makeServerCall() .doOnNext { doStuff(it) } //TODO: if it == 0, call asyncOperation() and wait for its callback to fire //before running the rest of the stream. Otherwise immediately run the rest //of the stream .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) } .subscribeBy(onNext = { allDone() view? }) 

Как я могу сжать вызов asyncOperation() и заставить остальную часть потока ждать, пока его обратный вызов будет asyncOperation() , но только при выполнении определенного условия? Кажется, что это, вероятно, тривиальная операция в Rx, но очевидное решение не приходит на ум.

FlatMap это!

 .flatMap { if (it == 0) { return@flatMap asyncOperation() .ignoreElements() .andThen(Observable.just(0)) } return@flatMap Observable.just(it) } .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) ) }