在Spark SQL 编程时,经常需要对获取的DataFrame 对象进行map 操作。map 基于的元素是Row. 那么如何操作Row呢?
1. 使用Row 提供的 get方法,获取需要的列
2. 使用类型匹配,显示的声明列的类型
3. 使用类型匹配和样例类
1. get 方法
val df = spark.sql("select id,idType,sid from graph")
val df1 = df.map(_.getString(0)) // 可通过占位符获取指定元素,当只能返回一个
val df2 = df.filter(_.getString(1) != "1")
val df3 = df.filter(_.getString(1)!= "1" && true) // 占位符只能使用一次
val df4 = df.map(x=>(x.getString(0),x.getString(1))).toDF("id","idType")
val df5 = df.map(x=> {
val id = x.getString(0)
val idType = x.getString(1)
val sid = x.getString(2)
val mock = (id+idType).hashCode
(mock,sid)
}).toDF("mock","sid")
2. 使用类型匹配,并声明列的类型
import org.apache.spark.sql.Row
val df = spark.sql("select id,idType,sid from graph")
val df_ = df.map{case Row(id:String,idType:String,sid:String) => (id,idType,sid)}
3. 使用类型匹配,样例类什么列的类型
val df = spark.sql("select id,idType,sid from graph")
case class graph(id:String,idType:String,sid:String)
val df_ = df.as[graph].map(x=> (x.id,x.idType,x.sid))
4. 结合Schema 的写法
val df = spark.sql("select id,idType,sid from graph")
// 以下代码等价与 df.select("id","idType")
val schema = df.schema.map(_.name).filter(_.contains("id"))
df.map(x=>{
val cList = new ListBuffer[String]
for(c<- schema) cList.append(x.getAs(c))
cList.toList
})
总结:
由于Spark 提供的API丰富多样,Scala 语言本身又支持类型匹配,解析一个数据结构的方法多种多样,可根据习惯,爱怎么写,怎么写。