Writing a DataFrame to MySQL
import java.io.FileInputStream
import java.sql.{Connection, DriverManager}
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}
/**
* @author 利伊奥克儿-lillcol
* 2018/10/12-14:44
*
*/
object MyTestDemo {
  /**
   * Save a DataFrame as a MySQL table.
   *
   * @param dataFrame the DataFrame to save
   * @param tableName the target MySQL table name
   * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
   * @param proPath   path to the configuration file
   */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String): Unit = {
    var table = tableName
    val properties: Properties = getProPerties(proPath)
    // The keys in the configuration file differ from the keys Spark's JDBC writer expects,
    // so build a new Properties object using the key names Spark understands.
    val prop = new Properties
    prop.setProperty("user", properties.getProperty("mysql.username"))
    prop.setProperty("password", properties.getProperty("mysql.password"))
    prop.setProperty("driver", properties.getProperty("mysql.driver"))
    prop.setProperty("url", properties.getProperty("mysql.url"))
    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        // Truncate instead of letting Spark drop and recreate the table on Overwrite,
        // so the existing column types are kept and are not all recreated as TEXT.
        stmt.execute(s"truncate table $table")
        stmt.close()
      } catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      } finally {
        if (conn != null) conn.close()
      }
    }
    // After an Overwrite truncate (or for any other mode) the data itself is appended.
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table.toUpperCase, prop)
  }
  /**
   * Load the configuration file.
   *
   * @param proPath path to the properties file
   * @return the loaded Properties
   */
  def getProPerties(proPath: String): Properties = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
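For reference, a minimal usage sketch of the helper above, assuming Spark 2.x with a SparkSession; the session settings, the source query, and the properties file path are hypothetical placeholders:

import org.apache.spark.sql.{SaveMode, SparkSession}

object MyTestDemoApp {
  def main(args: Array[String]): Unit = {
    // Hypothetical session; a real job would carry its own name and settings.
    val spark = SparkSession.builder()
      .appName("SaveAsMysqlTableDemo")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical source data and properties path, for illustration only.
    val df = spark.sql("SELECT * FROM some_hive_table")
    MyTestDemo.saveASMysqlTable(df, "user_info", SaveMode.Overwrite, "/path/to/mysql.properties")

    spark.stop()
  }
}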
Performance
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()
  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)
    if (mode == SaveMode.Ignore && tableExists) {
      return
    }
    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }
    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table)
      tableExists = false
    }
    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }
  JdbcUtils.saveTable(df, url, table, props)
}
--------------------------------------------------------------
/**
 * Saves the RDD to the database in a single transaction.
 */
def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    getJdbcType(field.dataType, dialect).jdbcNullType
  }
  val rddSchema = df.schema
  val getConnection: () => Connection = createConnectionFactory(url, properties)
  val batchSize = properties.getProperty("batchsize", "1000").toInt
  df.foreachPartition { iterator =>
    savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
  }
}
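One practical knob visible in saveTable above: the per-partition batch size comes from the "batchsize" key of the connection properties, defaulting to 1000. A hedged sketch of wiring a larger value into the helper above, just before the final write (the value 5000 is only an illustrative assumption):

// Inside saveASMysqlTable, before the final write:
// a larger batch means fewer round trips per partition; combined with
// rewriteBatchedStatements=true in the JDBC URL, MySQL can rewrite a batch
// into multi-row INSERTs.
prop.setProperty("batchsize", "5000") // hypothetical value, tune for your MySQL server
dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table.toUpperCase, prop)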
Excerpt from the configuration file
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://0.0.0.0:3306/iptv?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=lillclol
mysql.password=123456
#hive
hive.root_path=hdfs://ns1/user/hive/warehouse/
The two snippets above are the key pieces of Spark source code involved when a DataFrame is written to MySQL.
At first the DataFrame-to-MySQL write felt painfully slow, and I tried all kinds of workarounds; the fastest was to export the data to files and load them into MySQL, but that pipeline was too cumbersome. After reading the source code above, I realized the data is written partition by partition: each partition opens its own MySQL connection. So before writing I now repartition the DataFrame, choosing a partition count that matches the MySQL connection pool, as sketched below. With that in place it writes roughly 1,000,000 rows per minute, which is efficient enough for my use case.
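A call-site sketch of that repartitioning step; the partition count 16 is a hypothetical value to be aligned with the MySQL connection pool and server capacity:

// Each partition opens its own JDBC connection and writes in batches,
// so cap the parallelism at what MySQL can serve comfortably.
val numPartitions = 16 // hypothetical; match the MySQL connection pool size
saveASMysqlTable(dataFrame.repartition(numPartitions), tableName, SaveMode.Append, proPath)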
These are notes summarized from my day-to-day work.