通过spark-submit提交的任务都需要指定Main类作为程序的入口,Main类执行结束即Spark任务终结。如果需要通过外部程序实时向Spark任务提交数据并获取结果又该如何呢?
思路很简单,让Spark任务的Main方法不终止,外部程序与Spark任务进行通信,交互数据。
通信方式很多,比如Socket,netty或者内置Tomcat,Jetty等,不过考虑编码的快捷,通过Akka是比较不错的选择。
开发分为2部分。1.编写Spark任务,该部分会提交到Spark集群中。2.外部调用代码,该部分模拟客户端代码。2者食用Akka Actor进行通信。
先看Spark任务部分
SparkConfig 定义SparkContext对象
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object SparkConfig {
val conf = new SparkConf().setAppName("testSpark")
val sc = new SparkContext(conf)
}
DataService 作为调用Spark RDD操作的业务类。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.sam.spark.demo.data.config.SparkConfig
import scala.collection.mutable.ArrayBuffer
class DataService {
def handler(list: ArrayBuffer[String]) : String = {
val array = SparkConfig.sc.parallelize(list).max()
array
}
}
Worker Akka的Actor对象,接收外部入参,调用DataService对象,并返回结果
import akka.actor.Actor
import org.slf4j.LoggerFactory
import com.sam.spark.demo.data.service.DataService
import com.sam.spark.demo.akka.msg.TextMessage
import java.util.UUID
import scala.collection.mutable.ArrayBuffer
class Worker extends Actor {
val dataService = new DataService()
def receive = {
case x: ArrayBuffer[String] => {
val tm = new TextMessage()
tm.msg = dataService.handler(x)
sender ! tm
}
}
}
TextMessage 作为返回的消息对象
class TextMessage extends Serializable {
var msg : String = null
}
AkkaConfig Actor配置类,创建Worker对象
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory
object AkkaConfig {
val system = ActorSystem("ReactiveEnterprise",ConfigFactory.load().getConfig("serverSystem"))
val workerRef = system.actorOf(Props[Worker], "worker")
}
程序入口类
import com.sam.spark.demo.akka.AkkaConfig
import com.sam.spark.demo.data.config.SparkConfig
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import java.util.concurrent.TimeUnit
object AppStart {
def main(args: Array[String]): Unit = {
SparkConfig
AkkaConfig
}
}
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>Main类名</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Akka配置文件
serverSystem {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
default-dispatcher {
throughput = 2
}
serializers {
java = "akka.serialization.JavaSerializer"
}
serialization-bindings {
"需要序列化的消息类名" = java
}
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "Akka Remote服务地址"
port = Akka Remote端口
}
}
}
}
打包
mvn clean scala:compile package -DskipTests=true
发布到Spark集群
./spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class Main类名 --master spark://Spark Master地址 ./spark.demo-0.0.1-SNAPSHOT.jar
再看客户端实现
本地Actor 获取远程ActorRef并发送消息
import akka.actor.Actor
import akka.actor.ActorSelection
import com.sam.spark.demo.akka.msg.TextMessage
import scala.collection.mutable.ArrayBuffer
class Client extends Actor {
var remoteActor : ActorSelection = context.actorSelection("akka.tcp://ReactiveEnterprise@10.16.64.146:2555/user/processManagers/worker")
override def receive: Receive = {
case msg: ArrayBuffer[String] => {
remoteActor ! msg
}
case msg: TextMessage => {
println(msg.msg)
}
}
}
本地Main类 模拟向远端Actor发送消息
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.pattern.Patterns
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import java.util.UUID
import scala.collection.mutable.ArrayBuffer
object ClientStart {
def main(args: Array[String]): Unit = {
val serverSystem = ActorSystem("clientSystem", ConfigFactory.load().getConfig("clientSystem"))
val clientRef = serverSystem.actorOf(Props[Client], "client")
while (true) {
var list = new ArrayBuffer[String]
for (i <- 1 to 100) {
list += UUID.randomUUID().toString()
}
clientRef ! list
Thread.sleep(500)
}
// val future = Patterns.ask(clientRef, "world", Timeout.apply(10L, TimeUnit.SECONDS));
// val result = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
// println(result)
}
}
Akka配置文件
clientSystem {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
default-dispatcher {
throughput = 2
}
serializers {
java = "akka.serialization.JavaSerializer"
}
serialization-bindings {
"需要序列化的消息类名" = java
}
}
}
}