Intereting Posts
Перенос файлов Java в Kotlin в Android Studio Тип несоответствия, требуемый узел, найденная строка Отправлять ответ json в Котлине Как я могу использовать метод (String) Java String непосредственно из источника Kotlin? Как назвать assertEquals с Double Epsilon / Precision в Котлине? Как получить второй фрагмент из списка popBackStack в pop? Как реализовать в jna структуру sizeof () с типом объединения Почему mutableMapOf возвращает значение NULL? Могут ли в производстве использоваться «экспериментальные» Kotlin-сопрограммы? Создать файл из байт-массива, отправленного внутри JSON Object kotlin Можете ли вы конкатенировать заявления во время выполнения в Котлине? Расширения и переменные Android Kotlin Могу ли я использовать сторонние java-библиотеки (.jar) для разработки Android с Kotlin? Проблемы, определяющие оператор equals () Kotlin-android-расширение связи между классами, аналогичными общению с другими фрагментами

Как я могу явно передать завершение Flowable в RxJava?

Я пытаюсь создать Flowable который обертывает Iterable . Я периодически нажимаю элементы на свой Iterable но кажется, что событие завершения неявно. Я не знаю, как сообщить, что обработка завершена. Например, в моем коде:

  // note that this code is written in Kotlin val iterable = LinkedBlockingQueue<Int>() iterable.addAll(listOf(1, 2, 3)) val flowable = Flowable.fromIterable(iterable) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.computation()) flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")}) iterable.add(4) Thread.sleep(1000) iterable.add(5) Thread.sleep(1000) 

Это печатает:

1 2 3 4 завершено

Я проверил источник интерфейса Flowable но, похоже, я не могу сказать, что Flowable полностью заполнен. Как я могу это сделать? В моей программе я публикую события, которые имеют некоторую задержку между ними, и я хотел бы быть явным, когда нужно complete поток событий.

Уточнение : у меня длительный процесс, который испускает события. Я собираю их в очереди, и я выставляю метод, который возвращает Flowable, который обертывается вокруг моей очереди. Проблема в том, что в очереди могут быть элементы, когда я создаю Flowable. Я обработаю события только один раз, и я знаю, когда поток событий останавливается, поэтому я знаю, когда мне нужно завершить Flowable.

Использование .fromIterable – неправильный способ создания Flowable для вашего .fromIterable использования.
Я действительно не PublishSubject что такое вариант использования, но вы, вероятно, захотите использовать Flowable.create() или PublishSubject

 val flowable = Flowable.create<Int>( { it.onNext(1) it.onNext(2) it.onComplete() }, BackpressureStrategy.MISSING) val publishSubject = PublishSubject.create<Int>() val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING) //This data will be dropepd unless something is subscribed to the flowable. publishSubject.onNext(1) publishSubject.onNext(2) publishSubject.onComplete() 

Конечно, как вы справляетесь с противодавлением, будет зависеть природа источника данных.

Как и предлагал akarnokd, ReplayProcessor делает именно то, что вы хотите. Замените iterable.add(item) на processor.onNext(item) и вызовите processor.onComplete() когда вы закончите.