I'm getting this error in my Scala test:
StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,false)) did not equal StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,true))
ScalaTestFailureLocation: com.holdenkarau.spark.testing.TestSuite$class at (TestSuite.scala:13)
Expected :StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,true))
Actual :StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,false))
The last StructField is false when it should be true, and I don't understand why. True here means the schema accepts null values.
Here is my test:
val schema1 = Array("a", "b", "c", "d", "e", "f")
val df = List(
  ("a1", "b1", "c1", "d1", "e1", "f1"),
  ("a2", "b2", "c2", "d2", "e2", "f2")
).toDF(schema1: _*)

val schema2 = Array("a", "b", "c", "d", "e", "f", "NewColumn")
val dfExpected = List(
  ("a1", "b1", "c1", "d1", "e1", "f1", "a1_b1_c1_d1_e1_f1"),
  ("a2", "b2", "c2", "d2", "e2", "f2", "a2_b2_c2_d2_e2_f2")
).toDF(schema2: _*)

val transformer = KeyContract("NewColumn", schema1)
val newDf = transformer(df)

newDf.columns should contain ("NewColumn")
assertDataFrameEquals(newDf, dfExpected)
And here is KeyContract:
case class KeyContract(tempColumn: String, columns: Seq[String],
                       unsigned: Boolean = true) extends Transformer {
  override def apply(input: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions._
    // Replace nulls in the source columns with empty strings
    val inputModif = columns.foldLeft(input) { (tmpDf, columnName) =>
      tmpDf.withColumn(columnName,
        when(col(columnName).isNull, lit("")).otherwise(col(columnName)))
    }
    // Concatenate all source columns into the new key column
    inputModif.withColumn(tempColumn, concat_ws("_", columns.map(col): _*))
  }
}
Thanks in advance!!
Best answer: This happens because concat_ws never returns null, so the resulting field is marked as non-nullable.
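A minimal standalone sketch of this behavior (object and app names are mine, not from the original post): even when the input columns are nullable, the column produced by concat_ws comes out non-nullable, which is exactly the schema mismatch in the failing assertion.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

object ConcatWsNullability {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("concat_ws-nullability")
      .getOrCreate()
    import spark.implicits._

    // String columns created via toDF from tuples are nullable = true
    val df = Seq(("a1", "b1")).toDF("a", "b")
    val out = df.withColumn("joined", concat_ws("_", col("a"), col("b")))

    // concat_ws skips nulls and always returns a string,
    // so Spark marks the result field as non-nullable
    println(out.schema("a").nullable)       // true
    println(out.schema("joined").nullable)  // false

    spark.stop()
  }
}
```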
If you want to use the second DataFrame as the reference, you have to build it with an explicit schema and Rows:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
val spark: SparkSession = SparkSession.builder.getOrCreate()
val dfExpected = spark.createDataFrame(spark.sparkContext.parallelize(List(
Row("a1", "b1", "c1", "d1", "e1", "f1", "a1_b1_c1_d1_e1_f1"),
Row("a2", "b2", "c2", "d2", "e2", "f2", "a2_b2_c2_d2_e2_f2")
)), StructType(schema2.map { c => StructField(c, StringType, c != "NewColumn") }))
This way, the last column will be non-nullable:
dfExpected.printSchema
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
|-- d: string (nullable = true)
|-- e: string (nullable = true)
|-- f: string (nullable = true)
|-- NewColumn: string (nullable = false)
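If rebuilding the expected DataFrame with explicit Rows feels heavy, another option (my own suggestion, not part of the original answer, and the helper name is hypothetical) is to compare the two frames while ignoring nullability, since assertDataFrameEquals from spark-testing-base compares schemas strictly:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper: compare data and column types, ignoring the
// nullable flag. Only handles flat schemas; nested structs would need
// a recursive version.
def assertEqualIgnoringNullability(actual: DataFrame, expected: DataFrame): Unit = {
  def loose(s: StructType): StructType =
    StructType(s.map(_.copy(nullable = true)))
  assert(loose(actual.schema) == loose(expected.schema),
    s"Schemas differ: ${actual.schema} vs ${expected.schema}")
  assert(actual.collect().toSeq == expected.collect().toSeq,
    "Row contents differ")
}
```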