Spark Local: Scala Examples

1. Purpose

Write small examples in the Scala REPL that ships with Spark (spark-shell) to exercise and test Scala syntax.

2. Examples

2.1 Reading a local file

# test.txt contains:
abcd
heihei
heihei

# Read the file and count how often each line occurs, using reduceByKey
val lines = sc.textFile("file:///home/xx/test.txt")
val pairs = lines.map(s => (s, 1))
pairs.reduceByKey((x, y) => x + y).foreach(println)

# Output:
(abcd,1)
(heihei,2)

# reduceByKey can often replace groupByKey and cut down the amount of data shuffled.
# groupByKey would also work here, but it shuffles every value of a group to one place before sorting and taking the top N; a groupByKey sketch follows the List version below.
scala> val a = List((1, 2), (1, 3), (1,4), (1,5), (2,10), (2,11), (2,12))
a: List[(Int, Int)] = List((1,2), (1,3), (1,4), (1,5), (2,10), (2,11), (2,12))
val N = 3
# Group by key, then sort and take the top N per key (parallelize the list first)
import scala.collection.mutable.ListBuffer
val rdda = sc.parallelize(a)
rdda.map { case (name, time) =>
                val listBuffer = new ListBuffer[Long]()
                listBuffer.append(time)
                (name, listBuffer)
            }.reduceByKey { case (t1, t2) =>
                val listBuffer = new ListBuffer[Long]()
                listBuffer.appendAll(t1)
                // listBuffer.append(t2) // wrong: in reduceByKey, t1 and t2 may already be merged buffers, so use appendAll
                listBuffer.appendAll(t2)
                listBuffer.sorted.take(N)
            }.collect
# A cleaner version of the above uses List instead of ListBuffer:
rdda.map { case (name, time) =>
                (name, List(time))
            }.reduceByKey { case (t1, t2) =>
                (t1 ++ t2).sorted.take(N)
            }.collect
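
# For comparison, a minimal groupByKey sketch of the same top-N (same rdda and N as above); note that every value of a key is shuffled before the sort:
rdda.groupByKey()
    .mapValues(values => values.toList.sorted.take(N))
    .collect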

2.2 join example

val a1 = List((2,(200,300)), (3,(400,500)), (4,(500,600)))
val a2 = List((2,(200,300)), (3,(400,500)), (1,(500,600)))
val rdd1 = sc.parallelize(a1)
val rdd2 = sc.parallelize(a2)
rdd1.leftOuterJoin(rdd2).foreach(println)

// Handling the Option produced by leftOuterJoin:
val rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.map { case (id, (x, y)) =>
    val (y3, y2) = y.getOrElse((null, null))
    (id, x._1, x._2, y3, y2)
}.foreach(println)

// Output:
(4,500,600,null,null)
(3,400,500,400,500)
(2,200,300,200,300)
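
// A null-free alternative sketch: match on the Option directly (-1 is an assumed sentinel for the missing side):
rdd3.map {
    case (id, (x, Some((y1, y2)))) => (id, x._1, x._2, y1, y2)
    case (id, (x, None))           => (id, x._1, x._2, -1, -1)
}.foreach(println)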

// Full interactive session:
scala> val a1 = List((2,(200,300)), (3,(400,500)), (4,(500,600)))
a1: List[(Int, (Int, Int))] = List((2,(200,300)), (3,(400,500)), (4,(500,600)))

scala> val a2 = List((2,(200,300)), (3,(400,500)), (1,(500,600)))
a2: List[(Int, (Int, Int))] = List((2,(200,300)), (3,(400,500)), (1,(500,600)))

scala> val rdd1 = sc.parallelize(a1)
rdd1: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ParallelCollectionRDD[14] at parallelize at <console>:26

scala> val rdd2 = sc.parallelize(a2)
rdd2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ParallelCollectionRDD[15] at parallelize at <console>:26

scala> rdd1.join(rdd2).foreach(println)
(3,((400,500),(400,500)))
(2,((200,300),(200,300)))

scala> rdd2.leftOuterJoin(rdd1).foreach(println)
(3,((400,500),Some((400,500))))
(2,((200,300),Some((200,300))))
(1,((500,600),None))

2.3 groupByKey example

// Goal: build an RDD that groupByKey can be applied to
var arr1 = new Array[(Int, Int)](10)
arr1(0) = (1,1)
arr1(1) = (1,2)
arr1(2) = (2,1)
arr1(3) = (3,1)
arr1(4) = (10,1)
val rddA = sc.parallelize(arr1.filter(_ != null))   // drop the unset (null) slots; no further mapping needed
rddA.groupByKey().foreach(println)
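
// Sketch: once grouped, the per-key Iterable can be aggregated, e.g. summed (for a plain sum, reduceByKey(_ + _) would avoid shuffling every value):
rddA.groupByKey().mapValues(_.sum).foreach(println)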

2.4 cogroup example

scala> val rdd1 = sc.parallelize(List((1,2),(1,3),(1,4),(2,10),(2,10),(3,11),(3,12),(4,100)))
scala> val rdd2 = sc.parallelize(List((1,2), (1,3),(10,111)))
scala> rdd1.cogroup(rdd2).map{case (id, (f1,f2)) =>
     | val f = if (f1.isEmpty) -1 else f1
     | (id, f, f1, f2)}.foreach(println)
(1,CompactBuffer(2, 3, 4),CompactBuffer(2, 3, 4),CompactBuffer(2, 3))
(4,CompactBuffer(100),CompactBuffer(100),CompactBuffer())
(3,CompactBuffer(11, 12),CompactBuffer(11, 12),CompactBuffer())
(10,-1,CompactBuffer(),CompactBuffer(111))
(2,CompactBuffer(10, 10),CompactBuffer(10, 10),CompactBuffer())
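
// Sketch: the mixed Int/CompactBuffer column above comes from `if (f1.isEmpty) -1 else f1`; headOption keeps the types uniform (-1 is an assumed default):
rdd1.cogroup(rdd2).map { case (id, (f1, f2)) =>
    (id, f1.headOption.getOrElse(-1), f2.headOption.getOrElse(-1))
}.foreach(println)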

2.5 Using Option[Boolean]

scala> val myMap: Map[String, Boolean] = Map("key1" -> true)
scala> val myMap2 = myMap + ("key2"-> false)
scala> val c1 = myMap.get("a")
c1: Option[Boolean] = None
scala> val c2 = myMap2.get("key2")
c2: Option[Boolean] = Some(false)
scala> val c3 = myMap.get("key1")
c3: Option[Boolean] = Some(true)
scala> def getAOrB(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
    case (Some(x), Some(y)) => Some(x || y)
    case (Some(x), None)    => Some(x)
    case (None, Some(y))    => Some(y)
    case (None, None)       => None
}

scala> getAOrB(c3,c2)
res72: Option[Boolean] = Some(true)
scala> getAOrB(c3,c3)
res73: Option[Boolean] = Some(true)
scala> getAOrB(c1,c1)
res79: Option[Boolean] = None

scala> getAOrB(c1,c1).isDefined
res80: Boolean = false
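
// Sketch: the same combination written with collection combinators instead of an explicit match:
def getAOrB2(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] =
    (a.toList ++ b.toList).reduceOption(_ || _)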

2.6 Beware of Long vs Int

  • The pitfall: Int overflow
  • Example
scala> import org.joda.time.DateTime

scala> val l3 = new DateTime("20171010").getMillis
l3: Long = 636473427120000000

scala> val l4 = l3 - 28 * 86400 * 1000  // 28 * 86400 * 1000 overflows Int and becomes negative, so subtracting it makes l4 > l3
l4: Long = 636473428995767296

scala> l3 > l4
res132: Boolean = false

scala> val l4 = l3 - 28 * 86400 * 1000L  // the L suffix forces Long arithmetic, so there is no overflow
l4: Long = 636473424700800000

scala> l3 > l4
res133: Boolean = true
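
# Sketch: two ways to avoid the overflow in the first place (the ISO-formatted date string is an assumed example):
val twentyEightDaysMs = 28L * 86400 * 1000                       // make the first operand a Long
val l5 = new DateTime("2017-10-10").minusDays(28).getMillis      // or let joda-time do the date math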

2.7 Entering a specific Scala version

# scalaVersion is an sbt setting, not a REPL command: put scalaVersion := "2.10.4" in build.sbt
# (or run `set scalaVersion := "2.10.4"` in the sbt shell), then start the REPL with:
sbt console   // enters the Scala REPL for the project's scalaVersion

2.8 Array operations

// Goal: build arrays locally for syntax testing
scala> val arr1:Array[(String, Int)] = Array(("a1", 1))
arr1: Array[(String, Int)] = Array((a1,1))

scala> val arr2:Array[(String, Int)] = Array(("a1", 1))
arr2: Array[(String, Int)] = Array((a1,1))

// Array operations (concatenation)
scala> arr1 ++ arr2
res53: Array[(String, Int)] = Array((a1,1), (a1,1))

// Array to RDD
scala> val rddArr = sc.parallelize(arr1, 1)
rddArr: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:27

2.9 Map

// Build maps locally
scala> val mymap: Map[Int, Array[Int]] = Map(12->Array(1,2,3,4,5))
mymap: Map[Int,Array[Int]] = Map(12 -> Array(1, 2, 3, 4, 5))
scala> val mymap2 =  Map(10->Array(100,99,2,3,4,5))
mymap2: scala.collection.immutable.Map[Int,Array[Int]] = Map(10 -> Array(100, 99, 2, 3, 4, 5))

scala> val mymap3 = mymap ++ mymap2
mymap3: scala.collection.immutable.Map[Int,Array[Int]] = Map(12 -> Array(1, 2, 3, 4, 5), 10 -> Array(100, 99, 2, 3, 4, 5))
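
// Sketch: when the same key appears on both sides, ++ keeps the value from the right-hand operand:
val merged = Map(1 -> "a") ++ Map(1 -> "b")   // Map(1 -> "b")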

2.10 flatMap over a two-dimensional Array

scala> val a =Array(Array(1,2,3), Array(4,5,6), Array(7,8,9))
a: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

scala> val rddTest = sc.parallelize(a)
rddTest: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[163] at parallelize at <console>:28

scala> a.flatMap{case x=> x.map(y=>y+100)}
res253: Array[Int] = Array(101, 102, 103, 104, 105, 106, 107, 108, 109)
scala> a.flatMap{case x=> x.map(x=> x*30)}.foreach(println)
30
60
90
120
150
180
210
240
270

# For elements of the form (String, Array[Int]), flatMap can expand the array values, e.g.:
scala> val c = Array( ("a", Array(1, 2, 3)), ("b", Array(4, 5, 6)),  ("c", Array(7, 8, 9)))
c: Array[(String, Array[Int])] = Array((a,Array(1, 2, 3)), (b,Array(4, 5, 6)), (c,Array(7, 8, 9)))
scala> val c1 = sc.parallelize(c)
c1: org.apache.spark.rdd.RDD[(String, Array[Int])] = ParallelCollectionRDD[6] at parallelize at <console>:28

# Option 1: keep only the values
scala> c1.flatMap{case(x, y) => y}.collect
res89: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
# Option 2: pair each value with a constant key
scala> c1.flatMap{case(x, y) => y.map{m=> (1,m)}}.collect
res90: Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (1,9))
# Option 3: pair each value with its original key
scala> c1.flatMap{case(x, y) => y.map{m=> (x,m)}}.collect
res91: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,4), (b,5), (b,6), (c,7), (c,8), (c,9))


# Note: a flatMap can be nested inside another flatMap (illustrative snippet; rdd, status and getStatus are assumed to exist)
rdd.flatMap { case (_, status, list) =>    // outer flatMap: each RDD element expands into the pairs built below
            list.flatMap { dayTime =>          // inner flatMap over list, one pass per dayTime
                status.map { case (x, y) =>   // plain map over status
                    val s = x.getStatus(dayTime)
                    (dayTime, s)
                }
            }
        }
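
# A self-contained sketch of the same nesting; the data and names below are made up for illustration:
val nested = sc.parallelize(Seq(
    ("u1", Array(1, 2), List("d1", "d2")),
    ("u2", Array(10),   List("d3"))
))
nested.flatMap { case (_, status, list) =>
    list.flatMap(dayTime => status.map(s => (dayTime, s)))
}.collect
// Array((d1,1), (d1,2), (d2,1), (d2,2), (d3,10))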

2.11 Anonymous functions

// Three anonymous functions follow. Think of the var name as the function's name, and the var's declared type as the function's type.
// Here m is a function-valued variable: it takes a String, the body is everything after =>, and it returns a String
scala> var m:String  => String = {i=>
     | (for(j<- 1 to i.toInt) yield j).mkString("\t")}
m: String => String = <function1>

scala> m("3")
res1: String = 1    2   3

// Here x takes an Int and returns an Int; inside the body, _ stands for the parameter
scala> var x:Int => Int = {_*10}
x: Int => Int = <function1>

scala> x(9)
res12: Int = 90

// Here x takes an Int and returns a String; inside the body, _ stands for the parameter
scala> var x:Int => String = {_.toString+"hi"}
x: Int => String = <function1>

scala> x(100)
res14: String = 100hi


// Declare another var y of the same function type, then assign x to it; y now points to the same function as x
scala> var y:Int => String = {k => (for (i <- 1 to k) yield i).mkString("\t")}
y: Int => String = <function1>

scala> y(5)
res19: String = 1   2   3   4   5

scala> y = x
y: Int => String = <function1>

scala> x(3)
res20: String = 3hi
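
// Sketch: how a function value relates to a method (eta-expansion):
val double: Int => Int = _ * 2            // a Function1 value
def doubleDef(i: Int): Int = i * 2        // a method
val doubleFn: Int => Int = doubleDef _    // the method converted to a function value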

2.12 Using yield

  • yield in a for loop records the current element into a collection and returns that collection when the loop finishes; a for loop in Scala has a return value. Loop over a Map and you get a Map back, loop over a List and you get a List, and so on.
scala> for (i <- 1 to 5) yield i
res15: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)
scala> for (i <- 1 to 5) yield i * 2
res11: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)

  • As noted above, for/yield builds and returns a collection of the same type as the one being iterated. The following examples use a Scala Array; compare the type of the yielded collection (yield used here as a verb) with the earlier examples:
scala> val a = Array(1, 2, 3, 4, 5)
a: Array[Int] = Array(1, 2, 3, 4, 5)
 
scala> for (e <- a) yield e
res5: Array[Int] = Array(1, 2, 3, 4, 5)
 
scala> for (e <- a) yield e * 2
res6: Array[Int] = Array(2, 4, 6, 8, 10)
 
scala> for (e <- a) yield e % 2
res7: Array[Int] = Array(1, 0, 1, 0, 1)
  • Putting it to use right away
scala> val i = 10
i: Int = 10

scala> for(j<- 1 to i) yield j
res16: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> (for(j<- 1 to i) yield j).mkString("\t")
res17: String = 1   2   3   4   5   6   7   8   9   10
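
// Sketch: yield combined with a guard keeps only the matching elements:
val evens = for (j <- 1 to i if j % 2 == 0) yield j   // Vector(2, 4, 6, 8, 10)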

2.13 Array[Byte] <==> String

scala> val b1: Byte = 98
b1: Byte = 98

scala> val b2: Byte = 97
b2: Byte = 97

scala> val bArr = Array(b1, b2)
bArr: Array[Byte] = Array(98, 97)

scala> val bStr: String = new java.lang.String(bArr)
bStr: String = ba

2.14 HashMap

scala> import scala.collection.mutable
import scala.collection.mutable
scala> val a = new mutable.HashMap[String, String]()
a: scala.collection.mutable.HashMap[String,String] = Map()
scala> a.put("a", "valueA")
res61: Option[String] = None

scala> a
res62: scala.collection.mutable.HashMap[String,String] = Map(a -> valueA)
scala> a.put("b", "valueB")   // this put is implied by the output below
res63: Option[String] = None

scala> a.update("a", "aaaa")

scala> a
res66: scala.collection.mutable.HashMap[String,String] = Map(b -> valueB, a -> aaaa)

scala> val a2 = a.map{ case(k,v) => val v2 = if (k == "a")  "100" else v;(k, v2) }
a2: scala.collection.mutable.HashMap[String,String] = Map(b -> valueB, a -> 100)
scala> mutable.HashMap("x" -> 24, "y" -> 25, "z" -> 26)
res84: scala.collection.mutable.HashMap[String,Int] = Map(z -> 26, y -> 25, x -> 24)
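
// Sketch: getOrElse is handy for counting with a mutable HashMap:
val counts = new mutable.HashMap[String, Int]()
counts("x") = counts.getOrElse("x", 0) + 1   // counts is now Map(x -> 1)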

2.15 Regex replacement

scala> def expand(path: String): String = {
     |         val pattern1 = "(.*)\\$\\{date\\}(.*)".r
     |         val pattern2 = "(.*)\\$\\{date-([0-9]+)\\}(.*)".r
     |         path match {
     |             case pattern1(prefix, suffix) => prefix + new DateTime().toString("yyyyMMdd") + suffix
     |             case pattern2(prefix, minusDays, suffix) => prefix +
     |                     new DateTime().minusDays(minusDays.toInt).toString("yyyyMMdd") + suffix
     |             case _ => path
     |         }
     |     }
expand: (path: String)String

scala> expand("/zhang/haha")
res80: String = /zhan/haha

scala> expand("/zhan/haha/${date}")
res81: String = /zhan/haha/20180130

scala> expand("/zhan/haha/${date-1}")
res82: String = /zhan/haha/20180129

scala> expand("/zhan/haha/${date-10}")
res83: String = /zhan/haha/20180120

2.16 Scala: String and Byte conversion

scala> val st = "abc:999:opq"
st: String = abc:999:opq

scala> val byte = st.getBytes
byte: Array[Byte] = Array(97, 98, 99, 58, 57, 57, 57, 58, 111, 112, 113)

scala> byte.toString   // toString on Array[Byte] gives the JVM object reference, not the content
res3: String = [B@13b666b4

scala> val bToS = new String(byte)
bToS: String = abc:999:opq

2.17 Sorting: sortBy

# sortBy sorts in ascending order by default; the sort key can be customized
# Sorting by two fields:
val l = List((1, 2, 4), (2, 5, 9), (2, 10, 50), (0, 1, 6), (-1, 7, 10), (-2, 0, 9))
val rdd = sc.parallelize(l)
scala> rdd.sortBy(r => (r._1, r._2), ascending = false).collect
res64: Array[(Int, Int, Int)] = Array((2,10,50), (2,5,9), (1,2,4), (0,1,6), (-1,7,10), (-2,0,9))
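
# Sketch: to sort one field ascending and another descending, negate the numeric key:
rdd.sortBy(r => (r._1, -r._2)).collect   // ascending on _1, descending on _2 within equal _1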

2.18 reduceByKey vs groupByKey vs aggregateByKey

# A closer look at reduceByKey
import scala.collection.mutable.ListBuffer
val rdd5 = sc.parallelize(List((100,1), (101,9), (100, 11), (101, 8)))
val rdd6 = rdd5.map{ case (t1, t2) => val t3 = new ListBuffer[Int](); t3.append(t2); (t1, t3) }
rdd6.reduceByKey{ case (t1, t2) => val l = new ListBuffer[Int](); l.appendAll(t1); l.appendAll(t2); l }.collect   // appendAll, since t1/t2 may already be merged buffers

res95: Array[(Int, scala.collection.mutable.ListBuffer[Int])] = Array((100,ListBuffer(1, 11)), (101,ListBuffer(9, 8)))
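
# For comparison with the other two operators in the title, a sketch on the same data (value order within a group may vary):
rdd5.groupByKey().mapValues(_.toList).collect
// groupByKey shuffles every value, e.g. Array((100,List(1, 11)), (101,List(9, 8)))

rdd5.aggregateByKey(ListBuffer[Int]())(
    (buf, v) => buf += v,     // merge one value into the per-partition buffer
    (b1, b2) => b1 ++= b2     // merge buffers coming from different partitions
).collect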

2.19 match

# Example
a match {
    case 10 => println("ten")
    case x if x > 10 => println("greater than ten")
    case _ => println("less than ten")
}

# Extracting a version number with regex matching
    def getMajorVersion(version: String): Any = {
        val regex = """([^\.]*)\.([^\.]*)\.(.*)""".r
        val stableRegex = "^V".r
        val devRegex = "^\\d".r

        val majorVersion = version match {
            case regex(v1, v2, v3) if devRegex.findPrefixOf(version).isDefined => v1 + "." + v2 + "." + v3
            case regex(v1, v2, _) if stableRegex.findPrefixOf(version).isDefined => v1 + "." + v2
            case _ => null
        }
        majorVersion
    }

# A cleaner version of the above:
    def getMajorVersion(version: String): Any = {
        val stableRegex = """^V([^\.]*)\.([^\.]*)\.(.*)""".r
        // Note the regex: inside a triple-quoted string, \\d matches a literal backslash followed by 'd',
        // so the commented-out pattern below is wrong and [0-9] is used instead
        // val devRegex = """^\\d([^\.]*)\.([^\.]*)\.(.*)""".r
        val devRegex = """([0-9]*)\.([0-9]*)\.(.*)""".r

        version match {
            case stableRegex(v1, v2, _) => "V" + v1 + "." + v2
            case devRegex(v1, v2, v3) => v1 + "." + v2 + "." + v3
            case _ => null
        }
    }
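
# Sketch: assumed sample inputs for the rewritten function and the values it would return:
getMajorVersion("V1.2.3")      // "V1.2"      (stable: keep major.minor, V prefix)
getMajorVersion("10.20.300")   // "10.20.300" (dev: keep all three parts)
getMajorVersion("abc")         // null        (no match)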

2.20 map vs mapPartitions

# map operates on every element of an RDD, while mapPartitions (and likewise foreachPartition) operates on the iterator of each partition.
# If the work per element needs an expensive extra object (e.g. writing the RDD to a database over JDBC: map would open one connection per element, mapPartitions one per partition), mapPartitions is far more efficient than map; a sketch of that pattern is at the end of this section.
# Example:
   val a = sc.parallelize(1 to 10, 3)
    // Two transformation functions that both double the elements of rdd a.
    // Input function for map: the parameter e is a single RDD element.
    def myfuncPerElement(e: Int): Int = {
           println("e=" + e)
           e * 2
      }
     // Input function for mapPartitions: iter is the iterator over one partition's elements,
     // and the return type must also be an iterator.
    def myfuncPerPartition(iter: Iterator[Int]): Iterator[Int] = {
         println("run in partition")
         val res = for (e <- iter) yield e * 2
          res
    }

# map: the println runs once per element (10 times)
    val b = a.map(myfuncPerElement).collect
>>
e=4
e=1
e=5
e=8
e=6
e=2
e=9
e=3
e=10

# mapPartitions: the println runs once per partition (3 times)
    val c =  a.mapPartitions(myfuncPerPartition).collect
>>
run in partition
run in partition
run in partition

# Another way to write mapPartitions:
scala> a.mapPartitions{itr => 
     | println("haha");
     | itr.map(x=>x*2)}.collect
haha
haha
haha
res112: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
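
# Sketch of the per-partition resource pattern mentioned above; rddRecords, createConnection and write are hypothetical placeholders:
rddRecords.foreachPartition { iter =>
    val conn = createConnection()             // hypothetical: one connection per partition
    try iter.foreach(r => write(conn, r))     // hypothetical write of each record
    finally conn.close()
}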

2.21 view in Scala

# An example: suppose you have a Vector of Ints and want to apply map to it twice in a row
scala> val v = Vector(1 to 10: _*)
v: scala.collection.immutable.Vector[Int] =
   Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> v map (_ + 1) map (_ * 2)
res5: scala.collection.immutable.Vector[Int] =
   Vector(4, 6, 8, 10, 12, 14, 16, 18, 20, 22)

# In the last statement, the expression v map (_ + 1) builds a new Vector, which the second map call (_ * 2) then turns into a third Vector.
# In many cases, building that intermediate result from the first map call is wasted work. Try a view instead:
scala> (v.view map (_ + 1) map (_ * 2)).force
res12: Seq[Int] = Vector(4, 6, 8, 10, 12, 14, 16, 18, 20, 22)  
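
# Sketch: a view is lazy, so nothing is computed until force (or another strict operation) is called:
val lazySeq = v.view.map(_ + 1).map(_ * 2)   // no elements computed yet
lazySeq.force                                // materializes the same result as above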

2.22 More on yield

The main job of yield is to remember the value produced in each iteration and collect them, one by one, into a sequence.
Usage:
for {clauses} yield {variable or expression}
Note: the variable or expression whose value should be collected must be the last thing inside yield {}.

scala> val a = for (i<- 1.to(10,1)) yield i+100
scala> a.foreach(println)
101
102
103
104
105
106
107
108
109
110

2.23 A for comprehension instead of flatMap

# As below; see the yield sections above for background on yield
scala> val l = List(List(1,2), List(3,4))
l: List[List[Int]] = List(List(1, 2), List(3, 4))
scala> for { x <- l; y <-x} yield y
res60: List[Int] = List(1, 2, 3, 4)
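
# Sketch: the equivalent flatMap/map chain that this for comprehension desugars to:
l.flatMap(x => x.map(y => y))   // List(1, 2, 3, 4)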

3. spark shell

3.1 Loading jars when starting spark-shell

  • Method:
# Step 1:
spark-shell --jars /home/xxx.jar # separate multiple jars with commas
# Step 2: in the interactive shell, import the classes you need from that jar
import com.xxx.xx.classname

4. Reading data with Spark

4.1 SparkHBase

4.1.1 Reading an HBase snapshot with Spark

# Example (argMap is assumed to be a Map[String, String] of job arguments):
        import org.apache.hadoop.fs.Path
        import org.apache.hadoop.hbase.HBaseConfiguration
        import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
        import org.apache.hadoop.mapreduce.Job

        val hConf = HBaseConfiguration.create()
        hConf.set("hbase.rootdir", argMap.getOrElse("hbase.rootdir", "hdfs://clustername/somepath"))

        val job = Job.getInstance(hConf)

        val path = new Path(argMap.getOrElse("hbase.snapshot.path", "hdfs://clustername/"))
        val snapName = argMap.getOrElse("hbase.snapshot.name", "snapshot.name")
        TableSnapshotInputFormat.setInput(job, snapName, path)

        // read hbase snapshot
        val userGroupByRdd = sc.newAPIHadoopRDD(job.getConfiguration,
            classOf[TableSnapshotInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])
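
        // Sketch: pull one column value out of each Result; the column family "cf" and qualifier "col" are hypothetical:
        import org.apache.hadoop.hbase.util.Bytes
        val values = userGroupByRdd.map { case (_, result) =>
            Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
        }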

5. Running Spark locally for testing

  • Goal: create a SparkConf locally and run the program
  • Environment: IntelliJ IDEA
  • Troubleshooting: error java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
Add a dependency to the pom, similar to the following:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.2</version>
  <scope>compile</scope>
</dependency>

If the error persists, add a configuration in the IDEA IDE:
Menu -> Run -> Edit Configurations -> VM options: enter the following:
-Dspark.master=local
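
A minimal sketch of a locally runnable main class (object and app names are illustrative; assumes the spark-core dependency above):
import org.apache.spark.{SparkConf, SparkContext}

object LocalTest {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("local-test").setMaster("local[*]")
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 10).sum())   // quick smoke test: prints 55.0
        sc.stop()
    }
}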
    Original author: leobupt
    Original source: https://www.jianshu.com/p/81d381d2525b