RDD编程
RDD创建:
第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统
- 从文件系统中加载数据创建RDD
- 从本地加载:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
- 从HDFS加载:
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") scala> val lines = sc.textFile("/user/hadoop/word.txt") scala> val lines = sc.textFile("word.txt")
- 从本地加载:
- 从文件系统中加载数据创建RDD
第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
- 通过并行集合创建:
cala>val array = sc.parallelize(Array(1,2,3,4,5))
- 通过并行集合创建:
RDD操作:
转换操作:
每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的
并不会发生真正的计算,只是记录了转换的轨迹,只有遇到行动操作时,才会发生真正的计算。- filter(func):筛选出满足函数func的元素,并返回一个新的数据集
- map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
- flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
- groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
- reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
行动操作:
行动操作是真正触发计算的地方。从文件中加载数据,完成一次又一次转换操作- count() 返回数据集中的元素个数
- collect() 以数组的形式返回数据集中的所有元素
- first() 返回数据集中的第一个元素
- take(n) 以数组的形式返回数据集中的前n个元素
- reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
- foreach(func) 将数据集中的每个元素传递到函数func中运行*
实例:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
//可以写成:
scala> lines.map(_.split(" ").size)...
持久化:
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。
如果需要调用多次 action(行动)操作,每次都要从头计算
每次计算经常需要多次重复使用同一组数据,这样就可以采用持久化的方式
使用persist()方法对一个RDD标记为持久化
之所以称为 “标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,
而是要等到遇到第一个行动操作触发真正计算以后,才会对计算结果进行初始化
persist(MEMORY_ONLY):
表示将RDD作为反序列化的对象存储于JVM中,内存不足,就要替换缓存中的内容persist(MEMORY_AND_DISK):
将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上
分区(以前不太理解,划重点!):
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,一个分区用一个cpu去处理
通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目
scala>val rdd = sc.parallelize(Array(1,2,3,4,5),2) #设置两个分区
打印元素:
一般会采用语句rdd.foreach(println)或者rdd.map(println)
但是如果有多个节点,在一个节点上运行上述 只输出在这个节点上的元素
通过 rdd.collect().foreach(println) collect方式输出所有的元素
这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,
可以采用语句rdd.take(100).foreach(println)。
键值对RDD
- 键值对就是一组 map(k,value)的集合
第一种创建方式,从文件中加载:
val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt") val pairRDD = lines.flatMap(_split(" ")).map((_,1))
第二种创建方式,通过并行集合创建:
val rdd = sc.parallelize(List("Hadoop","Spark","Hive","Spark")) val pairRDD = rdd.map((_,1))
常用的键值对转换操作
reduceByKey(func)
使用func函数合并具有相同键的值
比如,reduceByKey((a, b) => a + b) 有四个键值对(“spark”, 1) (“spark”, 2)(“hadoop”, 3) (“hadoop”, 5)
合并后的结果就是:(“spark”, 3)、(“hadoop”, 8)。groupByKey()
对具有相同键的值进行分组
上面例子采用groupByKey()后得到的结果是:(“spark”, (1, 2)) 和
(“hadoop”, (3, 5))。keys
只会把键值对RDD中的key返回形成一个新的RDD
上面例子采用keys得到的结果为 {“spark”, ”spark”, ”hadoop”, ”hadoop”}。values
只会把键值对RDD中的value返回形成一个新的RDD
上面例子结果为:{1, 2, 3, 5}sortByKey()
返回一个根据键排序的RDD
(Hadoop, 1) (Hive, 1) (Spark, 1) (Spark, 1)mapValues(func)
对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化
pairRDD.mapValues(_ + 1)
(Hadoop, 4) (Hadoop, 5) (“spark”, 2) (Spark, 3)join
常用为内连接,对于给定的两个输入数据集(K,V1)和(K,V2),
只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。举个比较复杂的例子:
val pairRDD1 = sc.parallelize(Array(("aa",2),("bb",10),("bb",3),("cc",4))) val pairRDD2 = sc.parallelize(Array(("bb","haha"),("bb","iiii"),("cc",5),("cc","yyy"))) 执行 join: pairRDD1.join(pairRDD2).foreach(println) 结果: (bb,(10,haha)) (bb,(10,iiii)) (bb,(3,haha)) (bb,(3,iiii)) (cc,(4,5)) (cc,(4,yyy))
一个综合实例
给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),
键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,
也就是计算每种图书的每天平均销量。val rdd = sc.parallelize(Array(("“spark”",2),("“hadoop”",10),("“hadoop”",3),("“spark”",4))) rdd.mapValues(x => (x, 1)).reduceByKey((x,y) => (x._1 + y._1, x._2 +y._2)) .mapValues(x => (x._1 / x._2)).collect()
共享变量
当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,
它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。
但是,有时候,需要在多个任务之间共享变量
为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)
广播变量用来把变量在所有节点的内存之间进行共享。
累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。
广播变量
允许程序开发人员在每个机器上缓存一个 只读 的变量
val broadcastVar = sc.broadcast(Array(1, 2, 3))
累加器
通常可以被用来实现计数器(counter)和求和(sum)
一个数值型的累加器,可以通过调用SparkContext.longAccumulator(),或者doubleAccumulator()来创建。
只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值,
在节点中只是进行累加操作:
val accum = sc.longAccumulator("My Accumulator") //指定这个累加器的名字
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
文件数据读写
不同文件格式的读写
文本文件
scala> val textFile = sc.textFile("file:///zyb/word.txt") //读 scala> textFile.saveAsTextFile("file:///zyb/writeback.txt") //写
JSON
people.json文件的内容如下: {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} val jsonStr = sc.textFile("file:///zyb/people.json") //JSON.parseFull:以一个JSON字符串作为输入并进行解析, //如果解析成功则返回一个Some(map: Map[String, Any]) jsonStrs.map(s => JSON.parseFull(s)) result.foreach(println()) 结果为: Map(name -> Michael) Map(name -> Andy, age -> 30.0) Map(name -> Justin, age -> 19.0)
读写HBase
读取:
首先创建一个HBase表
hbase> create 'student','info' hbase> put 'student','1','info:name','Xueqian' hbase> put 'student','1','info:gender','F' hbase> put 'student','1','info:age','23' //然后录入student表的第二个学生记录 hbase> put 'student','2','info:name','Weiliang' hbase> put 'student','2','info:gender','M' hbase> put 'student','2','info:age','24'
配置Spark
把HBase的lib目录下的一些jar文件拷贝到Spark中编写程序读取HBase数据
val conf = HBaseConfiguration.create() val sc = new SparkContext(new SparkConf()) //设置查询的表名 conf.set(TableInputFormat.INPUT_TABLE, "student") //通过调用newAPIHadoopRDD 得到一个RDD val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) stuRDD.count() //第一次行动 stuRDD.cache() //持久化 //遍历输出,第二次行动 stuRDD.foreach({ case (_,result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)) println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age) })
通过sbt打包编译
在simple.sbt配置文件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本号执行后结果:
Students RDD Count:2 Row key:1 Name:Xueqian Gender:F Age:23 Row key:2 Name:Weiliang Gender:M Age:24
写入:
```
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
val sc = new SparkContext(sparkConf)
//调用 hadoopConfiguration set 来传入表的名字
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "student")
//创建一个job,来封装 student表的 列的数据格式
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//创建一个RDD,用来插入到表中
val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录
val rdd = indataRDD.map(_.split(',')).map{arr => {
//行健的值
val put = new Put(Bytes.toBytes(arr(0)))
//info:name列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
//info:gender列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
//info:age列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))
//生成一个 map
(new ImmutableBytesWritable, put)
}
}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
```