SparkStreaming入门教程(五)输出操作Output Operations

Output Operations将DStream的数据推送到外部系统,如数据库或文件系统。类似于RDD的惰性求值,输出操作才会触发计算的实际执行。

  • print()
    在驱动器程序中打印每个批次中的前十个元素,通常用于调试模式。
  • saveAsTextFiles(prefix, [suffix])
    将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”。
  • saveAsObjectFiles(prefix, [suffix])
    将此DStream的内容保存为SequenceFiles序列化的Java对象。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”
  • saveAsHadoopFiles(prefix, [suffix])
    将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”
  • foreachRDD(func)
    最通用的输出运算符,将函数func应用于从流中生成的每个RDD。此功能将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或将其写入数据库,func内通常有RDD的action操作

前面几个都太简单,直接调用方法即可,只演示spark将数据输出插入到mysql数据库的代码

import java.sql.DriverManager
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(5))   
val wordcount = ssc.socketTextStream("localhost", 7799).flatMap(_.split(" ")).map(word => (word, 1))
wordcount.foreachRDD(wd => wd.foreachPartition(
      data => {
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        try {
          for (row <- data) {
            println("input data is " + row._1 + "  " + row._2)
            val sql = "insert into stream(word,num) values(" + "'" + row._1 + "'," + row._2 + ")"
            conn.prepareStatement(sql).executeUpdate()
          }
        } finally {
          conn.close()
        }
      }))
      
ssc.start()
ssc.awaitTermination()

《SparkStreaming入门教程(五)输出操作Output Operations》
《SparkStreaming入门教程(五)输出操作Output Operations》

    原文作者:Seven_Ki
    原文地址: https://www.jianshu.com/p/69460202ee03
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞