Spark 编程基础

RDD编程

RDD创建:
  • 第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统

    • 从文件系统中加载数据创建RDD
      • 从本地加载:
        scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
      • 从HDFS加载:
        scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
        scala> val lines = sc.textFile("/user/hadoop/word.txt")
        scala> val lines = sc.textFile("word.txt")
        
  • 第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

    • 通过并行集合创建:
      cala>val array = sc.parallelize(Array(1,2,3,4,5))
RDD操作:
  • 转换操作:
    每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的
    并不会发生真正的计算,只是记录了转换的轨迹,只有遇到行动操作时,才会发生真正的计算。

    • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
    • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
    • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
    • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
  • 行动操作:
    行动操作是真正触发计算的地方。从文件中加载数据,完成一次又一次转换操作

    • count() 返回数据集中的元素个数
    • collect() 以数组的形式返回数据集中的所有元素
    • first() 返回数据集中的第一个元素
    • take(n) 以数组的形式返回数据集中的前n个元素
    • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    • foreach(func) 将数据集中的每个元素传递到函数func中运行*

实例:

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
//可以写成:
scala> lines.map(_.split(" ").size)...

持久化:
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。
如果需要调用多次 action(行动)操作,每次都要从头计算
每次计算经常需要多次重复使用同一组数据,这样就可以采用持久化的方式

使用persist()方法对一个RDD标记为持久化
之所以称为 “标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,
而是要等到遇到第一个行动操作触发真正计算以后,才会对计算结果进行初始化

  • persist(MEMORY_ONLY):
    表示将RDD作为反序列化的对象存储于JVM中,内存不足,就要替换缓存中的内容

  • persist(MEMORY_AND_DISK):
    将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上

分区(以前不太理解,划重点!):
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,一个分区用一个cpu去处理

通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目
scala>val rdd = sc.parallelize(Array(1,2,3,4,5),2) #设置两个分区

打印元素:
一般会采用语句rdd.foreach(println)或者rdd.map(println)

但是如果有多个节点,在一个节点上运行上述 只输出在这个节点上的元素
通过 rdd.collect().foreach(println) collect方式输出所有的元素

这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,
可以采用语句rdd.take(100).foreach(println)。

键值对RDD

  • 键值对就是一组 map(k,value)的集合
    • 第一种创建方式,从文件中加载:

      val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
      val pairRDD = lines.flatMap(_split(" ")).map((_,1))
      
    • 第二种创建方式,通过并行集合创建:

      val rdd = sc.parallelize(List("Hadoop","Spark","Hive","Spark"))
      val pairRDD = rdd.map((_,1))
      
    • 常用的键值对转换操作

      • reduceByKey(func)
        使用func函数合并具有相同键的值
        比如,reduceByKey((a, b) => a + b) 有四个键值对(“spark”, 1) (“spark”, 2)(“hadoop”, 3) (“hadoop”, 5)
        合并后的结果就是:(“spark”, 3)、(“hadoop”, 8)。

      • groupByKey()
        对具有相同键的值进行分组
        上面例子采用groupByKey()后得到的结果是:(“spark”, (1, 2)) 和
        (“hadoop”, (3, 5))。

      • keys
        只会把键值对RDD中的key返回形成一个新的RDD
        上面例子采用keys得到的结果为 {“spark”, ”spark”, ”hadoop”, ”hadoop”}。

      • values
        只会把键值对RDD中的value返回形成一个新的RDD
        上面例子结果为:{1, 2, 3, 5}

      • sortByKey()
        返回一个根据键排序的RDD
        (Hadoop, 1) (Hive, 1) (Spark, 1) (Spark, 1)

      • mapValues(func)
        对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化
        pairRDD.mapValues(_ + 1)
        (Hadoop, 4) (Hadoop, 5) (“spark”, 2) (Spark, 3)

      • join
        常用为内连接,对于给定的两个输入数据集(K,V1)和(K,V2),
        只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        举个比较复杂的例子:

        val pairRDD1 = sc.parallelize(Array(("aa",2),("bb",10),("bb",3),("cc",4)))
        val pairRDD2 = sc.parallelize(Array(("bb","haha"),("bb","iiii"),("cc",5),("cc","yyy")))
        
        执行 join:
            pairRDD1.join(pairRDD2).foreach(println)
        结果:
            (bb,(10,haha))
            (bb,(10,iiii))
            (bb,(3,haha))
            (bb,(3,iiii))
            (cc,(4,5))
            (cc,(4,yyy))
        
    • 一个综合实例
      给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),
      键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,
      也就是计算每种图书的每天平均销量。

      val rdd = sc.parallelize(Array(("“spark”",2),("“hadoop”",10),("“hadoop”",3),("“spark”",4)))
      rdd.mapValues(x => (x, 1)).reduceByKey((x,y) => (x._1 + y._1, x._2 +y._2))
          .mapValues(x => (x._1 / x._2)).collect()
      

共享变量

当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,
它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。
但是,有时候,需要在多个任务之间共享变量

为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)
广播变量用来把变量在所有节点的内存之间进行共享。
累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。

广播变量

允许程序开发人员在每个机器上缓存一个 只读 的变量
val broadcastVar = sc.broadcast(Array(1, 2, 3))

累加器

通常可以被用来实现计数器(counter)和求和(sum)
一个数值型的累加器,可以通过调用SparkContext.longAccumulator(),或者doubleAccumulator()来创建。

只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值,
在节点中只是进行累加操作:

val accum = sc.longAccumulator("My Accumulator") //指定这个累加器的名字
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

文件数据读写

不同文件格式的读写

  • 文本文件

    scala> val textFile = sc.textFile("file:///zyb/word.txt") //读
    scala> textFile.saveAsTextFile("file:///zyb/writeback.txt") //写
    
  • JSON

    people.json文件的内容如下:
        {"name":"Michael"}
        {"name":"Andy", "age":30}
        {"name":"Justin", "age":19}
        
    val jsonStr = sc.textFile("file:///zyb/people.json")
    //JSON.parseFull:以一个JSON字符串作为输入并进行解析,
    //如果解析成功则返回一个Some(map: Map[String, Any])
    jsonStrs.map(s => JSON.parseFull(s))
    result.foreach(println())
    
    结果为:
        Map(name -> Michael)
        Map(name -> Andy, age -> 30.0)
        Map(name -> Justin, age -> 19.0)
    

读写HBase

读取:
  • 首先创建一个HBase表

    hbase>  create 'student','info'
    hbase> put 'student','1','info:name','Xueqian'
    hbase> put 'student','1','info:gender','F'
    hbase> put 'student','1','info:age','23'
    //然后录入student表的第二个学生记录
    hbase> put 'student','2','info:name','Weiliang'
    hbase> put 'student','2','info:gender','M'
    hbase> put 'student','2','info:age','24'
    
  • 配置Spark
    把HBase的lib目录下的一些jar文件拷贝到Spark中

    • 编写程序读取HBase数据

      val conf = HBaseConfiguration.create()
      val sc = new SparkContext(new SparkConf())
      
      //设置查询的表名
      conf.set(TableInputFormat.INPUT_TABLE, "student")
      
      //通过调用newAPIHadoopRDD 得到一个RDD
      val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
          
      stuRDD.count() //第一次行动
      stuRDD.cache() //持久化
      
      //遍历输出,第二次行动
      stuRDD.foreach({ case (_,result) =>
          val key = Bytes.toString(result.getRow)
          val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
          val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
          val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
          println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
      })
      
    • 通过sbt打包编译
      在simple.sbt配置文件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本号

    • 执行后结果:

      Students RDD Count:2
      Row key:1 Name:Xueqian Gender:F Age:23
      Row key:2 Name:Weiliang Gender:M Age:24
      
写入:
```
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
val sc = new SparkContext(sparkConf)

//调用 hadoopConfiguration set 来传入表的名字
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "student")  

//创建一个job,来封装 student表的 列的数据格式
val job = new Job(sc.hadoopConfiguration)  
job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
job.setOutputValueClass(classOf[Result])    
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    

//创建一个RDD,用来插入到表中
val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录


val rdd = indataRDD.map(_.split(',')).map{arr => {
        //行健的值 
        val put = new Put(Bytes.toBytes(arr(0))) 
        
        //info:name列的值
        put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) 
        
        //info:gender列的值                
        put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
        
        //info:age列的值
        put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  
        
        //生成一个 map
        (new ImmutableBytesWritable, put)   
    }
}

rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) 
```
    原文作者:博弈史密斯
    原文地址: https://www.jianshu.com/p/4f074889bbd9
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞