Форма входа с повторным запросом на rxJava и дооснащение

Я хочу сделать регистрационную форму на rxJava и модифицировать. Но если у меня возникла ошибка при запросе сервера, вызывается отказ от подписки со дна rx

У меня есть ui методы, которые реализованы rxAndroidBinding lib

fun validEmail(): Observable<CharSequence> //last well formated login fun validPassword(): Observable<CharSequence> //last password of length fun clicks(): Observable<Unit> //clicks on login buttons 

У меня есть метод регистрации, который реализуется Retrofit fun authorize(email:String, pass:String): Observable<Unit>

Я хочу сделать запрос, если оба входа действительны, и у меня есть кнопка входа в систему

“ `

  val validPair = rx.Observable.combineLatest(iView.validEmail(), iView.validPassword(), ::ValidLoginPair) .doOnNext { iView.setLoginButtonEnabled(true) } subscription = rx.Observable.combineLatest(validPair, iView.clicks(), { pair, unit -> pair }) .doOnNext { iView.setProgress(true) } .flatMap { model.get().authorize(it.email.toString(), it.password.toString()) .observeOn(AndroidSchedulers.mainThread()) .doOnError { it.printStackTrace(); iView.setProgress(false); iView.showError("NetWorkError", it.message ?: "Unknown error") } } .observeOn(AndroidSchedulers.mainThread()) .subscribe({ iView.setProgress(false); iView.onLogin() }) {it.printStackTrace(); iView.setProgress(false);} 

“ `

Если у меня есть ответ 200, он работает. Но если вызывается onError, все подписки на события ui будут неподписными. Таким образом, при втором щелчке ничего не произойдет. Что я делаю неправильно? Почему такая модернизация работает?


версии compile 'io.reactivex:rxandroid:1.2.1' compile 'com.squareup.retrofit2:retrofit:2.1.0' compile 'io.reactivex:rxjava:1.1.6'

стоп без подписчиков

  java.lang.Thread.State: WAITING at com.jakewharton.rxbinding.widget.TextViewAfterTextChangeEventOnSubscribe$2.onUnsubscribe(TextViewAfterTextChangeEventOnSubscribe.java:40) at rx.android.MainThreadSubscription.unsubscribe(MainThreadSubscription.java:72) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.cancel(OnSubscribeCombineLatest.java:178) at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.unsubscribe(OnSubscribeCombineLatest.java:165) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.cancel(OnSubscribeCombineLatest.java:178) at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.unsubscribe(OnSubscribeCombineLatest.java:165) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124) at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113) at rx.Subscriber.unsubscribe(Subscriber.java:98) at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:814) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562) at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846) at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:72) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219) at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:135) at android.app.ActivityThread.main(ActivityThread.java:5254) at java.lang.reflect.Method.invoke(Method.java:-1) at java.lang.reflect.Method.invoke(Method.java:372) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698) 

Из документации ReactiveX:

Наблюдатель может делать ноль или более уведомлений OnNext, каждый из которых представляет собой один испускаемый элемент, и затем он может следовать за уведомлениями о выбросах либо с уведомлением OnCompleted, либо с уведомлением OnError, но не с обоими. После выдачи уведомления OnCompleted или OnError он не может впоследствии выдавать какие-либо дальнейшие уведомления.

onError если есть сетевая ошибка или ошибка API (код вне диапазона 200-300). После вызова onError Observable завершается.

Извините за использование Java в моем примере, но одно решение будет:

 authorize(username, password).flatMap(() -> Observable.just(true)) .onErrorReturn(e-> Observable.just(false)) 

Это гарантирует, что Observable не будет завершен в случае ошибки. Вместо этого он испустит ложь. Для успеха это испустит истину. Вы можете заменить boolean другим классом, содержащим больше данных (возможно, сообщение об ошибке).

Примечание: onErrorReturn будет использовать эту ошибку. doOnError не использует ошибку, onError все равно будет вызываться.