我在akka流中实现了一个自定义组件,它将元素作为输入,组合并根据键合并它们并通过十几个出口中的一个发送出去.您可以将此组件视为一种GroupBy组件,它不会将流分为子流,而是实际流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生一些缓冲,使得1个元素不一定意味着通过插座输出1个元素.
以下是所述组件的简化实现.
class CustomGroupBy[A,B](k: Int,f: A => Int) extends GraphStage[FlowShape[B,B]] { val in = Inlet[A]("CustomGroupBy.in") val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out")) override val shape = new AmorphousShape(scala.collection.immutable.Seq(in),outs) /* ... */ }
我现在将该组件的每个插座连接到不同的接收器并将所有这些接收器的物化值组合在一起.
我已经用图形DSL尝试了一些东西,但还没有完全设法让它工作.有人会非常友好地为我提供一个片段来指导我或指向正确的方向吗?
提前致谢!