package ohmysummer
import ohmysummer.model.SourceCan
import ohmysummer.pipeline.kafka.WmKafkaDeserializer
import ohmysummer.pipeline.schema.{CanSignalSchema, DcSaleData}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._
/**
 * Read JSON CAN data from Kafka, deserialize the raw bytes, parse them against an
 * explicit schema, and print the flattened result to the console. Reference links:
* https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
* https://stackoverflow.com/questions/43297973/how-to-read-records-in-json-format-from-kafka-using-structured-streaming
* https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema
* https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
* https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
*/
object WmDcSaleApplication {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("ReadFromKafka")
.master("local[2]")
.getOrCreate()
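    // Wrapping the deserializer in an object makes it lazily instantiated on each
    // executor rather than serialized and shipped from the driver.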
object KafkaDeserializerWrapper {
val deser = new WmKafkaDeserializer
}
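    // Register the deserializer as a SQL UDF so it can be invoked from selectExpr below.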
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
KafkaDeserializerWrapper.deser.deserialize(topic, bytes)
)
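    // The Kafka source yields a fixed schema: key, value, topic, partition,
    // offset, timestamp, timestampType; key and value arrive as binary.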
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "dc-sale-data")
.option("startingOffsets", "earliest")
.load()
import spark.implicits._
    // Deserialize the byte array in "value" to recover the raw JSON string
    val result: Dataset[(String, String)] = df
      .selectExpr("CAST(key AS STRING)", """deserialize("dc-sale-data", value) AS message""")
      .as[(String, String)]
    // DcSaleData describes the full payload; only the CAN signal schema is applied below.
    val schema = (new DcSaleData).schema
    val canSchema = (new CanSignalSchema).schema
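    // from_json returns a null struct for rows it cannot parse, so malformed
    // records surface as null columns and are dropped by the vin filter below.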
val parsed = result.select($"key", from_json($"message", canSchema) as "data")
    // Alternative single-pass flattening of the event info only:
    // val event: DataFrame = parsed
    //   .select($"data.vin", $"data.version", $"data.tboxSn", $"data.iccid", $"data.createTime",
    //     explode(array($"data.event.info")))
    //   .select("vin", "version", "tboxSn", "iccid", "createTime", "col.*")
    // Pull the nested CAN fields out of the parsed struct. explode(array(...)) wraps
    // each info struct in a one-element array and explodes it, keeping one row per
    // record while exposing the struct as a column that .* can flatten.
    val event: Dataset[SourceCan] = parsed
      .select($"data.vin", $"data.version", $"data.tboxSn", $"data.iccid", $"data.createTime",
        $"data.event.info" as "event", $"data.signal1s.info" as "signal1s", $"data.signal30s.info" as "signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime",
        explode(array($"event")) as "eventcol", $"signal1s", $"signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime",
        $"eventcol", explode(array($"signal1s")) as "signal1scol", $"signal30s")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime",
        $"eventcol", $"signal1scol", explode(array($"signal30s")) as "signal30scol")
      .select($"vin", $"version", $"tboxSn", $"iccid", $"createTime",
        $"eventcol.*", $"signal1scol.*", $"signal30scol.*")
      .select($"vin", $"createTime", $"HU_TargetSOC" as "targetSoc", $"ICU_ICUTotalOdometer" as "totalOdometer")
      .filter($"vin".isNotNull)
      .as[SourceCan]
    // Uncomment to inspect the intermediate schemas while debugging:
    // parsed.printSchema()
    // event.printSchema()
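    // Print each micro-batch to stdout; Append mode emits only rows added since the last trigger.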
val console = event.writeStream
.format("console")
.option("truncate", "false")
.outputMode(OutputMode.Append())
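    // A durable sink would also need a checkpoint directory, e.g. (path is illustrative):
    //   .option("checkpointLocation", "/tmp/wm-dc-sale-checkpoint")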
val query = console.start()
query.awaitTermination()
}
}