единичный тест с штормом апачей и кассандра: локальная топология, не потребляющая сообщение от кафки

У меня есть тест unit / integration, настроенный для публикации событий в очереди Kafka. У меня также есть локальная топология шторма, которая настроена на то, чтобы потреблять события в очереди kafka, сделать некоторую трансформацию в болт, а затем сохранить ее в базу данных cassandra. Тест настраивается следующим образом:

class StormPersistorTest{ lateinit var cluster: LocalCluster private val zk = ZookeeperHelper() @Before fun setup() { cluster = runBlocking { // same code as in main function val localCluster = LocalCluster() val persistorTopology = PersistorsTopology() val conf = persistorTopology.createLocalConfiguration() conf.setDebug(true) val topology = persistorTopology.createTopology(arrayOf(), conf) localCluster.submitTopology(persistorTopology.name, conf, topology) localCluster } zk.setUp() // creates a ZkClient } @After fun tearDown(){ cluster.killTopology(GraphPersistorsTopology.APPNAME); cluster.shutdown() zk.tearDown() // closes the ZkClient } @Test fun EventPersistorTest(){ val payload = getEventSerializer().initialize().serialize(generateTestEvent()).toByteBuffer() val dataMessage = DataMessage .newBuilder() .setId(null) .setPayload(payload) .build() KafkaIntegrationTestHelpers.enqueueToKafka(topicName, dataMessage, getDataMessageSerializer()) Thread.sleep(5000) Assert.assertEquals(1, countRowsInTable("cassandra_table")) } 

Для приложения у меня есть основная функция в пакете, который использует топологию:

 fun main(args: Array<String>) { val persistorTopology = GraphPersistorsTopology() val conf = persistorTopology.createLocalConfiguration() val topology = persistorTopology.createTopology(arrayOf(), conf) val cluster = LocalCluster() cluster.submitTopology(persistorTopology.name, conf, topology) } } 

То, что я наблюдал, заключается в том, что если я запустил основную функцию для запуска топологии в отдельном экземпляре (и закомментирую блок async вместе с первыми двумя строками в tearDown ), то топология сможет успешно использовать сообщение из кафка. С другой стороны, если я не запускаю основную забаву, чтобы начать топологию и вместо нее использовать async блок, тогда она не nextTuple ошибку, но метод nextTuple даже не выполняется, что не дает моего теста.

Примечание. Я попытался просто запустить код в runBlocking не завернув его в runBlocking . Я подумал, может быть, причина, по которой главная функция работала для меня, заключалась в том, что топология шторма была запущена в другом потоке.