Articles of rx java2

RxJava с использованием Kotlin – как синхронизировать 2 асинхронных метода, рефакторинг с Java

У меня есть 2 коллекции, события обновления буфера: private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>(); В моем коде также присутствует: private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); private boolean mSaveDataScheduled; private final Object mEventsMonitor = new Object(); private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor(); Я добавляю событие к этим […]

OnErrorNotImplementedException с использованием RxJava2 и Retrofit2 Mosby MVI

Я получаю сообщение OnErrorNotImplementedException и приложение падает, несмотря на обработку ошибки downstream (?). исключение E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: pl.netlandgroup.smartsab, PID: 9920 io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) at io.reactivex.Observable.subscribe(Observable.java:10838) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10838) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) at […]

Как указать версию RxJava при использовании RxKotlin?

Документы в репозитории RxKotlin на GitHub не указывают способ явно зависеть от последней версии RxJava. Если мы увидим файл build.gradle библиотеки, он теперь использует compile 'io.reactivex.rxjava2:rxjava:2.1.0' Но что, если мы хотим идти в ногу с последними релизами RxJava, а не с теми, которые присутствуют в библиотеке.

RxJava запускает часть плана в основной теме

Привет, я пытаюсь реализовать Единый наблюдаемый, который объединяет два запроса вместе. В промежутке между двумя запросами, которые я делаю, я уведомляю обратный вызов, чтобы обновить интерфейс с ответом от запроса, а затем запустить следующий запрос в потоке Schedulaers.io. Проблема, с которой я сталкиваюсь, заключается в том, что она пытается обновить пользовательский интерфейс из потока schedulars.io, […]

RX2.0 вложенная обработка ошибок подписки

Я пытаюсь понять, как исключения попадают в потоки RX. В частности, если у меня есть подписка в подписке, и внутренняя подписка получает исключение из ее наблюдаемого, мне нужно обрабатывать это в onError для внутренней подписки или я могу обработать ее в onError родительской подписки? observableThing1.subscribe({ _ -> observableThing2.subscribe({ _ -> // Something horrible happens here […]

Как использовать RxJava2 combLatest со списком наблюдаемых в Котлине

Я знаю, как это сделать в RxJava 2 . И я знаю, как RxKotlin помогает с подобными проблемами. Но кажется, что RxKotlin.Observables не имеет этой вспомогательной функции для перегрузки списка, и я не могу понять это. Как бы вы это сделали?

Как запомнить состояние с помощью операторов повтора в RxJava2

У меня есть сетевой клиент, который может возобновить работу от прерываний, но для последнего сообщения требуется последнее сообщение для этого. Пример в Котлине: fun requestOrResume(last: Message? = null): Flowable<Message> = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() […]

RxAndroid – обращение к ошибкам с помощью оператора Zip

Я пытаюсь найти способ выполнять запросы параллельно и обрабатывать их, когда каждый наблюдаемый заканчивается. Несмотря на то, что все работает, когда все наблюдаемые дают ответ, я не вижу способа справиться с каждой ошибкой, когда все будет закончено. Это пример zip-оператора, который в основном выполняет 2 запроса параллельно: Observable.zip( getObservable1() .onErrorResumeNext { errorThrowable: Throwable -> Observable.error(ErrorEntity(Type.ONE, […]

Единичное тестирование Rxjava наблюдаемых, которые имеют задержку

Я хочу, чтобы иметь возможность тестировать Observable с задержкой излучения, но без ожидания времени задержки. Есть ли способ сделать это? В настоящее время я использую CountDownHatch для задержки утверждения, и это работает нормально, но увеличивает время тестового запуска. Пример: val myObservable: PublishSubject<Boolean> = PublishSubject.create<Boolean>() fun myObservable(): Observable<Boolean> = myObservable.delay(3, TimeUnit.SECONDS) @Test fun testMyObservable() { val […]

Как повысить значение возвращаемого значения aa конструктора

Я хочу исправить ошибку компилятора несоответствия типа, которую я получаю от следующего кода, где вывод типа не делает то, что я хочу. У меня есть закрытый класс и поток rxjava: sealed class Result { data class Success(val logs: List<Log>) : Result() data class Failure(val throwable: Throwable) : Result() object InFlight : Result() } val logs: […]