Spark:读取mysql数据作为DataFrame

读取mysql数据作为DataFrame

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
    * 获取配置文件
    *
    * @param proPath
    * @return
    */
  def getProPerties(proPath: String): Properties = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }

  /**
    * 获取 Mysql 表的数据
    *
    * @param sqlContext
    * @param tableName 读取Mysql表的名字
    * @param proPath   配置文件的路径
    * @return 返回 Mysql 表的 DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
      sqlContext
        .read
        .format("jdbc")
        .option("url", properties.getProperty("mysql.url"))
        .option("driver", properties.getProperty("mysql.driver"))
        .option("user", properties.getProperty("mysql.username"))
        .option("password", properties.getProperty("mysql.password"))
        //        .option("dbtable", tableName.toUpperCase)
        .option("dbtable", tableName)
        .load()

  }

  /**
    * 获取 Mysql 表的数据 添加过滤条件
    *
    * @param sqlContext
    * @param table           读取Mysql表的名字
    * @param filterCondition 过滤条件
    * @param proPath         配置文件的路径
    * @return 返回 Mysql 表的 DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
      tableName = "(select * from " + table + " where " + filterCondition + " ) as t1" //支持将条件套入sql将临时表用作数据源
      sqlContext
        .read
        .format("jdbc")
        .option("url", properties.getProperty("mysql.url"))
        .option("driver", properties.getProperty("mysql.driver"))
        .option("user", properties.getProperty("mysql.username"))
        .option("password", properties.getProperty("mysql.password"))
        //        .option("dbtable", tableName.toUpperCase)
        .option("dbtable", tableName)
        .load()
  }

使用示例

//不添加过滤条件
val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = getSQLContext(sc)
    import sqlContext.implicits._
val test_table_dataFrame: DataFrame = readMysqlTable(sqlContext, "TEST_TABLE", proPath).persist(PERSIST_LEVEL)
----------------------------------------------------------------------------------------------------
//添加过滤条件
//获取 task_id = ${task_id} 数据做为DataFrame
val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = getSQLContext(sc)
    import sqlContext.implicits._
val test_table_dataFrame = readMysqlTable(sqlContext, "TEST_TABLE", s"task_id=${task_id}", configPath)

配置文件部分内容

配置文件部分内容
#mysql数据库配置
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://0.0.0.0:3306/iptv?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=lillclol
mysql.password=123456

#hive
hive.root_path=hdfs://ns1/user/hive/warehouse/

此为本人日常工作中的原创总结,转载请注明出处!!!!!

    原文作者:利伊奥克儿
    原文地址: https://www.jianshu.com/p/1ad2cfca6604
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞