1. Converting Columns to Rows
Converting rows to columns and columns to rows are operations we need all the time. Suppose we have the following CSV file:
+---+----+---------+
|id |name|address |
+---+----+---------+
|1 |a |add1,add2|
|2 |b |add3 |
|3 |c |add4 |
|4 |d |add5 |
+---+----+---------+
We want to transform it into the following format, with one address per row:
+---+----+----+
| id|name|addr|
+---+----+----+
| 1| a|add1|
| 1| a|add2|
| 2| b|add3|
| 3| c|add4|
| 4| d|add5|
+---+----+----+
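As an aside, the DataFrame API can express this transformation directly with the built-in split and explode functions; a minimal sketch, assuming the column names shown above:

import org.apache.spark.sql.functions.{col, explode, split}

val exploded = df
  .withColumn("addr", explode(split(col("address"), ",")))  // one output row per address
  .drop("address")
exploded.show()

In this post, though, we will build the transformation by hand with the RDD API, which is a useful pattern when the per-row logic gets more complicated.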
First, we read the CSV file above with Spark:
import org.apache.spark.sql.{DataFrame, SparkSession}

def getCsv(spark: SparkSession, file: String): DataFrame = {
  spark.read
    .format("csv")                // shorthand for the built-in CSV data source
    .option("header", "true")     // the first line holds the column names
    .option("delimiter", ",")
    .option("encoding", "gbk")    // the source file is GBK-encoded
    .load(file)
}
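One subtlety worth calling out: the delimiter is a comma, and the address field itself contains commas, so that field must be quoted in the file (Spark's CSV reader handles double quotes by default). The input file presumably looks like this; the exact layout is my assumption, reconstructed from the table above:

id,name,address
1,a,"add1,add2"
2,b,add3
3,c,add4
4,d,add5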
Next, we write a function that takes the fields of a single input row, splits them into multiple rows based on the address field, and returns the result:
import org.apache.spark.sql.Row

def splitByAddr(id: String, name: String, addrs: String): Seq[Row] = {
  // Emit one Row per comma-separated address
  addrs.split(",").map(addr => Row(id, name, addr)).toSeq
}
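A quick sanity check of the helper on the first input row:

splitByAddr("1", "a", "add1,add2").foreach(println)
// [1,a,add1]
// [1,a,add2]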
Finally, the main program ties everything together:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .enableHiveSupport()
  //.config("spark.some.config.option", "some-value")
  .getOrCreate()

val df = getCsv(spark, "test.csv")
df.show(false)

// Flat-map each input Row into one Row per address
val df1 = df.rdd.flatMap { line =>
  val addrs = line.getAs[String]("address")
  val id    = line.getAs[String]("id")
  val name  = line.getAs[String]("name")
  splitByAddr(id, name, addrs)
}
df1.collect().foreach(println)

// Schema for the flattened result
val schema = StructType(List(
  StructField("id", StringType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("addr", StringType, nullable = true)
))

spark.createDataFrame(df1, schema).show()
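If everything works, the final show() prints the flattened table we set out to produce:

+---+----+----+
| id|name|addr|
+---+----+----+
|  1|   a|add1|
|  1|   a|add2|
|  2|   b|add3|
|  3|   c|add4|
|  4|   d|add5|
+---+----+----+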