Как динамически масштабировать отскок потока эмиссионных выбросов?

У меня есть буферизованный поток, ожидающий заданного количества времени молчания, прежде чем опубликовать список элементов, которые были буферизованы:

INTEGERS .share() .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) .map { durations -> ... } 

Я бы хотел, чтобы DEBOUNCE_TIME динамически настраивался в зависимости от среднего значения буферизированных элементов, но мне сложно понять, как это достичь.

Вы можете отложить debounce, взять один элемент и запустить повтор, как только будет определено новое значение дебюта:

 int DEBOUNCE_TIME = 100; AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); PublishSubject<Integer> mayRepeat = PublishSubject.create(); AtomicInteger counter = new AtomicInteger(); Observable<Integer> INTEGERS = Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) .map(w -> counter.incrementAndGet())); INTEGERS.publish(o -> o.buffer( Observable.defer(() -> o.debounce( debounceTime.get(), TimeUnit.MILLISECONDS) ) .take(1) .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) ) ) .map(list -> { int nextDebounce = Math.min(100, list.size() * 100); debounceTime.set(nextDebounce); mayRepeat.onNext(1); return list; }) .blockingSubscribe(System.out::println); 

Это печатает:

 [1, 2] [3, 4, 5] [6, 7, 8, 9] [10] 
Intereting Posts
Наблюдаемое свойство, позволяющее добавлять наблюдателей во время выполнения Преобразовать ByteArrayOutputStream в json в Котлин Какова точка назначения сопутствующего объекта в котлин Внедрение интерфейса с параметризованной функцией Свойство Kotlin: «Тип параметра свойства должен использоваться в его типе приемника» Сборник JAR для Scala & Kotlin Выражение лямбда не используется Изменить текучесть <Список <Obj1 >> в текущую <Список <Obj2 >> в комнате Не могли бы вы объяснить этот фрагмент кода с точки зрения кода C #? Компонент (не облачный) может не ссылаться на привязки с привязкой Как я могу построить SQL-запрос с динамическим именем столбца в пространственной постоянной библиотеке Джессинская десериализация Kotlin JS Как реализовать ленивое свойство в Котлине, которое требует другого имущества? Spring REST, Kotin и примитивные параметры по умолчанию приводят к ошибке PropertyModel не может работать с частным полем Kotlin с get ()