ReactiveStreams NPE при использовании publishOn с пользовательским издателем

Когда я использую Reactive Streams ( https://github.com/reactor/reactor-core ) с пользовательским Publisher в сочетании с функцией publishOn , я всегда получаю NPE. Что не так с моим кодом? Использую ли я Publisher неправильно?

 Flux.from(MyPublisher()) .publishOn(Schedulers.single()) .subscribe { println("<-- $it received") } class MyPublisher : Publisher<Int> { override fun subscribe(sub: Subscriber<in Int>) { while (true) { Thread.sleep(300) sub.onNext(1) } } } 

Исключение составляет:

 Exception in thread "main" java.lang.NullPointerException at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) at reactor.core.publisher.Flux.subscribe(Flux.java:6447) at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) at reactor.core.publisher.Flux.subscribe(Flux.java:6440) at reactor.core.publisher.Flux.subscribe(Flux.java:6404) at reactor.core.publisher.Flux.subscribe(Flux.java:6347) at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 

    Publisher определяется стандартом «реактивные потоки» и имеет ряд требований. Одним из этих требований является то, что Subscriber.onSubscribe HAS должен быть вызван перед любым из других методов, чтобы следовать протоколу.

    Поскольку вы этого не сделали, это означает, что что-то, вероятно, не инициализировано должным образом, в результате чего NPE внутри класса реактора.

    Однако, даже если вы исправите эту проблему, стандарт разработан как реактивный, что означает, что он испускает данные только тогда, когда абонент просит его. Поскольку вы будете отправлять его данные независимо от того, что, вероятно, приведет к исключению позже в строке. Используйте Flux.create для создания эмиттера, который может корректно обрабатывать запросы, а не создавать собственную реализацию Publisher.