Intereting Posts
Какова основная разница между сбросом и сокращением в Котлине? Когда использовать какой? Android Studio ярлык для автозаполнения Toast (Kotlin) Какова цель по умолчанию для аннотации при аннотации свойства в Котлине? «Объект» не является частью схемы для этого Царства Построение проекта Kotlin + Java 9 с помощью Gradle как правильно инициализировать общий ресурс в kotlintest 2.x (interceptSpec) Вымытый объект, все еще вызывающий метод (Mockito + Kotlin) checkParameterIsNotNull ошибка в примере входа в систему Переопределение «writeToParcel» не допускается. Вместо этого используйте компаньон «Parceler» Android studio конвертирует в Kotlin: используйте @JvmStatic в некоторых случаях kotlin-runtime.jar не существует Коутин Котлин с возвращаемым значением Возможно ли создать рекурсивный тип функции в Котлин? Kotlin – как проверить класс данных Как исправить ошибку вывода типа в Completed, преобразованную с помощью RxLifecycle.bindToLifecycle ()?

ReactiveStreams NPE при использовании publishOn с пользовательским издателем

Когда я использую Reactive Streams ( https://github.com/reactor/reactor-core ) с пользовательским Publisher в сочетании с функцией publishOn , я всегда получаю NPE. Что не так с моим кодом? Использую ли я Publisher неправильно?

 Flux.from(MyPublisher()) .publishOn(Schedulers.single()) .subscribe { println("<-- $it received") } class MyPublisher : Publisher<Int> { override fun subscribe(sub: Subscriber<in Int>) { while (true) { Thread.sleep(300) sub.onNext(1) } } } 

Исключение составляет:

 Exception in thread "main" java.lang.NullPointerException at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) at reactor.core.publisher.Flux.subscribe(Flux.java:6447) at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) at reactor.core.publisher.Flux.subscribe(Flux.java:6440) at reactor.core.publisher.Flux.subscribe(Flux.java:6404) at reactor.core.publisher.Flux.subscribe(Flux.java:6347) at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 

    Publisher определяется стандартом «реактивные потоки» и имеет ряд требований. Одним из этих требований является то, что Subscriber.onSubscribe HAS должен быть вызван перед любым из других методов, чтобы следовать протоколу.

    Поскольку вы этого не сделали, это означает, что что-то, вероятно, не инициализировано должным образом, в результате чего NPE внутри класса реактора.

    Однако, даже если вы исправите эту проблему, стандарт разработан как реактивный, что означает, что он испускает данные только тогда, когда абонент просит его. Поскольку вы будете отправлять его данные независимо от того, что, вероятно, приведет к исключению позже в строке. Используйте Flux.create для создания эмиттера, который может корректно обрабатывать запросы, а не создавать собственную реализацию Publisher.