scala – 如何使用封装的源和接收器测试akka流闭合形状可运行图

我创建了一个具有进程函数和传递给它的错误处理函数的akka​​流. Source和Sink完全封装在ClosedShape RunnableFlow中.我的意图是我将一个项目传递给父类并通过流程运行它.这一切似乎都有效,直到我开始测试.我正在使用
scala-test并在流程函数和错误处理函数中传递附加到Lists.我随机生成错误,看到事情也流向错误处理函数.问题是如果我将100个项目传递给父类,那么我希望错误函数中的项目列表和过程函数中的项目列表加起来为100.因为源和接收器是完全封装的,所以我不要有一个明确的方法告诉测试等待,它会在所有项目通过流处理之前到达assertion / should语句.我创建了
this gist来描述流.

以下是上述要点的示例测试:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                 // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                       // passes
      testStream.processed.size should not be (0)                                    // passes
      (testStream.processed.size + testStream.errors.size) should be (100)           // fails due to checking before all items are processed
    }
  }
}

最佳答案 根据Viktor Klang关于链接Gist的评论.这被证明是一个很好的解决方案:

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String])
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

这允许我在RunnableGraph上使用Await.result进行测试.再次感谢Viktor提供此解决方案!

点赞