公司组会上分享的笔记,做个备份,以后有了新的内容,也会添加到其中。
scala就是操作spark的语言,利用spark以及sparkSQL提供的api来操作HDFS中保存的各种数据。
之前的各种操作都是先通过spark.sql先查询数据库得到sql.DataFrame数据,之后通过.rdd得到RDD。
spark.sql("select * from xxx").rdd
之后通过各种map,flatmap,filter,reduceByKey对RDD进行处理,处理之后还是一个RDD。
最后通过.repartition将所有的分布式RDD收集到一起,最后.saveAsTextFile,传入一个路径,就可以保存了。
注意:如果没有添加reparation直接保存,会造成保存的目录中有多个文件。
注意:spark保存到文件中,并不是本机的文件中,而是在hadoop体系中的文件中。
val output = "/user/spark/warehouse/nlp/note_terms_category_update"
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(output), true)
spark.sql(
s"""
|select discovery_id, desc, taxonomy1
|from
|temp.note_by_category_detail_update
|order by time desc
|limit 400000
""".stripMargin
).rdd.map(parse).repartition(1).saveAsTextFile(output)
一种比较好的实践:当sql语句较长的时候,利用上文方式组织起来
也可以将RDD.toDF().createOrReplaceTempView保存到一个临时表中
保存到临时表时,比较常见的场景是要把现在的数据保存到多个column中。这时候可以先把之前一条记录先map到一个case class 对象,之后再保存,就会将class中的变量名作为列名同时保存了。
之后再次通过spark.sql 通过sql语句将刚才的临时表保存到我们希望的位置。
注意:在使用.toDF函数的时候,需要先 import spark.implicits._
这个import语句不是放到整个文件的最顶上,而是在spark对象已经创建之后,再次调用
import spark.implicits._
spark.sql("select * from xxx").rdd
.map(mapToRecord)
.filter(Objects.nonNull)
.toDF().createOrReplaceTempView("table_raw_all")
spark.sql("drop table if exists temp.tmp_note_tags_yong_raw_all")
spark.sql("create table temp.tmp_note_tags_yong_raw_all as select * from table_raw_all")
case class RecordItem(target1_type: String, frequent: Int)
常用函数:
map, 函数接收一个输入(一行记录),返回一个输出,输入输出不需要是同种类型,将之前的RDD的一条记录映射为另一条记录
.map(x => x.getAs[String](“keywords”)) 可以简写为 .map(_.getAs[String](“keywords”))
.map(x => (x, 1)) 可以简写为 .map((_, 1))
flatmap, 函数接收一个输入(一行记录),返回一个对象,Array或者List(返回值是可以遍历的类型,从而实现flatMap)
.flatMap(x => x + 1)
.flatMap(x => Array(x))
.flatMap(x => List(x))
reduceByKey, 函数接收两个输入,返回一个输出。输入和输出的类型需要相同,输入的类型就是一行记录中的value
.reduceByKey((x1, x2) => x1 + x2) 可以简写为 .reduceByKey(_ + _) x1和x2可以是int double string等等
reduceByKey之前一般先将一个记录映射为一个tuple元祖(通过map函数),之后通过key相等,来对value进行聚合。
filter, 函数接收一个输入(一行记录),返回一个true或者false。返回为true的记录保留,返回为false丢弃
.filter(x => Objects.nonNull(x)) 可以简写为 .filter(Objects.nonNull)
scala调用java方法:
比如之前调用分词的接口来讲笔记分词之类的场景,都是调用现有的java接口来处理RDD中的一行数据。
还有一种情况,就是对于scala特别不熟悉,需要把稍微复杂点的逻辑放到java中来处理。
比较好的实践:java方法接收的是String, 返回的也是String。
例如:.map(FreqItemExtractUtil.mapFunction)
以后有了新的再更新吧~