Как продолжить обработку после ошибки в RxJava 2?

У меня есть PublishSubject и Subscriber которые я использую для обработки (возможно) бесконечного потока предварительно обработанных данных. Проблема в том, что некоторые элементы могут содержать некоторую ошибку. Я бы хотел проигнорировать их и продолжить обработку. Как я могу это сделать? Я пробовал что-то вроде этого:

  val subject = PublishSubject.create<String>() subject.retry().subscribe({ println("next: $it") }, { println("error") }, { println("complete") }) subject.onNext("foo") subject.onNext("bar") subject.onError(RuntimeException()) subject.onNext("wom") subject.onComplete() 

Моя проблема заключается в том, что ни один из методов обработки ошибок не помогает мне здесь:

onErrorResumeNext() – инструктирует Observable испускать последовательность элементов, если встречает ошибку

onErrorReturn( ) – инструктирует Observable испускать определенный элемент, когда он сталкивается с ошибкой

onExceptionResumeNext( ) – инструктирует Observable продолжать выдавать элементы после того, как он встречает исключение (но не другое разнообразие броски)

retry( ) – если источник Observable испускает ошибку, повторно подпишитесь на нее в надежде, что она будет завершена без ошибок

retryWhen( ) – если источник Observable испускает ошибку, передайте эту ошибку другому наблюдаемому, чтобы определить, следует ли повторно подписываться на источник

Например, я попытался retry() но он зависает мой процесс после ошибки на неопределенный срок.

Я также попытался onErrorResumeNext() но он не работает onErrorResumeNext() :

  val backupSubject = PublishSubject.create<String>() val subject = PublishSubject.create<String>() var currentSubject = subject subject.onErrorResumeNext(backupSubject).subscribe({ println("next: $it") }, { println("error") currentSubject = backupSubject }, { println("complete") }) backupSubject.subscribe({ println("backup") }, { println("backup error") }) currentSubject.onNext("foo") currentSubject.onNext("bar") currentSubject.onError(RuntimeException()) currentSubject.onNext("wom") currentSubject.onComplete() 

Это только печатает foo и bar .

Если вы хотите продолжить обработку после ошибки, это означает, что ваша ошибка является значением, аналогичным вашим onNext и должна пройти onNext . Чтобы обеспечить безопасность типов в этом случае, вы должны использовать некоторую форму оболочки, которая может принимать регулярное значение или ошибку; например, io.reactivex.Notification<T> доступен в RxJava 2:

 PublishSubject<Notification<String>> subject = PublishSubject.create(); subject.subscribe(System.out::println); subject.onNext(Notification.createOnNext("Hello")); subject.onNext(Notification.<String>createOnError(new RuntimeException("oops"))); subject.onNext(Notification.createOnNext("World"));