Использование RxJava для объединения локальных данных с удаленными (или кэшированными) данными

Это рабочий код, но у меня есть несколько вопросов, а также запрос на консультации по его улучшению. Я новичок в RxJava, и я не полностью обернулся вокруг того, как объединить эти типы наблюдаемых вместе.

У меня есть два объекта модели: ListItem и UserInfo . ListItem существует в локальной базе данных, и UserInfo ListItem с сервера с помощью идентификатора, предоставленного в ListItem .

Веб-служба UserInfo принимает массив идентификаторов, для которых он вернет список объектов UserInfo .

Поток этого кода выглядит следующим образом:

  1. Загрузить ListItem из базы данных
  2. Используя ListItem извлеченные из базы данных, проверьте кеш в памяти, чтобы узнать, уже ли я пользователь UserInfo для определенного ListItem
  3. Для любых элементов, чей UserInfo не кэширован, вытащите их из сети
  4. Поместите UserInfo объекты UserInfo в кеш
  5. Повторно запустите шаг 2 (метод loadCachedUserInfo )
  6. Возврат результатов подписчикам

ПРИМЕЧАНИЕ. Объекты UserInfo должны быть ListItem только для ListItem если список считается isUserList .

Вот код:

 fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder) subscriber.onNext(listItems) subscriber.onCompleted() }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false) } return@flatMap Observable.just(listItems) } } fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> for ( listItem in listItems ) { listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] } subscriber.onNext(listItems) subscriber.onCompleted() } } fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> { val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null } val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() } val records = hashMapOf("records" to ids) if ( itemsToFetch.count() == 0 ) { return Observable.just(listItems) } return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records) .map { json -> val recordsArray = json.arrayValue("records") for ( i in 0..recordsArray.length() - 1) { val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i)) coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo coreUserMap[coreUserInfo.userID] = coreUserInfo coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo } } return@map listItems }.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) } } 

Пользователь инициирует последовательность событий, вызывая:

ListController.itemsInList(list)

Мои вопросы об этом коде:

  1. В настоящее время loadCachedUserInfo принимает массив ListItem и возвращает тот же массив, что и наблюдаемый после того, как связанные с ним элементы кэширования. Это кажется мне неправильным. Я думаю, вместо этого этот вызов должен возвращать только те элементы, которые связаны с кешированным UserInfo . Однако мне нужно продолжить передачу полного массива ListItem следующему методу

2.) Нужно ли мне делать дополнительную работу для поддержки отмены подписки?

3.) Это аналогичный вопрос 1. Мой fetchUserInfoForListItems принимает массив элементов списка и возвращает наблюдаемый с тем же массивом элементов списка после того, как они были извлечены и повторно запущены через метод кеша. Это также кажется мне неправильным. Я бы предпочел, чтобы этот метод возвращал Observable<List<UserInfo>> для объектов, которые были извлечены. Я не понимаю, как в itemsInList затем связать ListItem с новым пользователем UserInfo и вернуть Observable из этих ListItem s.

Редактировать : после написания этого сообщения это помогло мне понять несколько вещей. Я могу flatMap обернуть мои вызовы в Observable.create, которые могут содержать smarts, которые я хотел бы извлечь из своих fetchUserInfoForListItems , позволив мне задать вопрос №3. Вот обновленный код:

  fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder) subscriber.onNext(listItems) subscriber.onCompleted() }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap Observable.create<List<ATListItem>> { subscriber -> fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList -> for (coreUserInfo in userInfoList) { coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo coreUserMap[coreUserInfo.userID] = coreUserInfo coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo } } }.flatMap { loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) }.subscribe { subscriber.onNext(listItems) subscriber.onCompleted() } } } return@flatMap Observable.just(listItems) } } fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] } subscriber.onNext(listItems) subscriber.onCompleted() } } fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> { val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null } val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() } val records = hashMapOf("records" to ids) if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) } return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records) .map { json -> val userInfo = ArrayList<CoreUserInfo>() json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) } return@map userInfo } } 

  1. В настоящее время loadCachedUserInfo принимает массив ListItem и возвращает тот же массив, что и наблюдаемый после того, как связанные с ним элементы кэширования. Это кажется мне неправильным. Я думаю, вместо этого этот вызов должен возвращать только те элементы, которые связаны с кешированным UserInfo. Однако мне нужно продолжить передачу полного массива ListItem следующему методу

Я не уверен, что правильно вас понимаю, но если вам нужен только побочный эффект (кеширование), вы можете просто использовать doOnNext . Например,

 .doOnNext { listItems -> if ( list.isUserList ) { cache(listItems, userIDIndex = list.userIDIndex!!) } } fun cache(listItems : List<ATListItem>, userIDIndex : Int) { // caching } 
  1. Нужно ли делать дополнительную работу для поддержки отмены подписки?

Нет, AFAIK.

Заметка:

Более подробную информацию о doOnNext можно найти в разделе Что такое doOnNext (…) в RxJava и здесь

Обычно вам не нужно return@... если последний оператор в лямбда является выражением. например:

 .flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) } 

может быть написано так:

 .flatMap { listItems -> if ( list.isUserList ) loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) else Observable.just(listItems) } 

Я не тестировал код.