我开始使用Spark Streaming处理我得到的实时数据馈送.我的场景是我有一个Akka actor接收器使用“with ActorHelper”,然后我让我的Spark工作做一些映射和转换然后我想把结果发送给另一个actor.
我的问题是最后一部分.当试图发送给另一个演员时,Spark正在提出异常:
15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use ‘akka.serialization.Serialization.currentSystem.withValue(system) { … }’
我创建这个最后一个actor的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
然后像这样使用它:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
我不知道在哪里或如何建议“使用’akka.serialization.Serialization.currentSystem.withValue(system){…}’”.我是否需要通过配置设置任何特殊内容?或者以不同的方式创建我
最佳答案 请查看以下示例以访问Spark域之外的actor.
/ *
*以下是使用actorStream将自定义actor插入接收器
*
*需要注意的重点:
*由于Actor可能存在于spark框架之外,因此用户有责任
*确保类型安全,即接收的数据类型和InputDstream
*应该是一样的.
*
*例如:参数化了actorStream和SampleActorReceiver
*为相同类型,以确保类型安全.
* /
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")