Spark 实践总结

公司组会上分享的笔记,做个备份,以后有了新的内容,也会添加到其中。

scala就是操作spark的语言,利用spark以及sparkSQL提供的api来操作HDFS中保存的各种数据。

之前的各种操作都是先通过spark.sql先查询数据库得到sql.DataFrame数据,之后通过.rdd得到RDD。

spark.sql("select * from xxx").rdd

之后通过各种map,flatmap,filter,reduceByKey对RDD进行处理,处理之后还是一个RDD。

最后通过.repartition将所有的分布式RDD收集到一起,最后.saveAsTextFile,传入一个路径,就可以保存了。

注意:如果没有添加reparation直接保存,会造成保存的目录中有多个文件。

注意:spark保存到文件中,并不是本机的文件中,而是在hadoop体系中的文件中。

val output = "/user/spark/warehouse/nlp/note_terms_category_update"
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(output), true)
spark.sql(
      s"""
         |select discovery_id, desc, taxonomy1
         |from
         |temp.note_by_category_detail_update
         |order by time desc
         |limit 400000
     """.stripMargin
).rdd.map(parse).repartition(1).saveAsTextFile(output)

一种比较好的实践:当sql语句较长的时候,利用上文方式组织起来

也可以将RDD.toDF().createOrReplaceTempView保存到一个临时表中

保存到临时表时,比较常见的场景是要把现在的数据保存到多个column中。这时候可以先把之前一条记录先map到一个case class 对象,之后再保存,就会将class中的变量名作为列名同时保存了。

之后再次通过spark.sql 通过sql语句将刚才的临时表保存到我们希望的位置。

注意:在使用.toDF函数的时候,需要先 import spark.implicits._

这个import语句不是放到整个文件的最顶上,而是在spark对象已经创建之后,再次调用

import spark.implicits._

spark.sql("select * from xxx").rdd
  .map(mapToRecord)
  .filter(Objects.nonNull)
  .toDF().createOrReplaceTempView("table_raw_all")
spark.sql("drop table if exists temp.tmp_note_tags_yong_raw_all")
spark.sql("create table temp.tmp_note_tags_yong_raw_all as select * from table_raw_all")
case class RecordItem(target1_type: String, frequent: Int)

常用函数:

map, 函数接收一个输入(一行记录),返回一个输出,输入输出不需要是同种类型,将之前的RDD的一条记录映射为另一条记录

.map(x => x.getAs[String](“keywords”)) 可以简写为 .map(_.getAs[String](“keywords”))

.map(x => (x, 1)) 可以简写为 .map((_, 1))

flatmap, 函数接收一个输入(一行记录),返回一个对象,Array或者List(返回值是可以遍历的类型,从而实现flatMap)

.flatMap(x => x + 1)

.flatMap(x => Array(x))

.flatMap(x => List(x))

reduceByKey, 函数接收两个输入,返回一个输出。输入和输出的类型需要相同,输入的类型就是一行记录中的value

.reduceByKey((x1, x2) => x1 + x2) 可以简写为 .reduceByKey(_ + _) x1和x2可以是int double string等等

reduceByKey之前一般先将一个记录映射为一个tuple元祖(通过map函数),之后通过key相等,来对value进行聚合。

filter, 函数接收一个输入(一行记录),返回一个true或者false。返回为true的记录保留,返回为false丢弃

.filter(x => Objects.nonNull(x)) 可以简写为 .filter(Objects.nonNull)

scala调用java方法:

比如之前调用分词的接口来讲笔记分词之类的场景,都是调用现有的java接口来处理RDD中的一行数据。

还有一种情况,就是对于scala特别不熟悉,需要把稍微复杂点的逻辑放到java中来处理。

比较好的实践:java方法接收的是String, 返回的也是String。

例如:.map(FreqItemExtractUtil.mapFunction)

以后有了新的再更新吧~

    原文作者:彭彭
    原文地址: https://zhuanlan.zhihu.com/p/52706137
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞