我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题并分别使用FlinkKafkaProducer09和FlinkKafkaConsumer09从相同的kafka主题读取数据.我正在传递产品中的测试数据:
DataStream<String> stream = env.fromElements("tom","jerry","bill");
并检查相同的数据是否来自消费者:
List<String> expected = Arrays.asList("tom","bill"); List<String> result = resultSink.getResult(); assertEquals(expected,result);
使用TestListResultSink.
我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.
在Flink或FlinkKafkaConsumer09中以任何方式停止进程或运行特定时间吗?