我有问题在zeppelin笔记本中保存由spark流消耗的kafka消息.
我的代码是:
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
当我运行%sql select * from messages时,它显示没有数据,但是表已定义.如果我在Cassandra上将保存更改为tempTable,它会正确保存并显示数据.不明白为什么会这样.
感谢帮助.
最佳答案 好的,这是问题所在.让我们首先回顾一下foreachRDD运算符的定义:
foreachRDD没有使用它是如何使用的.它是最通用的输出运算符,它将函数func应用于从流生成的每个RDD.此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库.请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD.
那么您的代码实际发生的是以下内容:
由于DStreams由输出操作延迟执行,就像RDD动作一样懒惰地执行RDD.具体而言,DStream输出操作中的RDD操作会强制处理接收到的数据.因此,如果您的应用程序没有任何输出操作,或者您没有输出操作(如dstream.foreachRDD()而内部没有任何RDD操作),则不执行任何操作.系统将简单地接收数据并将其丢弃.
因此,每次执行registerTempTable时都会丢弃RDD数据,因此SQL查询会给出一个空结果.
要解决您的问题,您需要将数据保存在某个地方(Cassandra是一个不错的选择)然后查询它.