Преобразование наблюдаемого в текущее с противодавлением в RxJava2

Я наблюдаю за строками, создаваемыми NetworkResource , обертывая их в Observable.create . Вот код, отсутствующий try / catch и аннулирование для простоты:

 fun linesOf(resource: NetworkResource): Observable<String> = Observable.create { emitter -> while (!emitter.isDisposed) { val line = resource.readLine() Log.i(TAG, "Emitting: $line") emitter.onNext(line) } } 

Проблема заключается в том, что позже я хочу превратить ее в Flowable с помощью функции observable.toFlowable(LATEST) для добавления противодавления в случае, если мой потребитель не сможет идти в ногу с ним, но в зависимости от того, как я это делаю, потребитель перестает получать предметы после элемента 128.

А) таким образом все работает:

 val resource = ... linesOf(resource) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .toFlowable(BackpressureStrategy.LATEST) .subscribe { Log.i(TAG, "Consuming: $it") } 

B) здесь потребитель застревает после 128 пунктов (но испускание продолжается):

 val resource = ... linesOf(resource) .toFlowable(BackpressureStrategy.LATEST) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128 

В варианте A) все работает без каких-либо проблем, и я вижу, как Emitting: ... log бок о бок с журналом Consuming: ...

В варианте B) я вижу сообщение с сообщением Emitting: ... которое счастливо излучает новые строки, но я перестаю видеть сообщение Consuming: ... log после элемента 128, хотя испущенное продолжается.

Вопрос : Может ли кто-нибудь помочь мне понять, почему это происходит?

Прежде всего, вы используете неправильный тип и неправильный оператор. Использование Flowable устраняет необходимость в преобразовании. Использование Flowable.generate дает вам противодавление:

 Flowable.generate(emitter -> { String line = resource.readLine(); if (line == null) { emitter.onComplete(); } else { emitter.onNext(line); } }); 

Во-вторых, причина, по которой ваша версия зависает, связана с тем же самым тупиком в пуле, вызванным subscribeOn . Запросы из нисходящего потока запланированы за вашим нетерпеливым циклом выбросов и не могут вступить в силу, останавливая эмиссию по 128 элементам по умолчанию. Используйте Flowable.subscribeOn(scheduler, false) чтобы избежать этого случая.