Intereting Posts
Как перебирать hasmap в котлин? Что именно создает Regex с помощью Regex.fromLiteral ()? Как получить правильный тип при возврате шаблона <T?> Из статической функции с нулевым значением Kotlin: переопределение общего свойства внутри подтипа Результат тот же, но тестовый пример не проходит в модульном тесте Какова цель Декларации объекта внутри герметичного класса в Котлине? Как вводить слушателей? Gradle KTS. Как переместить конфигурацию зависимостей в отдельный файл из основной сборки? Переменные в Котлине, отличия от Java. var vs val? Замок Kotlin подстановочного знака в параметре обратного вызова списка Наблюдаемое свойство, позволяющее добавлять наблюдателей во время выполнения Android: дублирование фрагментов в FragmentManager MQTT Android в режиме «Доза» (Android 7.0) Удалите все неиспользуемые ресурсы из проекта Android Как указать начальную задержку для периодического задания Android с помощью JobScheduler?

Как я могу приостановить событие, проходящее через наблюдаемое?

Я пытаюсь создать Observable, который может быть приостановлен таким образом, что элементы перестают толкаться через наблюдаемые, пока они не перестанут.

В этот момент я бы хотел, чтобы он возобновил обработку всех необработанных элементов. Мой источник данных поступает из-за пределов класса, поэтому у меня возникают такие ситуации:

class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first. .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } } 

То, что я намереваюсь, заключается в том, что функция pause останавливает события даже от входа на сервер (но будет держаться за них до окончательного resume ). На момент resume события мы отправляем события. (Очевидно, мы добавили бы дополнительную помощь для противодавления, если у нас было слишком много событий во время приостановленного состояния.

Я пробовал различные применения буферизации и оконной обработки, чтобы сделать эту работу, но она никогда не останавливает наблюдаемые. Вместо этого происходит одна из двух вещей:

  1. Событие полностью прекращается (в случае отмены подписки, фильтрации и т. Д.),
  2. Событие протекает, как будто ничего не произошло.

Есть ли что-нибудь, что я могу сделать, чтобы поддержать этот вариант использования? Или я должен писать это в ожидании того, что один из вышеупомянутых двух результатов – это то, что произойдет?

Хитрость заключается в использовании другого BehaviorSubject в качестве закрывающего события для дополнительной буферизации:

 val publisher = PublishSubject.create<Event>() fun trackEvent(e: Event) { publisher.onNext(e) isPaused.onNext(isPaused.value) } val isActive = BehaviorSubject.create(true) fun pause() { isActive.onNext(false) } fun resume() { isActive.onNext(true) } private fun createSubscription(): Subscription { return publisher .buffer(10L, TimeUnit.SECONDS, 500) // -> Observable<List<Event>> .buffer({ isActive.filter { it } }) // -> Observable<List<List<Event>>> .flatMap { Observable.from(it) } // -> Observable<List<Event>> .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } 

Первый buffer вызов помещает входящие события в ведра с указанным размером или по истечении времени. Второй buffer закрывает текущее ведро на событиях, испускаемых наблюдаемым, указывая, что Agent не приостановлен ( isActive.filter { it } ). isActive испускает значение для каждого события, и поскольку isActiveBehaviorSubject он будет isActive свое последнее значение каждому новому подписчику. То есть на каждом ковше, испускаемом первым вызовом buffer он будет либо продолжать сразу, либо ждать, пока Agent будет возобновлен.