通过编写Spark程序统计各城市的住宿场所数量和房间数量,并根据城市房间数量降序排列,输出前10条统计结果,将统计结果保存至本地。同时在MySQL创建数据库并在其中创建table3_2,将统计结果写入表table3_2中。要求输出字段包含:省份、城市、住宿场所数量、房间数量。
原数据
Spark源代码
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext}
import java.sql.{ Connection, DriverManager, PreparedStatement}
object task3_2trymysql {
def main(args: Array[String]): Unit = {
//准备环境
val conf = new SparkConf().setMaster("local").setAppName("task3_2")
val sc = new SparkContext(conf)
//读取文件
val rdd: RDD[String] = sc.textFile("D:\\abc\\jd_4707.csv")//
//获取表头
val one: String = rdd.first()
//过滤表头,以及该空数据
val gl: RDD[String] = rdd.filter(row => {
row != one && row.split(",")(8)!=""&&row.split(",")(4)!=""&&row.split(",")(3)!=""
}
)
//获取数据((湖南,长沙),1),1的作用是用来统计该城市的住宿场所数量
val value1 = gl.map(line => {
val strings = line.split(",")
((strings(3),strings(4)),1)
}
)
//获取数据(长沙,房间数)
val value2 = gl.map(line => {
val strings = line.split(",")
(strings(4),strings(8).toInt)
}
)
//通过key将value进行叠加,统计每个城市的住宿场所数量和总房间数
val rdd1 = value1.reduceByKey(_ + _)
val rdd2 = value2.reduceByKey(_ + _)
//改变数据为(湖南,(长沙,住宿场所数量))
val rdd4 = rdd1.map(line => (line._1._2,(line._1._1, line._2)))
//使用join,相同的key进行连接,数据变为(湖南,(长沙,住宿场所数量),房间数)
val newrdd= rdd4.join(rdd2)
//改变数据为(湖南,长沙,住宿场所数量,房间数)
val rdd5: RDD[(String, String, Int, Int)] = newrdd.map(line => (line._2._1._1, line._1, line._2._1._2, line._2._2))
//按照房间数降序排序,打印前十条数据
rdd5.sortBy(x => x._4, false).take(10).foreach(println)
//按照房间数降序排序,将数据保存到本地
rdd5.sortBy(x => x._4, false).saveAsTextFile("D:\\abc\\asdsada")
//连接数据库相关参数
var driver="com.mysql.cj.jdbc.Driver"
var user="root"
var password="101010"
var url="jdbc:mysql://localhost:3306/accommodationdata?serverTimezone=GMT%2B8"
//写入mysql操作
rdd5.foreachPartition{
data=>{
//注册驱动
Class.forName(driver)
//获取连接
val connection: Connection = DriverManager.getConnection(url, user, password)
//声明数据库操作sql语句,将数据写入表take3_2中
var sql="insert into take3_2 values(?,?,?,?) "
//创建数据库操作对象
val ps: PreparedStatement = connection.prepareStatement(sql)
data.foreach{
//数据匹配
case (province,city,accommodation_num,room)=> {
//设置参数
ps.setString(1,province)
ps.setString(2,city)
ps.setInt(3,accommodation_num)
ps.setInt(4,room)
//执行sql
ps.executeUpdate();
}
}
//关闭连接和数据库操作对象
ps.close()
connection.close()
}
}
sc.stop()
}
}