Structured Spark Streaming - explode multiple columns

package ohmysummer

import ohmysummer.model.SourceCan
import ohmysummer.pipeline.kafka.WmKafkaDeserializer
import ohmysummer.pipeline.schema.{CanSignalSchema, DcSaleData}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._

/**
  * Read JSON data from Kafka. References:
  *  https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
  *  https://stackoverflow.com/questions/43297973/how-to-read-records-in-json-format-from-kafka-using-structured-streaming
  *  https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema
  *  https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
  *  https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
  */


object WmDcSaleApplication {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("ReadFromKafka")
      .master("local[2]")
      .getOrCreate()

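    // Hold the custom deserializer in a singleton object and register it as a UDF,
    // so the raw Kafka value bytes can be decoded with WmKafkaDeserializer from SQL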
    object KafkaDeserializerWrapper {
      val deser = new WmKafkaDeserializer
    }
    spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
      KafkaDeserializerWrapper.deser.deserialize(topic, bytes)
    )

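    // Read the Kafka stream, subscribing to the dc-sale-data topic from the earliest offset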
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dc-sale-data")
      .option("startingOffsets", "earliest") 
      .load()

    import spark.implicits._
    // Deserialize the byte array in the Kafka value to recover the original JSON string
    val result: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", """deserialize("dc-sale-data", value) AS message""")
      .as[(String, String)]

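    // Schemas describing the JSON payload; DcSaleData is instantiated here but not used below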
    val schema = (new DcSaleData).schema
    val canSchema = (new CanSignalSchema).schema

    val parsed = result.select($"key", from_json($"message", canSchema) as "data")
//    val event: DataFrame = parsed.select($"data.vin", $"data.version", $"data.tboxSn", $"data.iccid", $"data.createTime", explode(array($"data.event.info"))).select("vin", "version", "tboxSn", "iccid", "createTime", "col.*")
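    // Spark allows only one generator (explode) per select clause, so each column is
    // exploded in its own select, carrying the remaining columns along to the next step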
    val event: Dataset[SourceCan] = parsed.select($"data.vin", $"data.version", $"data.tboxSn", $"data.iccid", $"data.createTime", $"data.event.info" as "event", $"data.signal1s.info" as "signal1s", $"data.signal30s.info" as "signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime",              explode(array($"event")) as "eventcol", $"signal1s", $"signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime", $"eventcol", explode(array($"signal1s")) as "signal1scol", $"signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime", $"eventcol", $"signal1scol", explode(array($"signal30s")) as "signal30scol")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime", $"eventcol.*", $"signal1scol.*", $"signal30scol.*")
      .select($"vin", $"createTime", $"HU_TargetSOC" as "targetSoc" , $"ICU_ICUTotalOdometer" as "totalOdometer")
      .filter($"vin".isNotNull)
      .as[SourceCan]

//    parsed.printSchema()
//    event.printSchema()

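    // Print the resulting rows to the console sink in append mode (handy for debugging)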
    val console = event.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Append())

    val query = console.start()

    query.awaitTermination()

  }
}
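
The whole trick here is the chain of selects: Spark allows only one generator (explode) per select clause, so each array column is exploded in its own select while the other columns are carried along. A minimal standalone sketch of the same pattern on a made-up DataFrame (the id/xs/ys column names are illustrative only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeTwoColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExplodeTwoColumns")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // A toy DataFrame with two array columns
    val toy = Seq((1, Seq("a", "b"), Seq(10, 20))).toDF("id", "xs", "ys")

    val exploded = toy
      .select($"id", explode($"xs") as "x", $"ys")  // first array exploded here
      .select($"id", $"x", explode($"ys") as "y")   // second array exploded in its own select

    exploded.show(false)  // 4 rows: the cross product of the two exploded arrays
  }
}

The SourceCan model referenced by .as[SourceCan] is not shown in the post; for the encoder to resolve, its fields must match the four columns selected above. A minimal sketch, with assumed field types:

package ohmysummer.model

// Hypothetical sketch of SourceCan; the real class may use different field types
case class SourceCan(
  vin: String,
  createTime: String,   // assumed String; could be a Long epoch timestamp
  targetSoc: String,    // from HU_TargetSOC; assumed String, may be numeric
  totalOdometer: String // from ICU_ICUTotalOdometer; assumed String, may be numeric
)
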
Original author: 焉知非鱼
Original article: https://www.jianshu.com/p/c15d3b22e51a