java – 合并多个相同的Kafka Streams主题

前端之家收集整理的这篇文章主要介绍了java – 合并多个相同的Kafka Streams主题前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以在其中一个源失败的情况下获得高可用性. @H_502_2@我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且当所有源都启动时没有重复.

当使用KStream的leftJoin方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容.这似乎是因为,根据Kafka Streams developer guide,

KStream-KStream leftJoin is always driven by records arriving from the primary stream

因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在.主流重新联机后,输出将恢复正常.

我也尝试使用outerJoin(添加重复记录),然后转换为KTable和groupByKey以消除重复,

KStream mergedStream = stream1.outerJoin(stream2,(streamVal1,streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1,value2) -> value1,TimeWindows.of(2000L),stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

但我偶尔也会重复一遍.我也使用commit.interval.ms = 200来让KTable经常发送到输出流.

处理此合并以获得多个相同输入主题的一次输出的最佳方法是什么?

解决方法@H_404_21@
使用任何类型的连接都无法解决您的问题,因为您总是会丢失结果(内部连接以防一些流停止)或“重复”时返回null(左连接或外连接以防两个流都是线上).有关Kafka Streams中的连接语义的详细信息,请参阅 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics.

因此,我建议使用处理器API,您可以使用KStream process(),transform()或transformValues()与DSL混合搭配.有关详细信息,请参见How to filter keys and value with a Processor using Kafka Stream DSL.

您还可以向处理器(How to add a custom StateStore to the Kafka Streams DSL processor?)添加自定义存储,以使重复过滤容错.

原文链接:https://www.f2er.com/java/128898.html

猜你在找的Java相关文章