scala – StructField中的错误(a,StringType,false).这是假的,应该是真的

我的
Scala测试中出现此错误:

StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,false)) did not equal StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,true))

ScalaTestFailureLocation: com.holdenkarau.spark.testing.TestSuite$class at (TestSuite.scala:13)

Expected :StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,true))

Actual   :StructType(StructField(a,StringType,true), StructField(b,StringType,true), StructField(c,StringType,true), StructField(d,StringType,true), StructField(e,StringType,true), StructField(f,StringType,true), StructField(NewColumn,StringType,false))

当它应该是真的时,最后的StructField是假的,我不是为什么.这是真的意味着架构接受空值.

这是我的考验:

val schema1 = Array("a", "b", "c", "d", "e", "f")
val df = List(("a1", "b1", "c1", "d1", "e1", "f1"),
  ("a2", "b2", "c2", "d2", "e2", "f2"))
  .toDF(schema1: _*)

val schema2 = Array("a", "b", "c", "d", "e", "f", "NewColumn")

val dfExpected = List(("a1", "b1", "c1", "d1", "e1", "f1", "a1_b1_c1_d1_e1_f1"),
  ("a2", "b2", "c2", "d2", "e2", "f2", "a2_b2_c2_d2_e2_f2")).toDF(schema2: _*)

val transformer = KeyContract("NewColumn", schema1)
val newDf = transformer(df)
newDf.columns should contain ("NewColumn")
assertDataFrameEquals(newDf, dfExpected)

这是KeyContract:

case class KeyContract(tempColumn: String, columns: Seq[String],
                       unsigned: Boolean = true) extends Transformer {

  override def apply(input: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions._

    val inputModif = columns.foldLeft(input) { (tmpDf, columnName) =>
      tmpDf.withColumn(columnName, when(col(columnName).isNull,
        lit("")).otherwise(col(columnName)))
    }

    inputModif.withColumn(tempColumn, concat_ws("_", columns.map(col): _*))
  }
}

提前致谢!!

最佳答案 发生这种情况是因为concat_ws从不返回null,并且结果字段被标记为不可为空.

如果要使用第二个DataFrame作为参考,则必须使用schema和Rows:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark: SparkSession = SparkSession.builder.getOrCreate()

val dfExpected = spark.createDataFrame(spark.sparkContext.parallelize(List(
  Row("a1", "b1", "c1", "d1", "e1", "f1", "a1_b1_c1_d1_e1_f1"),
  Row("a2", "b2", "c2", "d2", "e2", "f2", "a2_b2_c2_d2_e2_f2")
)), StructType(schema2.map { c => StructField(c, StringType, c != "NewColumn") }))

这样,最后一列将不可为空:

dfExpected.printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- e: string (nullable = true)
 |-- f: string (nullable = true)
 |-- NewColumn: string (nullable = false)
点赞