相比于传统代码,Spark是比较难调试的,所以对其进行单元测试是非常必要的。
RDD测试
RDD在集群中运行,每次修改bug后,都要上传到集群进行测试,代价非常大。
所以优先在本地进行单元测试,可以减少小模块的逻辑错误。
例如,要测试一个WordCount程序:
//定义一个简单的wordcount
object WordCount extends Serializable{
def count(lines:RDD[String]): RDD[(String,Int)]={
val rdd=lines.flatMap(line=>line.split("\\s")).map(word=>(word,1)).reduceByKey(_ + _)
rdd
}
}
可以通过sc.parallelize()
和 sc.textFile()
模拟创建RDD,对返回的RDD进行遍历校验:
//引入scalatest建立一个单元测试类,混入特质BeforeAndAfter,在before和after中分别初始化sc和停止sc,
//初始化SparkContext时只需将Master设置为local(local[N],N表示线程)即可,无需本地配置或搭建集群,
class WordCountTests extends FlatSpec with BeforeAndAfter{
val master="local" // sparkcontext的运行master
var sc:SparkContext=_
it should("test success") in{
//其中参数为rdd或者dataframe可以通过通过简单的手动构造即可
val seq=Seq("the test test1","the test","the")
val rdd=sc.parallelize(seq)
val wordCounts=WordCount.count(rdd)
wordCounts.map(p=>{
p._1 match {
case "the"=>
assert(p._2==3)
case "test"=>
assert(p._2==2)
case "test1"=>
assert(p._2==1)
case _=>
None
}
}).foreach(_=>())
}
//这里before和after中分别进行sparkcontext的初始化和结束,如果是SQLContext也可以在这里面初始化
before{
val conf=new SparkConf()
.setAppName("test").setMaster(master)
sc=new SparkContext(conf)
}
after{
if(sc!=null){
sc.stop()
}
}
}
在将 master URL 设置为
local
来测试时会简单的创建一个SparkContext
,运行您的操作,然后调用SparkContext.stop()
将该作业停止
无返回值方法测试
很多情况下,方法并没有返回值,而是把结果输出到文件或其他介质。这种情况下,需要check输出文件进行测试,比较困难。
例如:
trait WriterHandle{
def writer(records:Seq[GenericRecord]):Unit={
val parquetWriter=...
records.foreach(parquetWriter.writer(..))
}
}
//一个类处混入这个特质,经过一定转换后将结果数据写入parquet中
class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{
def process():Unit={
val records:Seq[GenericRecord]=build(objects)={
...
}
//这里调用了特质writer中的writer方法,实际单元测试运行到这里可能写入的时候会出错,不能正常测试通过
writer(records)
}
}
这种情况,可以把写文件逻辑抽象成trait
,测试时进行替换:
class Writertests extends FlatSpec {
it should("write success") in{
val objects=Seq(object1,object2..).toIterator
//在new处理类,混入原先特质的一个子特质
val process=new ProcessHandle(objects) with Writerhandletest
}
}
//可以自定义一个trait继承自原先的特质,通过将原先的方法覆盖,然后在重写后的方法里面的根据传入值定义所需要断言即可
trait Writerhandletest extends WriterHandle{
override def writer(records:Seq[GenericRecord]):Unit={
assert(records.length==N)
assert(records(0).XX=="xxx")
}
}
私有方法测试
如果有必要,可以也可对私有方法进行测试。例如:
class MyTest(s:String){
//此公有方法可能不方便测试
def ():Unit={
...
doSth(s)
}
//这里私有方法,可能是逻辑关键所在,有必要测试
private def doSth(s:String):String={
...
}
}
这种情况需要借助于PrivateMethodTester,进行私有方法的调用:
//要混入PrivateMethodTester特质
class MytestTests extends FlatSpec with PrivateMethodTester{
it should("write success") in{
//首先new一个要测试的类
val myTest=new MyTest("string")
//其中通过PrivateMethod修饰,[]中为返回值, ('method)单引号后跟私有方法名
val dosth=PrivateMethod[String]("doSth")
//通过invokePrivate 委托调用私有方法,注意参数要对,貌似传null的话会找不到对应的方法
val str=myTest invokePrivate dosth("string")
//最后断言相应的至即可
asset(str=="string")
}
}