kotlin grpc.StreamObserver для удаления в rx.PublishSubject

При использовании GRPC всякий раз, когда мы объявляем streaming api

rpc heartBeat(Empty) returns (stream ServiceStatus){} 

у нас есть простой интерфейс для шаблона StreamObserver (это то, что protobuf будет генерировать для нас)

 public interface StreamObserver<V> { void onNext(V var1); void onError(Throwable var1); void onCompleted(); } 

Теперь то, что вы хотите сделать, – это преобразовать это в фактическое Observable и только после этого передать его для дальнейшего использования.

 override fun heartBeat(arg: Empty): Observable<ServiceStatus> { // we create rx java subject val subject = PublishSubject.create<ServiceStatus>() // we create grpc observer and delegate all calls to rx java val observer = object : StreamObserver<ServiceStatus> { override fun onNext(value: ServiceStatus) { subject.onNext(value) } override fun onError(error: Throwable) { subject.onError(error) } override fun onCompleted() { subject.onCompleted() } } // we use grpc observer for generated api asyncStub.heartBeat(arg, observer) // but we pass rx observable (subject) to client code return subject } 

теперь я новичок в Kotlin, но я не могу разобраться с существующими функциями делегирования, есть ли способ сделать делегата Subject из StreamObserver? Есть ли более выразительный способ написать этот фрагмент кода в Котлине?

Я бы создал общий метод, который создает StreamObserver , передает его его аргументу лямбда и завершает результат в Observable .

 inline fun <T> asObservable( crossinline body: (StreamObserver<T>) -> Unit): Observable<T> { return Observable.create { subscription -> val observer = object : StreamObserver<T> { override fun onNext(value: T) { subscription.onNext(value) } override fun onError(error: Throwable) { subscription.onError(error) } override fun onCompleted() { subscription.onCompleted() } } body(observer) } } 

Затем вы можете реализовать методы RPC следующим образом.

 override fun heartBeat(arg: Empty): Observable<ServiceStatus> = asObservable { asyncStub.heartBeat(arg, it) } 
Intereting Posts
Как импортировать каталог / полный пакет в Intellij для целей тестирования Ошибка установки Gradle: Javadoc Возможно ли распространять список внутри списка в Котлине? Класс не найден в котлин? Как заменить webView пользовательским webView? Способы жизненного цикла в статически типизированных языках Как получить выбранный элемент из ListView в Котлине? TornadoFX – удалить элемент с помощью контекстного меню правой кнопкой мыши Apache Tomcat 8.0 не может загрузить класс сервлета, написанный в Kotlin, используя Eclipse для Java EE, Mars 2 Android Studio 3 canary & Kotlin Использовать аргумент, переданный, когда в состоянии филиала в Котлин? Невозможно внедрить один и тот же экземпляр в Сервис и ViewModel Kotlin: "val someVar = if (xx) 1 else 1.0", почему someVar является "Any"? Есть ли какой-либо примерный рабочий код для создания предварительно подписанных URL S3 в Котлине можно ли добавить шаблон в getter / setter класса данных?