Spark 应用场景示例
Spark 项目搭建
环境介绍
框架 | 版本 |
---|---|
Centos | 7 |
Java | 8 |
Scala | 2.11.12 |
SBT | 1.0 |
Spark | 2.3.0 |
IDEA plugin | Scala |
准备工作
- 搭建一个
Standalone
模式的Spark 集群
在本地搭建开发环境时,不需要搭建Spark集群环境,在MasterUrl使用
local[n]
可以代替。
local[n]
自动配置了n个本地的Spark副本,与集群环境差异不大,且对本地调试友好。
- 搭建Kafka服务器
新建项目
使用IDE新建Scala 或 Java 工程,确保项目结构符合Maven
推荐的项目结构。
以IDEA为例:
新建工程 — Scala — sbt (需要安装
Scala 插件
) ,选择Scala版本为2.11.*
,Spark最新仅支持此版本。也可稍后再更改Scala版本。引入Spark依赖,此时
build.sbt
文件如下
name := "spark-scala"
version := "1.0"
// 这里可以更改Scala版本
scalaVersion := "2.11.12"
// Spark Sql 依赖,其中包含了 Spark core 等核心依赖
libraryDependencies += "org.apache.spark" % "spark-sql" % "2.3.0"
- 运行
sbt update
更新依赖
场景-静态数据(Spark SQL)
从静态数据源(Parquet,Json,CVS,JDBC,Hive,RDDs)读取数据,运行分析
再resource
目录构建一个Json
数据源data.json
:
注意: Spark 仅支持如下格式的Json文件,不支持
Standard Json
即常规json格式文件。每行一条数据记录。
{"name": "001", "age": 10, "gender": 0,"jobs":["joba","jobb"]}
{"name": "002", "age": 19, "gender": 1}
{"name": "003", "age": 13, "gender": 2}
新建Static Data Spark Demo.scala
:
package cn.fatalc.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object StaticDataSparkDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf setMaster "local[2]"
val sparkSession = SparkSession
.builder
.config(sparkConf)
.appName("static data spark")
.getOrCreate()
// 从数据源读取数据
val path = ClassLoader getSystemResource "data.json" toString
val data = sparkSession.read.json(path.toString)
data.show()
data.filter("age > 10").show()
// 创建临时表
data.createOrReplaceTempView("person")
sparkSession.sql("select * from person where age > 10").show()
}
}
以上,我们拟对数据进行展示和基本的筛选工作(age > 10)
开启调试,可以看到log
中Spark执行了 3 个Job
,并已经正确输出了预期的结果。
18/04/03 14:45:23 INFO DAGScheduler: Job 1 finished: show at StaticDataSparkDemo.scala:22, took 0.096747 s
+---+------+------------+----+
|age|gender| jobs|name|
+---+------+------------+----+
| 10| 0|[joba, jobb]| 001|
| 19| 1| null| 002|
| 13| 2| null| 003|
+---+------+------------+----+
...
18/04/03 14:45:24 INFO DAGScheduler: Job 2 finished: show at StaticDataSparkDemo.scala:23, took 0.391123 s
+---+------+----+----+
|age|gender|jobs|name|
+---+------+----+----+
| 19| 1|null| 002|
| 13| 2|null| 003|
+---+------+----+----+
...
18/04/03 14:45:25 INFO DAGScheduler: Job 3 finished: show at StaticDataSparkDemo.scala:28, took 0.196013 s
+---+------+----+----+
|age|gender|jobs|name|
+---+------+----+----+
| 19| 1|null| 002|
| 13| 2|null| 003|
+---+------+----+----+
接下来就可以根据需求进行更复杂的数据处理操作
场景-流式数据(Spark Stream)
从Kafka、Flume、S3/HDFS、kinesis、Twitter等数据源读取数据进行实时分析
例:从Kafka
读取流数据,进行实时处理。
开始之前
由于读取Kafka流式数据,我们需要模拟kafka流。
- 需要搭建本地Kafka服务器
- 需要有持续的流数据
搭建Kafka服务器
参考Kafka文档
新建SpringBoot项目向Kafaka服务器不断发送数据
核心文件KafkaApplication.class
package cn.fatalc.kafka;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
@SpringBootApplication
public class KafkaApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Autowired
KafkaTemplate kafkaTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
while (true) {
Thread.sleep(1000);
HashMap<String, Object> map = new HashMap<>();
map.put("name", "user" + RandomUtils.nextInt(100));
map.put("gender", RandomUtils.nextInt(2));
System.out.println(map.toString());
kafkaTemplate.send("spark", map.toString());
}
}
}
application.yml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
以上,我们向Kafka服务器的topic
为saprk
上不断发送数据以模拟数据流。
现在,启动程序开始模拟数据流
处理流数据
复用上例中的目录结构,也可以新建一个sbt
项目。
新建文件StreamDataSparkDemo.scala
package cn.fatalc.spark
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object StreamDataSparkDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf setMaster "local[2]"
sparkConf setAppName "StreamDataSparkDemo"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.34.179:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// "auto.offset.reset" -> "latest",
// "enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> "spark_group"
)
val topics = Array("spark")
val streamingContext = new StreamingContext(sparkConf, Seconds(1))
val inputDStream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
inputDStream.foreachRDD(rdd =>{
rdd.foreach(record => {
println(record.value())
})
} )
streamingContext.start()
streamingContext.awaitTermination()
}
}
以上,我们从Kafaka服务器读取一个topic
为spark
的流,然后进行展示。
运行程序,输出如下:
18/04/04 14:56:43 INFO AppInfoParser: Kafka version : 0.10.0.1
18/04/04 14:56:43 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
18/04/04 14:56:43 INFO CachedKafkaConsumer: Initial fetch for spark-executor-spark_group spark 0 13000
18/04/04 14:56:43 INFO AbstractCoordinator: Discovered coordinator fatal-centos:9092 (id: 2147483647 rack: null) for group spark-executor-spark_group.
18/04/04 14:56:44 INFO JobScheduler: Added jobs for time 1522825004000 ms
{gender=0, name=user83}
{gender=0, name=user52}
{gender=1, name=user96}
{gender=1, name=user92}
{gender=0, name=user58}
{gender=1, name=user27}
{gender=0, name=user47}
{gender=0, name=user98}
{gender=1, name=user77}
{gender=0, name=user15}
{gender=1, name=user10}
{gender=1, name=user53}
{gender=1, name=user57}
{gender=0, name=user94}
{gender=1, name=user59}
{gender=1, name=user80}
{gender=1, name=user97}
{gender=1, name=user71}
{gender=1, name=user35}
{gender=1, name=user46}
{gender=1, name=user19}
{gender=0, name=user34}
18/04/04 14:56:44 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
18/04/04 14:56:44 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 467 ms on localhost (executor driver) (1/1)
18/04/04 14:56:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/04/04 14:56:44 INFO DAGScheduler: ResultStage 0 (foreach at StreamDataSparkDemo.scala:35) finished in 1.062 s
取出数据之后,就可以用于实时分析了。
假设topic spark
为新注册的用户信息,我们可以统计新用户的每实时注册量,以及阶段内新注册用户性别比例。
在StreamDataSparkDemo.scala
中修改
<未完待续…>