I have some code that joins two streaming DataFrames and writes the output to the console.
val dataFrame1 =
  df1Input.withWatermark("timestamp", "40 seconds").as("A")
val dataFrame2 =
  df2Input.withWatermark("timestamp", "40 seconds").as("B")

val finalDF: DataFrame = dataFrame1.join(dataFrame2,
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp" +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter")

finalDF.writeStream.format("console").start().awaitTermination()
What I want now is to refactor this to use Datasets, so that I can get some compile-time checking.
So what I tried was quite simple:
val finalDS: Dataset[(A, B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp" +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter")

finalDS.writeStream.format("console").start().awaitTermination()
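For reference, A and B here are case classes matching the input schema. Their definitions aren't shown in the question, so the following is a minimal sketch assuming only the two fields used in the join condition (the id type is an assumption):

import java.sql.Timestamp

// Assumed shape: only id and timestamp appear in the join condition.
// The event-time column used with withWatermark must be a timestamp type,
// hence java.sql.Timestamp here.
case class A(id: String, timestamp: Timestamp)
case class B(id: String, timestamp: Timestamp)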
However, the joinWith version produces the following error:
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
As you can see, the join code hasn't changed, so there is a watermark on both sides and a range condition. The only change was to use the Dataset API instead of DataFrames.
Also, it works fine when I use an inner join:
val finalDS: Dataset[(A, B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp" +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"))

finalDS.writeStream.format("console").start().awaitTermination()
Does anyone know how this can happen?
Best answer: Well, when you use the joinWith method instead of join, you are relying on a different implementation, and it seems this implementation does not support leftOuter joins for streaming Datasets.
You can check the outer joins with watermarking section of the official documentation. Use the join method rather than joinWith. Note that the result type will be a DataFrame, which means you will most likely have to map the fields manually:
val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp" +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter").select(/* useful fields */).as[C]
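As an illustration of the manual mapping, a hypothetical target case class C and select could look like the sketch below. C and its field names are assumptions, not part of the question; the B-side column is wrapped in Option because it can be null under a left outer join. This assumes import spark.implicits._ is in scope, as the original snippets already require for .as[A] and .as[B].

import java.sql.Timestamp
import org.apache.spark.sql.functions.expr

// Hypothetical result type for illustration only.
case class C(id: String, aTimestamp: Timestamp, bTimestamp: Option[Timestamp])

val typedDS: Dataset[C] = dataFrame1.as[A].join(dataFrame2.as[B],
    expr(
      "A.id = B.id" +
        " AND " +
        "B.timestamp >= A.timestamp" +
        " AND " +
        "B.timestamp <= A.timestamp + interval 1 hour"),
    joinType = "leftOuter")
  // The aliases "A" and "B" set earlier via .as("A")/.as("B") still qualify the columns.
  .select($"A.id", $"A.timestamp".as("aTimestamp"), $"B.timestamp".as("bTimestamp"))
  .as[C]

typedDS.writeStream.format("console").start().awaitTermination()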