# 数据分析最佳实践 - spark Dataset/DataFrame数据存取及处理

0x01前言

官网上的spark with scala 的文档比较难理解,内容也特别少。初学遇到很多实际情况,会很迷茫怎么处理数据。
在此把自己踩的坑列一列,供初学者参考。
大牛请轻拍,有问题欢迎指教。

0x02 理解

其实spark不算难理解,但是加上一门不太熟悉的scala语言,整个人刚学的时候都是懵逼的。

Spark:

实际上它本身还是一个Map Reduce操作,Map负责对数据块操作,Reduce对结果汇总。只是Spark提出了RDD的概念,让我们不再需要关注底层的计算,而是更专注于数据集的操作。

它的操作后端会有一个自动的调度系统,帮我们完成计算的过程。我们不需要关心这个任务被分配给了多少台机器,怎样的计算过程,我们只需要得到计算完毕的结果。

Scala:

scala是一门神奇的语言,老司机的代码你可能根本看不懂,对,就是那种一行搞定一个算法的那种。它的面向对象和函数式并不只是表面的意思。目前理解不够深入,希望有缘能再研究研究..

Spark with scala:

scala作为spark的编程语言,相较python此类扩展性更强,更少出现某个问题卡住没办法解决的问题。

0x03 吐槽结束,正文来了!

两周的研究中,发现从示例程序入手最好理解!如有不用再去查询相关文档!

用它做了一个小功能,以下为本次实践中需求的场景,
读写存储部分:

  • 从mongodb中读数据,spark处理后入mongodb

数据处理部分:

  • 处理获取的mongodb数据及json数据。
读写存储数据部分:

首先,初上手的我,参考了一下mongodb针对spark的官网文档:
https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/

这里有详细的方法去对mongodb数据进行读写。
对后续数据处理,这类方法在文档中最清晰明了,于是我使用的这类方法(参见上方链接datasets and SQL部分):

var sparkSession = SparkSession.builder().master("local[2]").appName("conn").config("spark.mongodb.input.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").config("spark.mongodb.output.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").getOrCreate()

因为习惯性将每个功能点放进每个函数里分别调用,这里遇到了第一个问题,

!SparkSession在一个任务中只能存在一个,多个会报冲突

如果你多个函数中均要使用这个sparkSession,可以放入公共区域或声明为共享变量以供调用。

共享变量了解一下:

    rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 

    解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差。

    另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接迟对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象,但在同一时间只有一个connection.send(record)执行,因为在同一个时间里,只有一个微批次的rdd.

实际情况中,在读出数据处理完毕后,并不是所有情况都如官网示例一样,需要存储的是简单的数据,更多时候是需要糅合的数据,此时我们可能要从多种类型转换成dataframe/rdd类型数据进行存储。

!多种类型数据转换成df/rdd类型
import sparkSession.implicits._

case class DealData(user:String,date:String,array_acc:ListBuffer[Map[String,String]],array_err:ListBuffer[Map[String,String]],array_fro:ListBuffer[Map[String,String]])

val seq_df = Seq(DealData(user,NowDate(),array_acc,array_err,array_fro)).toDF()
#这是一个将多类数据糅合进一个rdd的例子,DealData不是一个函数,是指明其类型,使用Seq必须在object中指明它包含的所有元素的类型。

val rdd = seq_df.rdd
#Dataframe to rdd

val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")
#rdd to dataframe,rdd是没有sql方法的,df和rdd的区别为:rdd是无字段名的。

MongoSpark.save(seq_df.write.option("collection","[dbname]").mode("Append"))
#以Dataframe格式保存进mongodb


看官网demo中读取出来数据,会发现,可以将读出的数据具象为一个数据库形式,去展示,筛选等操作。

但是实际上,除了show()函数检查数据是否具象成功,并不知道如何处理数据。下例或许会给你一点启示:


val df = MongoSpark.load(sparkSession) 

df.createOrReplaceTempView("[dbname]") 
#这里是你给这个虚拟的db起一个名字,需与下句中表名保持一致。

val df1 = sparkSession.sql("select user from [dbname] as l where cast(l.actTime as String) >= '2018/05/28 18:00:00' ")
#这是一个依据时间筛选的例子,df1的数据类型为DataFrame

val df_count0 = df1.groupBy("user").count()
#这里是按照user与每个user对应的数量做一个新的表。表中只有user和count两个字段,df_count0依旧是DataFrame类型。

val df_count = df_count0.orderBy("count")
#按照count排序,此时df_count是DataSet类型。

val users = df_count.rdd.map(x=>x(0)).collect()
#取df_count所有数据集中的第一列元素,将其转换成Array类型-数组类型

实际上还有一点google了很久都没有结果的数据处理需求,由spark的例子我们知道如何读取文件,但是如何处理有规律的文件,如,将json内容,转换成rdd或df的格式进行数据分析:

!json内容转换成RDD/dataframe类型。
import com.alibaba.fastjson.{JSON, JSONArray}

val json = JSON.parseObject(content)
val data = json.getJSONObject("DATA").get("data")
#处理两层包含的json数据。


row.replace("{","").replace("}","").split(",").map(row0=>try{
    val row0_array = row0.toString.split(":")
    array_map += (row0_array(0)->row0_array(1))
})
#实际上我们要做的事是把json数据转换成可处理的数据形式,
#我们需要先把数据规律性的处理成key:value形式,
#然后使用map函数对其进行指向,做成一个字典的形式,
#之后再使用Seq方法将其转换成dataframe/rdd类型就可以啦~

另外在环境搭建初期,遇到一些的环境错误Q&A分享一下~:

Q:
IDEA里运行代码时出现Error:scalac: error while loading JUnit4, Scala signature JUnit4 has wrong version expected: 5.0 found: 4.1 in JUnit4.class错误的解决办法
A:
删除test/scala/下的文件

Q:
Exception in thread “main” java.lang.NoClassDefFoundError: scala/Product$class
A:
切换scala2.11.0

Q:
Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
A:
scala版本和pom.xml版本不一致。

Q:
A master URL must be set in your configuration?
A:
没配置materURL,可在代码中加setMaster
val conf = new SparkConf().setAppName(“WordCount”).setMaster(“local[]”)*

Q:
res.withPipeline(Seq(Document.parse(“{ $match: { ‘errno’ : ‘220’ } }”))) Document未定义问题
A:
import org.bson.Document

Q:
ERROR Executor:91 – Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.analysis.TypeCoercion$.findTightestCommonTypeOfTwo()Lscala/Function2;
A:
切换spark和spark-mongo版本为2.2版本。

欢迎大佬指出不足和认知错误,scala这个萝卜坑,有缘再蹲。

    原文作者:沉迷代码的Looooony酱
    原文地址: https://www.jianshu.com/p/e52efb11dbaa
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞