scala – Akka Stream连接到多个接收器

前端之家收集整理的这篇文章主要介绍了scala – Akka Stream连接到多个接收器前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我在akka流中实现了一个自定义组件,它将元素作为输入,组合并根据键合并它们并通过十几个出口中的一个发送出去.您可以将此组件视为一种GroupBy组件,它不会将流分为子流,而是实际流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生一些缓冲,使得1个元素不一定意味着通过插座输出1个元素.

以下是所述组件的简化实现.

class CustomGroupBy[A,B](k: Int,f: A => Int) extends GraphStage[FlowShape[B,B]] {

  val in = Inlet[A]("CustomGroupBy.in")
  val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))

  override val shape = new AmorphousShape(scala.collection.immutable.Seq(in),outs)

  /* ... */
}

我现在将该组件的每个插座连接到不同的接收器并将所有这些接收器的物化值组合在一起.

我已经用图形DSL尝试了一些东西,但还没有完全设法让它工作.有人会非常友好地为我提供一个片段来指导我或指向正确的方向吗?

提前致谢!

解决方法

您最有可能想要内置的 broadcast舞台.示例用法可以在 here找到:

val bcast = builder.add(Broadcast[Int](2))

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
            bcast ~> f4 ~> merge

猜你在找的Scala相关文章