scala – 在zeppelin笔记本中保存火花流消耗的kafka消息

我有问题在zeppelin笔记本中保存由spark流消耗的kafka消息.

我的代码是:

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

   val ssc = new StreamingContext(sc, Seconds(2))

  val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, 
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)
    .map { case (k, v) =>  implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
    .filter(_.id % 2 == 0)

  val mes =  messagesStream.window(Seconds(10))

  mes
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))

  ssc.start() 

当我运行%sql select * from messages时,它显示没有数据,但是表已定义.如果我在Cassandra上将保存更改为tempTable,它会正确保存并显示数据.不明白为什么会这样.

感谢帮助.

最佳答案 好的,这是问题所在.让我们首先回顾一下foreachRDD运算符的定义:

foreachRDD没有使用它是如何使用的.它是最通用的输出运算符,它将函数func应用于从流生成的每个RDD.此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库.请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD.

那么您的代码实际发生的是以下内容:

由于DStreams由输出操作延迟执行,就像RDD动作一样懒惰地执行RDD.具体而言,DStream输出操作中的RDD操作会强制处理接收到的数据.因此,如果您的应用程序没有任何输出操作,或者您没有输出操作(如dstream.foreachRDD()而内部没有任何RDD操作),则不执行任何操作.系统将简单地接收数据并将其丢弃.

因此,每次执行registerTempTable时都会丢弃RDD数据,因此SQL查询会给出一个空结果.

要解决您的问题,您需要将数据保存在某个地方(Cassandra是一个不错的选择)然后查询它.

点赞