1. Purpose
Write small examples in the Scala environment provided by Spark (the spark-shell REPL) and use them to test Scala syntax.
2. Examples
2.1 Reading a local file
# test.txt contains:
abcd
heihei
heihei
# Read the file and count how often each line appears, using reduceByKey
val lines = sc.textFile("file:///home/xx/test.txt")
val pairs = lines.map(s=>(s,1))
pairs.reduceByKey((x, y)=>x+y).foreach(println)
# Output:
(abcd,1)
(heihei,2)
# reduceByKey is a neat replacement for groupByKey that reduces the amount of data shuffled.
# As below: groupByKey could also solve this, but it has to shuffle all values of a group to one place before sorting and taking the top N (see the groupByKey sketch at the end of this example).
scala> val a = List((1, 2), (1, 3), (1,4), (1,5), (2,10), (2,11), (2,12))
a: List[(Int, Int)] = List((1,2), (1,3), (1,4), (1,5), (2,10), (2,11), (2,12))
val N = 3
# Group by key, then sort each group and take the top N:
import scala.collection.mutable.ListBuffer
val rdda = sc.parallelize(a)
rdda.map { case (name, time) =>
  val listBuffer = new ListBuffer[Long]()
  listBuffer.append(time)
  (name, listBuffer)
}.reduceByKey { case (t1, t2) =>
  val listBuffer = new ListBuffer[Long]()
  listBuffer.appendAll(t1)
  // listBuffer.append(t2) // careful: in reduceByKey, t1 and t2 may already be merged buffers, so use appendAll
  listBuffer.appendAll(t2)
  listBuffer.sorted.take(N)
}.collect
# A cleaner version of the above uses an immutable List instead of ListBuffer:
rdda.map { case (name, time) =>
  (name, List(time))
}.reduceByKey { case (t1, t2) =>
  (t1 ++ t2).sorted.take(N)
}.collect
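# For comparison, the groupByKey version (a sketch): every value of a key is shuffled to
# one task before sorting, which is exactly the cost the reduceByKey version above avoids.
rdda.groupByKey().mapValues(_.toList.sorted.take(N)).collect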
2.2 join example
val a1 = List((2,(200,300)), (3,(400,500)), (4,(500,600)))
val a2 = List((2,(200,300)), (3,(400,500)), (1,(500,600)))
val rdd1 = sc.parallelize(a1)
val rdd2 = sc.parallelize(a2)
rdd1.leftOuterJoin(rdd2).foreach(println)
// Handling the Option returned by leftOuterJoin:
val rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.map{ case (id,(x,y))=>
val (y3,y2) = y.getOrElse((null, null))
(id, x._1, x._2, y3, y2)
}.foreach(println)
// Printed output:
(4,500,600,null,null)
(3,400,500,400,500)
(2,200,300,200,300)
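// A more type-safe alternative to getOrElse((null, null)) -- a sketch that pattern matches
// on the Option and uses -1 as an arbitrary placeholder default instead of null:
rdd3.map { case (id, (x, yOpt)) =>
  yOpt match {
    case Some((y1, y2)) => (id, x._1, x._2, y1, y2)
    case None           => (id, x._1, x._2, -1, -1)
  }
}.foreach(println)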
// Full REPL session:
scala> val a1 = List((2,(200,300)), (3,(400,500)), (4,(500,600)))
a1: List[(Int, (Int, Int))] = List((2,(200,300)), (3,(400,500)), (4,(500,600)))
scala> val a2 = List((2,(200,300)), (3,(400,500)), (1,(500,600)))
a2: List[(Int, (Int, Int))] = List((2,(200,300)), (3,(400,500)), (1,(500,600)))
scala> val rdd1 = sc.parallelize(a1)
rdd1: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ParallelCollectionRDD[14] at parallelize at <console>:26
scala> val rdd2 = sc.parallelize(a2)
rdd2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ParallelCollectionRDD[15] at parallelize at <console>:26
scala> rdd1.join(rdd2).foreach(println)
(3,((400,500),(400,500)))
(2,((200,300),(200,300)))
scala> rdd2.leftOuterJoin(rdd1).foreach(println)
(3,((400,500),Some((400,500))))
(2,((200,300),Some((200,300))))
(1,((500,600),None))
2.3 groupByKey example
// Goal: build an RDD that groupByKey can be applied to
var arr1 = new Array[(Int, Int)](10)
arr1(0) = (1,1)
arr1(1) = (1,2)
arr1(2) = (2,1)
arr1(3) = (3,1)
arr1(4) = (10,1)
val rddA = sc.parallelize(arr1.filter(_ != null)) // unset slots of arr1 are null, so drop them
rddA.groupByKey().foreach(println)
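// To get per-key counts from the grouped RDD (a sketch); reduceByKey(_ + _) would give
// the same result with less shuffle:
rddA.groupByKey().mapValues(_.sum).foreach(println)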
2.4 cogroup example
scala> val rdd1 = sc.parallelize(List((1,2),(1,3),(1,4),(2,10),(2,10),(3,11),(3,12),(4,100)))
scala> val rdd2 = sc.parallelize(List((1,2), (1,3),(10,111)))
scala> rdd1.cogroup(rdd2).map{case (id, (f1,f2)) =>
| val f = if (f1.isEmpty) -1 else f1
| (id, f, f1, f2)}.foreach(println)
(1,CompactBuffer(2, 3, 4),CompactBuffer(2, 3, 4),CompactBuffer(2, 3))
(4,CompactBuffer(100),CompactBuffer(100),CompactBuffer())
(3,CompactBuffer(11, 12),CompactBuffer(11, 12),CompactBuffer())
(10,-1,CompactBuffer(),CompactBuffer(111))
(2,CompactBuffer(10, 10),CompactBuffer(10, 10),CompactBuffer())
2.5 Option[Boolean] usage
scala> val myMap: Map[String, Boolean] = Map("key1" -> true)
scala> val myMap2 = myMap + ("key2"-> false)
scala> val c1 = myMap.get("a")
c1: Option[Boolean] = None
scala> val c2 = myMap2.get("key2")
c2: Option[Boolean] = Some(false)
scala> val c3 = myMap.get("key1")
c3: Option[Boolean] = Some(true)
scala> def getAOrB(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = {
  if (a.isEmpty && b.isEmpty) None
  else if (b.isEmpty) a
  else if (a.isEmpty) b
  else Some(a.get || b.get)
}
scala> getAOrB(c3,c2)
res72: Option[Boolean] = Some(true)
scala> getAOrB(c3,c3)
res73: Option[Boolean] = Some(true)
scala> getAOrB(c1,c1)
res79: Option[Boolean] = None
scala> getAOrB(c1,c1).isDefined
res80: Boolean = false
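// An equivalent formulation (a sketch): combine with || only when both sides are defined,
// otherwise fall back to whichever side is defined.
def getAOrB2(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
  case (Some(x), Some(y)) => Some(x || y)
  case _                  => a.orElse(b)
}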
2.6 Watch out for Long vs Int
- The gotcha: integer overflow
- Example
scala> import org.joda.time.DateTime
scala> val l3 = new DateTime("20171010").getMillis
l3: Long = 636473427120000000
scala> val l4 = l3 - 28 * 86400 * 1000 // 28 * 86400 * 1000 overflows Int to a negative value, so l4 ends up greater than l3
l4: Long = 636473428995767296
scala> l3 > l4
res132: Boolean = false
scala> val l4 = l3 - 28 * 86400 * 1000L
l4: Long = 636473424700800000
scala> l3 > l4
res133: Boolean = true
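// Quick check of the overflowing subexpression: 28 * 86400 * 1000 = 2419200000, which is
// larger than Int.MaxValue (2147483647), so the Int multiplication wraps around.
28 * 86400 * 1000    // -1875767296
28 * 86400 * 1000L   // 2419200000: the Long literal promotes the product to Long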
2.7 Entering a Scala REPL for a specific Scala version
sbt                              # start the sbt shell
> set scalaVersion := "2.10.4"   # or put scalaVersion := "2.10.4" in build.sbt
> console                        # opens a Scala REPL for that Scala version
2.8 Array operations
// Goal: build arrays locally for syntax testing
scala> val arr1:Array[(String, Int)] = Array(("a1", 1))
arr1: Array[(String, Int)] = Array((a1,1))
scala> val arr2:Array[(String, Int)] = Array(("a1", 1))
arr2: Array[(String, Int)] = Array((a1,1))
// Array concatenation
scala> arr1 ++ arr2
res53: Array[(String, Int)] = Array((a1,1), (a1,1))
// Array to RDD
scala> val rddArr = sc.parallelize(arr1, 1)
rddArr: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:27
2.9 Map
// Build a Map locally
scala> val mymap: Map[Int, Array[Int]] = Map(12->Array(1,2,3,4,5))
mymap: Map[Int,Array[Int]] = Map(12 -> Array(1, 2, 3, 4, 5))
scala> val mymap2 = Map(10->Array(100,99,2,3,4,5))
mymap2: scala.collection.immutable.Map[Int,Array[Int]] = Map(10 -> Array(100, 99, 2, 3, 4, 5))
scala> val mymap3 = mymap ++ mymap2
mymap3: scala.collection.immutable.Map[Int,Array[Int]] = Map(12 -> Array(1, 2, 3, 4, 5), 10 -> Array(100, 99, 2, 3, 4, 5))
2.10 flatMap over a two-dimensional Array
scala> val a =Array(Array(1,2,3), Array(4,5,6), Array(7,8,9))
a: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
scala> val rddTest = sc.parallelize(a)
rddTest: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[163] at parallelize at <console>:28
scala> a.flatMap{case x=> x.map(y=>y+100)}
res253: Array[Int] = Array(101, 102, 103, 104, 105, 106, 107, 108, 109)
scala> a.flatMap{case x=> x.map(x=> x*30)}.foreach(println)
30
60
90
120
150
180
210
240
270
# For elements of type (String, Array[Int]), flatMap can expand them in several ways, e.g.:
scala> val c = Array( ("a", Array(1, 2, 3)), ("b", Array(4, 5, 6)), ("c", Array(7, 8, 9)))
c: Array[(String, Array[Int])] = Array((a,Array(1, 2, 3)), (b,Array(4, 5, 6)), (c,Array(7, 8, 9)))
scala> val c1 = sc.parallelize(c)
c1: org.apache.spark.rdd.RDD[(String, Array[Int])] = ParallelCollectionRDD[6] at parallelize at <console>:28
# Option 1:
scala> c1.flatMap{case(x, y) => y}.collect
res89: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
# Option 2:
scala> c1.flatMap{case(x, y) => y.map{m=> (1,m)}}.collect
res90: Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (1,9))
# Option 3:
scala> c1.flatMap{case(x, y) => y.map{m=> (x,m)}}.collect
res91: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,4), (b,5), (b,6), (c,7), (c,8), (c,9))
# Note: a flatMap can be nested inside another flatMap (schematic example; getStatus is not defined here)
rdd.flatMap { case (_, status, list) =>   // outer flatMap flattens the collection built from list below
  list.flatMap { dayTime =>               // inner flatMap flattens the collection built from status below
    status.map { case (x, y) =>           // innermost map over the (x, y) pairs in status
      val s = x.getStatus(dayTime)
      (dayTime, s)
    }
  }
}
2.11 Anonymous functions
// Three anonymous functions below. Think of the "function name" as the variable declared with var, and the variable's type as the corresponding function type.
// Below: variable m holds a function that takes a String and returns a String (the body is everything after =>).
scala> var m:String => String = {i=>
| (for(j<- 1 to i.toInt) yield j).mkString("\t")}
m: String => String = <function1>
scala> m("3")
res1: String = 1 2 3
// Below: variable x holds a function from Int to Int (the body is everything after =>); inside the body, _ stands for the parameter.
scala> var x:Int => Int = {_*10}
x: Int => Int = <function1>
scala> x(9)
res12: Int = 90
// Below: variable x holds a function from Int to String (the body is everything after =>); inside the body, _ stands for the parameter.
scala> var x:Int => String = {_.toString+"hi"}
x: Int => String = <function1>
scala> x(100)
res14: String = 100hi
// Declare a new var y holding a function, then assign x to y; y now points to the same function as x.
scala> var y:Int => String = {k => (for (i <- 1 to k) yield i).mkString("\t")}
y: Int => String = <function1>
scala> y(5)
res19: String = 1 2 3 4 5
scala> y = x
y: Int => String = <function1>
scala> y(3)
res20: String = 3hi
2.12 Using yield
- yield inside a for loop records the current element and collects it; when the loop finishes, the collection is returned. A for loop in Scala has a return value: loop over a Map and you get a Map back, loop over a List and you get a List, and so on (a Map example follows the snippets below).
scala> for (i <- 1 to 5) yield i
res15: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)
scala> for (i <- 1 to 5) yield i * 2
res11: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)
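# The Map case mentioned above, as a quick check (a minimal sketch):
for ((k, v) <- Map(1 -> "a", 2 -> "b")) yield (k, v * 2)   // Map(1 -> aa, 2 -> bb)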
- As mentioned above, for/yield builds and returns a collection of the same type as the one being iterated. The following examples use a Scala Array; compare the type of the yielded collection (think of yield as a verb here) with the earlier examples:
scala> val a = Array(1, 2, 3, 4, 5)
a: Array[Int] = Array(1, 2, 3, 4, 5)
scala> for (e <- a) yield e
res5: Array[Int] = Array(1, 2, 3, 4, 5)
scala> for (e <- a) yield e * 2
res6: Array[Int] = Array(2, 4, 6, 8, 10)
scala> for (e <- a) yield e % 2
res7: Array[Int] = Array(1, 0, 1, 0, 1)
- Putting it to use right away
scala> val i = 10
i: Int = 10
scala> for(j<- 1 to i) yield j
res16: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> (for(j<- 1 to i) yield j).mkString("\t")
res17: String = 1 2 3 4 5 6 7 8 9 10
2.13 Array[Byte] <==> String
scala> val b1: Byte = 98
b1: Byte = 98
scala> val b2: Byte = 97
b2: Byte = 97
scala> val bArr = Array(b1, b2)
bArr: Array[Byte] = Array(98, 97)
scala> val bStr: String = new java.lang.String(bArr)
bStr: String = ba
2.14 HashMap
scala> import scala.collection.mutable
import scala.collection.mutable
scala> val a = new mutable.HashMap[String, String]()
a: scala.collection.mutable.HashMap[String,String] = Map()
scala> a.put("a", "valueA")
res61: Option[String] = None
scala> a
res62: scala.collection.mutable.HashMap[String,String] = Map(a -> valueA)
scala> a.put("b", "valueB")
res63: Option[String] = None
scala> a.update("a", "aaaa")
scala> a
res66: scala.collection.mutable.HashMap[String,String] = Map(b -> valueB, a -> aaaa)
scala> val a2 = a.map{ case(k,v) => val v2 = if (k == "a") "100" else v;(k, v2) }
a2: scala.collection.mutable.HashMap[String,String] = Map(b -> valueB, a -> 100)
scala> mutable.HashMap("x" -> 24, "y" -> 25, "z" -> 26)
res84: scala.collection.mutable.HashMap[String,Int] = Map(z -> 26, y -> 25, x -> 24)
2.15 Regex replacement
scala> import org.joda.time.DateTime
scala> def expand(path: String): String = {
| val pattern1 = "(.*)\\$\\{date\\}(.*)".r
| val pattern2 = "(.*)\\$\\{date-([0-9]+)\\}(.*)".r
| path match {
| case pattern1(prefix, suffix) => prefix + new DateTime().toString("yyyyMMdd") + suffix
| case pattern2(prefix, minusDays, suffix) => prefix +
| new DateTime().minusDays(minusDays.toInt).toString("yyyyMMdd") + suffix
| case _ => path
| }
| }
expand: (path: String)String
scala> expand("/zhang/haha")
res80: String = /zhan/haha
scala> expand("/zhan/haha/${date}")
res81: String = /zhan/haha/20180130
scala> expand("/zhan/haha/${date-1}")
res82: String = /zhan/haha/20180129
scala> expand("/zhan/haha/${date-10}")
res83: String = /zhan/haha/20180120
2.16 Scala: converting between String and Byte
scala> val st = "abc:999:opq"
st: String = abc:999:opq
scala> val byte = st.getBytes
byte: Array[Byte] = Array(97, 98, 99, 58, 57, 57, 57, 58, 111, 112, 113)
scala> byte.toString   // toString on an Array prints the object reference, not the contents
res3: String = [B@13b666b4
scala> val bToS = new String(byte)
bToS: String = abc:999:opq
2.17 Sorting: sortBy
# sortBy sorts ascending by default; the sort key can be customized
# Sorting by two columns:
val l = List((2,20,4), (2,5,9), (2,10,50), (2,1,6), (-1,7,10), (-1,0,9))
val rdd = sc.parallelize(l)
scala> rdd.sortBy(r => (r._1, r._2), ascending=false).collect
res64: Array[(Int, Int, Int)] = Array((2,20,4), (2,10,50), (2,5,9), (2,1,6), (-1,7,10), (-1,0,9))
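# To sort ascending on the first field but descending on the second (a sketch), negate
# the second component of the sort key:
rdd.sortBy(r => (r._1, -r._2)).collect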
2.18 reduceByKey vs groupByKey vs aggregateByKey
# Digging deeper into reduceByKey
val rdd5 = sc.parallelize(List((100,1), (101,9), (100, 11), (101, 8)))
val rdd6 = rdd5.map{ case(t1, t2)=> val t3 = new ListBuffer[Int]() ; t3.append(t2); (t1, t3) }
// use appendAll: t1 and t2 may already be merged buffers (see the note in the top-N example above)
rdd6.reduceByKey{case (t1, t2) => val l = new ListBuffer[Int](); l.appendAll(t1); l.appendAll(t2); l}.collect
res95: Array[(Int, scala.collection.mutable.ListBuffer[Int])] = Array((100,ListBuffer(1, 11)), (101,ListBuffer(9, 8)))
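# An aggregateByKey version of the same grouping (a sketch): a zero value plus a
# within-partition combiner and a cross-partition combiner, avoiding the per-element
# ListBuffer allocation of the map above.
rdd5.aggregateByKey(List.empty[Int])(
  (acc, v) => v :: acc,    // merge one value into the partition-local accumulator
  (l1, l2) => l1 ::: l2    // merge accumulators coming from different partitions
).collect
// expected shape: Array((100,List(11, 1)), (101,List(8, 9))) -- order within a list may vary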
2.19 match
# Example (a is some Int):
a match {
case 10 => println("ten")
case x if x > 10 => println("greater than ten")
case _ => println("less than ten")
}
# Regex matching to extract a version number
def getMajorVersion(version: String): Any = {
val regex = """([^\.]*)\.([^\.]*)\.(.*)""".r
val stableRegex = "^V".r
val devRegex = "^\\d".r
val majorVersion = version match {
case regex(v1, v2, v3) if devRegex.findPrefixOf(version).isDefined => v1 + "." + v2 + "." + v3
case regex(v1, v2, _) if stableRegex.findPrefixOf(version).isDefined => v1 + "." + v2
case _ => null
}
majorVersion
}
# A cleaner version of the above:
def getMajorVersion(version: String): Any = {
val stableRegex = """^V([^\.]*)\.([^\.]*)\.(.*)""".r
// Note the regex: inside triple quotes, \\d is a literal backslash followed by 'd', so the commented-out variant below does not work
// val devRegex = """^\\d([^\.]*)\.([^\.]*)\.(.*)""".r
val devRegex = """([0-9]*)\.([0-9]*)\.(.*)""".r
version match {
case stableRegex(v1, v2, _) => "V" + v1 + "." + v2
case devRegex(v1, v2, v3) => v1 + "." + v2 + "." + v3
case _ => null
}
}
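# Quick checks of the optimized version (the version strings are made up):
getMajorVersion("V2.3.1")   // "V2.3"
getMajorVersion("2.3.1")    // "2.3.1"
getMajorVersion("abc")      // null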
2.20 map vs mapPartitions
# map operates on every element of the RDD, while mapPartitions (and foreachPartition) operates on the iterator of each partition.
# When the per-element work needs expensive setup (e.g. writing the RDD to a database over JDBC: map would open one connection per element, mapPartitions one per partition), mapPartitions is far more efficient than map. A sketch of the JDBC pattern is at the end of this section.
# Example:
val a = sc.parallelize(1 to 10, 3)
// Define two transformation functions; both double the element values of rdd a.
// Input function for map; its parameter e is a single RDD element.
def myfuncPerElement(e:Int):Int = {
println("e="+e)
e*2
}
// Input function for mapPartitions; iter is the iterator over a partition's elements, and the return type must also be an iterator.
def myfuncPerPartition(iter: Iterator[Int]): Iterator[Int] = {
  println("run in partition")
  val res = for (e <- iter) yield e * 2
  res
}
# map: "e=" is printed 10 times, once per element (output order is not deterministic)
val b = a.map(myfuncPerElement).collect
>>
e=4
e=1
e=5
e=8
e=6
e=7
e=2
e=9
e=3
e=10
# mapPartitions: printed 3 times, once per partition
val c = a.mapPartitions(myfuncPerPartition).collect
>>
run in partition
run in partition
run in partition
# Another way to write mapPartitions:
scala> a.mapPartitions{itr =>
| println("haha");
| itr.map(x=>x*2)}.collect
haha
haha
haha
res112: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
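# Sketch of the JDBC point made at the top of this section: one connection per partition
# instead of one per element. The JDBC url, credentials and SQL are made up for illustration.
import java.sql.DriverManager
a.foreachPartition { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO t (v) VALUES (?)")
  try {
    iter.foreach { v =>
      stmt.setInt(1, v)       // reuse the same connection/statement for the whole partition
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}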
2.21 view in Scala
# An example: suppose you have a Vector of Ints and you want to apply map to it twice in a row.
scala> val v = Vector(1 to 10: _*)
v: scala.collection.immutable.Vector[Int] =
Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> v map (_ + 1) map (_ * 2)
res5: scala.collection.immutable.Vector[Int] =
Vector(4, 6, 8, 10, 12, 14, 16, 18, 20, 22)
# In the last statement, the expression v map (_ + 1) builds a new Vector, which the second map call (_ * 2) then transforms into a third Vector.
# In many cases, materializing the intermediate result of the first map is a waste. Try a view instead:
scala> (v.view map (_ + 1) map (_ * 2)).force
res12: Seq[Int] = Vector(4, 6, 8, 10, 12, 14, 16, 18, 20, 22)
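# Laziness also pays off when only part of the result is needed (a sketch):
(v.view map (_ * 2) take 3).force   // Vector(2, 4, 6); only three elements get mapped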
2.22 More on yield
yield records the relevant value of each iteration and appends it to a collection, one element at a time.
Usage:
for { clauses } yield { variable or expression }
Note: the variable or expression whose values are collected must be the last thing inside yield {}.
scala> val a = for (i<- 1.to(10,1)) yield i+100
scala> a.foreach(println)
101
102
103
104
105
106
107
108
109
110
2.23 for comprehension instead of flatMap
# As below; see the yield sections above for background:
scala> val l = List(List(1,2), List(3,4))
l: List[List[Int]] = List(List(1, 2), List(3, 4))
scala> for { x <- l; y <-x} yield y
res60: List[Int] = List(1, 2, 3, 4)
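# The equivalent flatMap / flatten forms, for comparison:
l.flatMap(x => x)   // List(1, 2, 3, 4)
l.flatten           // List(1, 2, 3, 4)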
3. spark shell
3.1 Loading jars when starting spark-shell
- How:
# Step 1:
spark-shell --jars /home/xxx.jar # separate multiple jars with commas
# Step 2: in the REPL, import the classes you need from that jar
import com.xxx.xx.classname
4. Reading data with Spark
4.1 SparkHBase
4.1.1 Reading an HBase snapshot with Spark
# Example:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
import org.apache.hadoop.mapreduce.Job
val hConf = HBaseConfiguration.create()
hConf.set("hbase.rootdir", argMap.getOrElse("hbase.rootdir", "hdfs://clustername/somepath"))
val job = Job.getInstance(hConf)
val path = new Path(argMap.getOrElse("hbase.snapshot.path", "hdfs://clustername/"))
val snapName = argMap.getOrElse("hbase.snapshot.name", "snapshot.name")
TableSnapshotInputFormat.setInput(job, snapName, path)
// read hbase snapshot
val userGroupByRdd = sc.newAPIHadoopRDD(job.getConfiguration,
classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
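// A possible next step (a sketch; the column family and qualifier names are made up):
// pull the row key and one column value out of each Result.
import org.apache.hadoop.hbase.util.Bytes
val rows = userGroupByRdd.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val value  = Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
  (rowKey, value.map(v => Bytes.toString(v)))
}
rows.take(10).foreach(println)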
5. Running Spark locally for testing
- Goal: create a SparkConf and run the program on the local machine
- Environment: IntelliJ IDEA
- Troubleshooting the error java.lang.NoClassDefFoundError: org/apache/spark/SparkConf:
Add a dependency to the pom, similar to:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
<scope>compile</scope>
</dependency>
If the error persists, add a run configuration in the IDEA IDE:
Menu -> Run -> Edit Configurations -> VM options, and enter:
-Dspark.master=local
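// A minimal sketch of the equivalent in code (assumes the spark-core dependency above;
// the object and app names are made up). setMaster("local[*]") plays the same role as
// the -Dspark.master=local VM option.
import org.apache.spark.{SparkConf, SparkContext}
object LocalSparkTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("local-test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())   // 55.0
    sc.stop()
  }
}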