Spark文档 - 快速入门

本文简要介绍一下Spark。首先通过交互式shell介绍Spark API,然后是如何使用Scala编写应用程序。

要注意的是,Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。而Spark 2.0之后,RDD被Dataset取代,Dataset是一种强类型的RDD,但是底层做了很多优化。Spark依然支持RDD接口,可以在[RDD编程指南]中查看更多内容。不过Spark更推荐使用Dataset,它的性能比RDD更好,更多内容包含在[SQL编程指南]中。

使用Spark shell进行交互式分析

基本用法

Spark shell即可用于学习API,也是一个交互式分析数据的利器。这里选用Scala shell,首先执行脚本启动shell:

./spark-shell

Spark的主要抽象是Dataset(分布式数据项集合)。Dataset可以从Hadoop输入格式创建,也可以从其他Dataset转换而来。下面代码从README文件创建了一个Dataset:

scala> val textFile = spark.read.textFile("README.md")

textFile: org.apache.spark.sql.Dataset[String] = [value: string]

可以直接调用动作算子从Dataset获取值,也可以使用转换算子将Dataset转换成新Dataset。

scala> textFile.count() // Number of items in this Dataset

res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset

res1: String = # Apache Spark

下面代码使用filter算子返回一个原Dataset的子集:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

也可以将多个算子结合起来使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?

res3: Long = 15

更多Dataset操作

Dataset算子可以应用于更复杂的计算。假设我们想找到单词数最多的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

res4: Long = 15

首先map算子将行映射成单词数,生成一个新Dataset,接着调用reduce算子找到最大单词数。mapreduce的参数都是Scala函数直接量(闭包),也可以使用Scala的类库。例如,可以直接使用预先声明的函数:

scala> import java.lang.Math

import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

res5: Int = 15

更常见的数据流是MapReduce,Spark也可以很容易的实现:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,flatMap算子将行Dataset转换成单词Dataset,之后将groupByKeycount结合起来计算单词出现次数。collect算子可以打印结果:

scala> wordCounts.collect()

缓存

Spark可以在内存中缓存数据。对于那些需要被频繁查询的“热点”数据,或者运行PageRank这样的迭代算法时,这一特性很有用。下面是一个简单示例:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

自包含应用程序

这里使用Scala创建一个简单的应用程序。

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

应用程序应当直接定义main方法,不要继承scala.App类,scala.App的子类可能无法正常工作。

该程序统计了包含字母’a’和’b’的行数。不同于Spark shell,程序中需要初始化一个SparkSession。

调用SparkSession.builder方法直接构造一个SparkSession,之后设置应用程序名,然后调用getOrCreate创建一个SparkSession实例。

应用程序依赖Spark API,需要在sbt配置文件中添加依赖。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

如果sbt工作正常,将源代码打包,之后可以使用spark-submit运行应用程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23
    原文作者:sungoshawk
    原文地址: https://www.jianshu.com/p/a395f1b5928a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞