序列化 – 无法反序列化ActorRef以将结果发送到不同的Actor

我开始使用Spark Streaming处理我得到的实时数据馈送.我的场景是我有一个Akka actor接收器使用“with ActorHelper”,然后我让我的Spark工作做一些映射和转换然后我想把结果发送给另一个actor.

我的问题是最后一部分.当试图发送给另一个演员时,Spark正在提出异常:

15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use ‘akka.serialization.Serialization.currentSystem.withValue(system) { … }’

我创建这个最后一个actor的方式如下:

val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")

然后像这样使用它:

result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))

我不知道在哪里或如何建议“使用’akka.serialization.Serialization.currentSystem.withValue(system){…}’”.我是否需要通过配置设置任何特殊内容?或者以不同的方式创建我

最佳答案 请查看以下示例以访问Spark域之外的actor.

/ *
     *以下是使用actorStream将自定义actor插入接收器
     *
     *需要注意的重点:
     *由于Actor可能存在于spark框架之外,因此用户有责任
     *确保类型安全,即接收的数据类型和InputDstream
     *应该是一样的.
     *
     *例如:参数化了actorStream和SampleActorReceiver
     *为相同类型,以确保类型安全.
     * /

val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
    host, port.toInt))), "SampleReceiver")
点赞