学习笔记
时间:2017年2月7日
使用工具:1. 在办公电脑上安装的Ubuntu虚拟机
编程语言:scala
val y=x.filter(x => x.split("\\|").length>=5 ).filter(line => line.split("\\|")(1).length()==16).map(x=> x.split("\\|")(1)).count()
spark安装目录:/usr/lib/spark/
采集系统上spark安装版本为1.1.0,hadoop安装版本为2.5.0,系统上目前spark采用的集群模式为standalone模式。
spark配置文件:/usr/lib/spark/conf/spark-env.sh,该文件中包含了指定master节点,UI界面端口。
读取文件端口:
x=sc.textFile("hdfs://192.168.10.17:8020/cucrz/data/look/1901/2016/12/*/"),为8020端口。
启动spark-shell或者pyspark或打包运行时参数:
--master spark://hadoop1:7077 ##设置集群master节点,不加则使用单机版
--executor-memory 10g
##设置集群每个节点运行的内存为10g,最大为14.6g
第一章 spark简介 (略)
第二章 spark下载与入门
- spark本身是用scala写的,运行在java虚拟机上,安装spark需要java 6以上版本的支持。
- 可以不需要安装hadoop,不过如果有了一个hadoop集群或安装好的hdfs,下载对应版本的spark。
使用命令 pyspark 进入python 版本的spark shell
使用命令 spark-shell 进入scala 版本的spark shell
- 在spark中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动的在集群上进行。这样的数据集成为弹性分布式数据集(resilient distributed dataset),RDD
val lines =sc.textFile("README.md")
lines.count()
lines.first()
- 要退出任意shell,按ctrl+D 或者使用命令
:quit
val lines =sc.textFile("README.md")
val x = lines.filter(line => line.contains("Python"))
x.first()
2.3 spark核心概念简介
- 每个spark应用都由一个驱动器程序来发起集群上的各种分布式并行操作。驱动器程序包含应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。
- 在上面的例子中,实际的驱动器程序就是spark shell本身,只要输入想运行的程序就可以了。
- 驱动器程序通过一个SparkContext对象来访问spark.这个对象代表对计算集群的一个连接。shell启动时已经自己创建了一个SparkContext对象,是一个叫做sc的变量。
- 一旦有了SparkContext,就可以用它来创建RDD。
2.4 独立应用
- spark也可以在java、scala或python的独立程序中被连接使用,这与在shell中的主要区别在于需要自行初始化SparkContext.
在scala中初始化spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf =new SparkConf().setMaster("local").setAppName("My App")
val sc =new SparkContext(conf)
//上述代码展示了创建SparkContext的最基本的方法,你只要传递两个参数
//集群URL:告诉Spark如何连接到集群上。上述例子中使用的是local.
//应用名:上诉代码中使用的是My App。
//最后,关闭spark可以使用SparkContext中的stop()方法,或者直接退出应用,System.exit(0)或者sys.exit().
- 在初始化SparkContext后,就可以使用之前展示的所有的方法(比如利用文本文件)来创建RDD并操控它们。
单词统计应用
val conf =new SparkConf().setAppName("wordcount")
val sc = new SparkContext(conf)
val input = sc.textFile(inputFile)//inputFile为数据的输入路径和文件名
val words = input.flatMap(line => line.split(" "))//flatMap方法对所有输入的元素进行运算,然后将得到列表结果进行连接
val counts=words.map(word => (word,1)).reduceByKey{case (x,y) => x+y}
counts.saveAsFile(outputFile)
//109节点默认输出的路径在/home/hdfsnfs/user/root/路径中
//或者在hadoop hdfs分布式系统中,使用命令hadoop fs -ls /user/root,在该目录路径下
- 打印RDD变量代码:words.take(10).foreach(println)
第三章 RDD编程
3.1 RDD基础
spark中RDD是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。
用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中奋发驱动程序中的额对象集合(比如List和Set)。
创建出来后,RDD支持两种类型的操作:转化操作和行动操作。转化操作会由一个RDD生成一个新的RDD。而行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(HDFS)中。
默认情况下,spark的RDD会在你每次对它们进行操作时重新计算。如果在多个行动操作中重用同一个RDD,可以使用RDD.persist()让spark把这个RDD缓存下来。我们可以让spark把数据持久化到许多不同的地方,在第一次对持久化的RDD计算后,spark会把RDD的内容保存到内存中。我们也可以把RDD缓存到磁盘上而不是内存中。
在实际操作中,会经常用到persist()来把数据的一部分读取到内存中,并反复查询这部分数据。
// 例子
x.persist()
x.count()
x.first()
//把RDD持久化到内存中
总的来说,每个spark程序或shell会话都是按照以下的方式工作:
- 从外部数据创建出输入RDD。
- 使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD。
- 告诉spark对需要被重用的中间结果RDD执行persist()操作。
- 使用行动操作来出发一次并行计算,spark会对计算进行优化后再执行。
3.2 创建RDD
- 创建RDD两种方法:
- 一种是把程序中一个已有的集合传给SparkContext的parallelize()方法
val lines =sc.parallelize(List("pandas","i like pandas"))
- 另外一种更常用的方法是从外部存储中读取数据来创建RDD。
val lines = sc.textFile("/path/to/README.md")
3.3 RDD操作
- RDD支持两种操作:转化操作和行动操作。
- RDD的转化操作是返回一个新的RDD操作
- 行动操作是向驱动程序返回结果或把结果写入外部系统的操作,会触发实际的计算。
- 转化操作返回的是RDD,而行动操作返回的是其他的数据类型。
3.3.1 转化操作
val inputRDD =sc.textFile("log.txt")
val errorRDD =inputRDD(line => line.contains("error"))
//filter()操作不会改变已有的inputRDD中的数据。会返回一个新的RDD。
val warningRDD =inputRDD(line => line.contains("warning"))
val badlineRDD =errorRDD.union(warningRDD)
3.3.2 行动操作
println("Input had"+badLineRDD.count()+"concerning lines")
println("Here are 10 examples")
badLinesRDD.take(10).foreach(println)//打印前10条数据
上述例子在驱动器程序中使用take()获取了RDD中少量的元素。然后在本地遍历这些元素,并在驱动器端打印出来。
RDD还有一个collect()函数,可以用来获取整个RDD中的数据。
RDD.collect().foreach(println)
使用collect来打印RDD中的所有元素。
注意:只有当整个数据能在单台机器的内存中放得下时,才能使用collect函数,collect不能使用在大数据集上面。
使用函数saveAsTextFile()和saveAsSequenceFile(),可以将RDD数据保存到HDFS或Amazon S3分布式文件系统中。
每当我们调用一个新的行动操作时,整个RDD都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化,使用persist()函数来实现。
3.3.3 惰性求值
- RDD的转化操作都是惰性求值的,这意味着在被调用行动操作前spark不会开始计算。spark会在内部记录下所要求执行的操作的相关信息。例如,当我们调用sc.textFile()时,数据并没有读取进来,而是在必要时候才会读取。
- 在spark中,写一个非常复杂的映射并不见得比使用很多简单的连续操作获得多的性能。用户可以用更小的操作来组织他们的程序。
3.4 向spark传递函数
class SearchFunction(val query:String){
def isMatch(s:String):Boolean={
s.contains(query)
}
def getMatchesFunctionReference(rdd:RDD[String]):RDD[String]={
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd:RDD[String]):RDD[String]={
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String]={
val query_=this.query
rdd.map(x => x.split(query_))
}
}
//如果在scala中出现了NotSerializableExecption,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。
//记住,传递局部可序列化的变量或顶级对象中的函数始终是安全的。
3.5 常见的转化和行动操作
3.5.1 基本RDD
1.针对各个元素的转化操作
- map函数
- map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值。输入是每一个元素,返回的是对应类型的每一个运算后的结果。
- map()的返回类型不需要和输入类型一样。这样如果有一个字符串RDD,并且我们的map()函数是用来把字符串解析并返回一个Double值的,那么此时我们的输入RDD类型就是RDD[String],而输出类型就是RDD[Double].
- filter函数
- filter()函数接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。输入的是RDD中的每一个元素,返回的是该元素的是否满足条件的逻辑值。
val input = sc.parallelize(List(1,2,3,4))
val result=input.map(x => x*x)
println(result.collect().mkString(","))//mkString函数将元素连接在一起
- flatMap函数
- flatMap函数被分别应用了输入RDD的每一个元素上,不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。flatMap()的一个简单用途是把输入的字符串切分为单词。
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line => line.split(" "))
words.first()
2.伪集合操作
- 尽管RDD不是严格意义上的集合,但是也支持数学上的集合操作,比如合并和相交。
表3.2 对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 将函数应用于RDD中的每个元素,将返回值构成新的RDD | rdd.map(x => x+1) | {2,3,4,4} |
flatMap() | 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来分切单词,执行扁平化操作。 | rdd.flatMap(x => x.to(3)) | {1,2,3,2,3,3,3} |
filter() | 返回一个由通过传给filter()的函数的元素组成的RDD | rdd.filter(x => x!=1) | {2,3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement,fraction,[seed]) | 对RDD采样,以及是否替换,抽取比例等等 | rdd.sample(false,0.5) | {1,2,3} 非确定的 |
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD (不去重的并集) | rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求两个RDD共同的元素的RDD (交集) | rdd.intersection(other) | {3} |
subtract() | 求移除一个RDD中的内容 (差集) | rdd.subtract(other) | {1,2} |
cartesian() | 笛卡尔积 | rdd.cartesian(other) | {(1,3),(1,4),…,(3,5)} |
表3.3:对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD (不去重的并集) | rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求两个RDD共同的元素的RDD (交集) | rdd.intersection(other) | {3} |
subtract() | 求移除一个RDD中的内容 (差集) | rdd.subtract(other) | {1,2} |
cartesian() | 笛卡尔积 | rdd.cartesian(other) | {(1,3),(1,4),…,(3,5)} |
3.行动操作
- 函数reduce()接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个相同类型的新元素。
val sum = rdd.reduce((x,y) => x+y)
fold()和reduce类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个”初始值”来作为每个分区第一次调用时的结果。
你所提供的初始值应当是你提供的操作的单位元素,也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或拼接操作对应的空列表)
fold()和reduce()都要求函数的返回值类型需要和我们所操作的RDD中的元素类型相同。
aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来。与fold类似,使用aggregate()函数时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。
val result =input.aggregate((0,0))(
(acc,value) => (acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1 + acc2._1 ,acc1._2 +acc2._2)
)
val avg= result._1 /result._2.toDouble
//val input =sc.parallelize(List(1,2,3,4,5,6)),输入input值变量为一个列表类型,对其求平均
- 表3.4 对一个数据为{1,2,3,3}的RDD进行基本的RDD操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD中元素的个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出现的次数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take(num) | 从RDD中返回num个元素 | rdd.take(2) | {1,2} |
top(num) | 从RDD中返回最前面的num个元素 | rdd.top(2) | {3,3} |
takeOrdered(num) | 从RDD中按照提供的顺序返回最前面的num个元素 | rdd.takeOrdered(2)(myOrdering) | {3,3} |
takeSampe(withReplacement,num,[seed]) | 从RDD中返回任意一些元素 | rdd.takeSample(false,1) | 非确定的 |
reduce(func) | 并行整合RDD中所有数据 | rdd.reduce((x,y) => x+y) | 9 |
fold(zero)(func) | 和reduce类似,但是需要提供初始值 | rdd.fold(0)((x,y) => x+y) | 9 |
aggregate(zeroValue)(seqOp,comOp) | 和reduce相似,但是通常返回不同类型的函数 | rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) | (9,4) |
foreach(func) | 对RDD中的每个元素使用给定的函数 | rdd.foreach(func) | 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值 |
3.5.2 在不同RDD类型间转换
有些函数只能用于特定类型的RDD,比如mean和variance 只能用于数值RDD上,而join只能用在键值对RDD上。
在Scala中,将RDD转化为有特定函数的RDD是由隐式转换来自动处理的。在2.4.1节中提到,需要加上 import org.apache.spark.SparkContext._来使用这些隐式转换。
3.6 持久化(缓存)
spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。为了避免多次计算同一个RDD,可以让spark对数据进行持久化。
-当我们让spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求的分区数据。如果一个有持久化数据的节点发生故障,spark会在需要用当缓存的数据时重算丢失的数据分区。我们可以为RDD选择不同的持久化级别,在scala和Java中,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中
org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别:如果有必要,可以通过在存储级别的末尾加上_2来把持久化数据存为两份
级别 | 使用的空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘上 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘上。在内存中放序列化后的数据 |
DISK_ONLY | 低 | 高 | 否 | 是 |
val result =input.map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
- unpersist方法可以手动把持久化的RDD从缓存中移除。
第四章 键值对操作
- 键值对RDD通常用来进行聚合计算。
- spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为pair RDD。
4.2 创建pair RDD
//在scala中使用第一个单词为键创建出一个pair RDD
val pairs =lines.map(x => (x.split(" ")(0),x))
4.3 Pair RDD的转化操作
- Pair RDD可以使用所有标准RDD上的可用的转化操作。由于pair RDD中包含二元组,所以需要传递的函数应当操作二元组而不是独立的元素。表4-1和表4-2总结了对pair RDD的一些转化操作。
表4-1,pair RDD的转化操作 (以键值对集合 {(1,2),(3,4),(3,6)}为例)
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
reduceByKey(func) | 合并具有相同键的值,使用函数func | rdd.reduceByKey((x,y) => x+y) | {(1,2),(3,10)} |
groupByKey() | 对具有相同键的值进行分组 | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) | 使用不同的返回类型合并具有相同键的值 | ||
mapValues(func) | 对pair RDD中的每个值应用一个函数而不改变键 | rdd.mapValues(x => x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) | 对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化 | rdd.flatMapValues(x => (x to 5)) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys | 返回一个仅包含键的RDD | rdd.keys | {1,3,3} |
values | 返回一个仅包含值的RDD | rdd.values | {2,4,6} |
sortByKey() | 返回一个根据键排序的RDD | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
表4-2,针对两个pair RDD 的转化操作 (rdd={(1,2),(3,4),(3,6)},other={(3,9)})
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
subtractByKey | 删掉RDD中键与otherRDD中的键相同的元素(差集) | rdd.subtractByKey(other) | {(1,2)} |
join | 对两个rdd进行内连接 | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
rightOuterJoin | 对两个RDD进行连接操作,以右边的为主,右外连接 | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 对两个RDD进行左外连接,以左边的为主 | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9)),(3,(6,Some(9))} |
cogroup | 将两个RDD中拥有相同键的数据分组到一起 | rdd.cogroup(other) | {(1,([2],[])),(3,([4,6])),([4,6],[9])} |
val input=sc.textFile("README.md")
val pairs=input.map(x => (x.split(" ")(0),x))
pairs.filter(value => value._2.length<20).collect().foreach(println)
- 有时,我们只想访问pair RDD 的值部分,这时操作二元组很麻烦。由于这是一种常见的使用模式,因此spark提供了mapValues(func) 函数,功能类似于map.
4.3.1 聚合操作
- reduceByKey() 与reduce()相当类似,它们都接收一个函数,并使用该函数对值进行合并。reduceByKey()会为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的键合并起来。
- foldByKey() 函数与fold()相当类似,它们都使用一个与RDD和合并函数中的数据类型相同的零值作为初始值。
在scala中使用reduceByKey和mapByKey计算每个键对应的平均值
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2))
- 代码预算流程图
key | value | key | value | key | value | ||
---|---|---|---|---|---|---|---|
panda | 0 | panda | (0,1) | panda | (1,2) | ||
pink | 3 | pink | (3,1) | pink | (7,2) | ||
pirate | 3 | pirate | (3,1) | pirate | (3,1) | ||
panda | 1 | panda | (1,1) | ||||
pink | 4 | pink | (4,1) |
在scala中实现单词计数
val input=sc.textFile("README.md")
val words=input.flatMap(x => x.split(" "))
val result=words.map(x => (x,1)).reduceByKey((x,y) => x+y)
- 事实上,我们可以对第一个RDD使用countByValue函数,以更快的实现单词计数:
input.flatMap(x => x.split(" ")).countByValue()
- countByValue() 函数统计个元素在RDD中的出现的次数
combineByKey函数
combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它来实现的。和aggregatte()一样,combineByKey() 可以让用户返回输入数据的类型不同的返回值。
由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combinByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
//scala中使用combineByKey求每个键对应的平均值
val result =input.combineByKey(
(v) => (v,1),
(acc:(Int,Int),v) => (acc._1+v,acc._2+1)
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)
).map{case (key,value) => (key,value._1/value._2.toFloat)}
result.collectAsMap().map(println(_))
//最后两个函数可以考虑更换代码
//result.mapValues(x => x._1/x._2.toFloat).collect().foreach(println)
combineByKey函数,首先将键值扩展成元组形式,然后对每个分区进行元组统计,然后再对所有分区进行叠加
最后,如果使用map函数,则对RDD中所有元素(包括键)进行元素,这里涉及到一个判断;也可以使用mapValues函数直接对键值进行操作。
并行度调优
- 每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。
- spark始终尝试根据集群的大小推断出一个有意义的默认值,但是有时候可能要对并行度调优来获取更好性能的表现。
val data =sc.parallelize(Seq(("a",3),("b",4),("a",1)))
data.reduceByKey(_+_)
data.reduceByKey(_+_,10)
4.3.3 连接
- 将有键的数据与另一组有键的数据一起使用是对键值对数据执行的最有用的操作之一。连接方式包括:右外连接、左外连接、交叉连接、内连接。普通的join操作符表示内连接。
- 使用函数有:
join
,rightOuterJoin
,leftOuterJoin
,
4.3.2 数据排序
如果键已有定义的顺序,就可以对这种键值对RDD进行排序。当把数据排好序后,后续对数据进行collect()或save等操作都会得到有序的数据。
sortByKey()函数接收一个叫做ascending的参数,表示我们是否想要让结果按升序排序(默认值为true)。
也可以自己定义比较函数。
input.sortByKey(ascending=false).collect().foreach(println)
4.4 pair RDD的行动操作
- 和转化操作一样,所有基础RDD支持的传统行动操作也都在pair RDD上可用。
函数 | 描述 | 示例 | 结果 |
---|---|---|---|
countByKey() | 对每个键对应的元素分别计数 | rdd.countByKey() | {(1,1),(3,2)} |
collectAsMap() | 将结果以映射的形式返回,以便查询 | rdd.collectAsMap() | Map{(1,2),(3,4),(3,6)} |
lookup(key) | 返回给定键对应的所有值 | rdd.lookup(3) | [4,6] |
第五章 数据读取与保存
- spark支持很多种输入输出源。一部分原因是spark本身是基于Hadoop生态圈构建的。
主要有以下三种常见的数据源:
- 文件格式与文件系统(HDFS、NFS、Amazon S3)
- Spark SQL中的结构话 数据源
- 数据库与键值存储
5.2 文件格式
5.2.1 文本文件
- 在spark中读写文本文件很容易。当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素。也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名, 值是文件内容。
- 读取文本文件
- 只要使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件。
val input =sc.textFile("file:///home/shanjiajun/sparkfile/README.md")
//或 val input=sc.textFile("/home/shanjiajun/sparkfile/README.md")
如果多个输入文件以一个包含数据所有部分的目录的形式出现,可以用两种方式来处理。可以仍然使用textFile()函数,传递目录作为参数,这样它会把各部分都读取到RDD中。有时候有必要知道数据的各部分分别来自哪个文件(比如将键放在文件名中的时间数据),有时候希望同时处理整个文件。如果文件足够小,那么可以使用SparkContext.wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名。
Spark支持读取给定目录中的所有文件,以及在输出路径中使用通配符(part-*.txt)。大规模数据集通常存放在多个文件中,因此这一特性很有用,尤其是在同一目录中存在一些级别的文件的时候。
- 保存文本文件
- 输出文本文件,使用函数saveAsTextFile()方法接收一个路径,并将RDD中的内容都输入到路径相应的文件中。spark将传入的路径作为目录对待,会在那个目录下输出多个文件。这样,spark就可以从多个节点上并行输出了。
result.saveAsTextFile(outputFile)
- 输出路径和输入路径一样,写入本地或者hdfs的路径名字
x.saveAsTextFile("file:///home/shanjiajun/sparkfile/datax")
- 使用该函数的时候需要注意,写出的数据需要是RDD类型,否则无法输出。
5.2.3 逗号分隔值与制表分隔值
- 读取csv数据需要先把文件当做普通文本文件来读取数据,再对数据进行处理。
//在scala中使用textFile()读取CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input =sc.textFile(inputFile)
val result=input.map{line => val reader=new CSVReader(new StringReader(line));
reader.readNext();
}
5.3.1 本地文件系统
- spark支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。
val input =sc.textFile("file:///root/README.md")
5.3.3 HDFS中读取数据
在spark中使用HDFS只需要将输入输出路径指定为
hdfs://master:port/path
就够了HDFS协议随Hadoop版本改变而变化,因此如果你使用的Spark是依赖于另一个版本的Hadoop编译的,那么读取会失败。如果从源代码编译,可在环境变量中指定SPARK_HADOOP_VERSION= 来基于另一版本的Hadoop进行编译;也可以直接下载预编译好的SPARK版本。可以根据运行hadoop version的结果来获得环境变量的值。
5.4 Spark SQL 中的结构化数据读取
- 在各种情况下,我们把一条SQL查询给Spark SQL,让它对一个数据源执行查询(选出一些字段或者对字段使一些函数),然后得到由Row对象组成的RDD,每个Row对象表示一条记录。在java和scala中,Row对象的访问时基于下标的。每个Row都有一个get()方法,会返回一个一般类型让我们可以进行类型转换。
5.4.1 Apache Hive
- 要把Spark SQL 连接到已有的Hive上,你需要提供Hive 的配置文件。你需要将hive-site.xml文件复制到spark 的./conf/目录下。这样做好以后,再创建出HiveContext对象,也就是Spark SQL的入口,然后就可以使用Hive查询语言来对表进行查询,并以由行组成的RDD的形式拿到返回数据。
import org.apache.spark.sql.hive.HiveContext
val hiveCtx =new org.apache.spark.sql.hive.HiveContext(sc)
val rows=hiveCtx.sql("select name,age from users")
val firstRow=rows.first()
println(firstRow.getString(0))
- 注意:在13集群上不能使用HiveContext类,但是109集群上可以使用,慢慢来,不要急,以后总会搞定的。
- 也许是版本问题,也许是路径问题。
第六章 spark编程进阶
6.2 累加器
-累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
val sc=new SparkContext(...)
val file=sc.textFile("file.txt")
val blankLines=sc.accumulator(0)
val callSigns =file.flatMap(line => {
if(line == ""){
blankLines +=1 //累加器加1
}
line.split(" ")
})
callSing.saveAsTextFile("output.txt")
println("Blank lines:"+blankLines.value)
//x.saveAsTextFile("file:///home/shanjiajun/sparkfile/datax")
- 注意:只有在运行saveAsTextFile()行动操作以后才能看到正确的计数,以为行动操作之前转化操作flatMap()是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作flatMap()被saveAsTextFile()行动操作强制触发时才会开始求值。
- 当然,也可以使用reduce这样的行动操作将整个RDD中值都聚合到驱动器中。只是我们有时也希望用一个种更简单的方法来对那些与RDD本身的范围和粒度不一样的值进行聚合。
6.6 数值RDD的操作
- spark的数值操作都是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据会在调用stats()时通过一次遍历数据计算出来,并以StatsCounter对象返回。
方法 | 含义 |
---|---|
count() | RDD中的元素个数 |
mean() | 元素的平均值 |
sum() | 总和 |
max() | 最大值 |
min() | 最小值 |
variance() | 元素的方差 |
sampleVariance() | 从采样中计算出的方差 |
stdev() | 标准差 |
smapleStdev() | 采样的标准差 |
表6-2 StatsCounter中可用的汇总统计数据
方法 | 含义 |
---|---|
count() | RDD中的元素个数 |
mean() | 元素的平均值 |
sum() | 总和 |
max() | 最大值 |
min() | 最小值 |
variance() | 元素的方差 |
sampleVariance() | 从采样中计算出的方差 |
stdev() | 标准差 |
smapleStdev() | 采样的标准差 |
- 如果你只想计算这些统计数据中的一个,也可以直接对RDD调用对应的方法,比如:rdd.mean()或者 rdd.sum().
//用scala移除异常值
val distanceDouble=distance.map(string => string.toDouble)
val stats =distinceDoubles.stats()
val stddev=stats.stdev
val mean=stats.mean
val reasonableDistances =distanceDoubles.filter(x => math.abs(x-mean) <3 * stddev)
println(reasonableDistance.collect().toList)
第七章 在集群上运行spark
7.2 Spark运行时构架
- 在分布式环境下颚,spark集群采用的是主/从结构。在一个spark集群中。有一个节点负责中央协调,调度哥哥分布式工作节点。这个中央协调节点被称为驱动器节点,预支对应的工作节点被称为执行器节点。驱动器节点可以和大量的执行器节点进行通信,他们也都作为独立的java进程运行。驱动器节点和所有的执行器节点一起被称为一个spark应用。
7.2.1 驱动器节点
- spark驱动器是执行程序中main()方法的进程。它执行用户编写的用来创建SparkContext、创建RDD、以及进行RDD的转化操作和行动操作的代码。(比如当启动一个spark-shell时,就启动了一个spark驱动器程序。)
- 主要有两个职责:1. 把用户程序转化为任务 2.为执行器节点调度任务
7.2.2 执行器节点
- spark执行器节点是一种工作进程,负责在spark作业中运行任务,任务间相互独立。
7.2.5 小结
- 用户通过spark-submit脚本提交应用。
- spark-submit脚本启动驱动器程序,调用用户定义的main()方法。
- 驱动器进程执行用户 应用中的操作。根据程序中所定义的对RDD的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。
- 任务在执行器程序中进行计算并保存结果。
- 如果驱动器程序的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。
7.3 使用spark-submit部署应用
初始化sparkSQL
采集系统上spark不能依赖于hive,该问题需要解决,109集群上可以依赖hive
目前使用sparkSQL
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
val sc=new SparkContext()
val hiveCtx=new SQLContext(sc)