Spark DataFrame Basics

Creating a SparkSession and SparkContext

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext

Creating a DataFrame from a range of numbers

spark.range(1000).toDF("number").show()
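
The line above builds its single column from a numeric range. For an in-memory array of records, a minimal sketch could use toDF from spark.implicits; the sample values and the column names "name" and "age" are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()
import spark.implicits._ // brings toDF into scope for local collections

// Hypothetical sample data: an Array of (name, age) tuples
val people = Array(("A", 10), ("B", 20), ("C", 30))

// toDF is defined for Seq, so the Array is converted first
val peopleDf = people.toSeq.toDF("name", "age")
peopleDf.show()
```

The column types here are derived from the tuple element types, so no explicit schema is needed.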

Creating a DataFrame with an explicit schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Each Row must match the schema below in field order and type
val data = Seq(
  Row("A", 10, 112233),
  Row("B", 20, 223311),
  Row("C", 30, 331122))

val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("phone", IntegerType)))

spark.createDataFrame(sc.makeRDD(data), schema).show()

Loading a DataFrame from a JSON file

/* data.json
   {"name":"A","age":10,"phone":112233}
   {"name":"B", "age":20,"phone":223311}
   {"name":"C", "age":30,"phone":331122}
 */
spark.read.format("json").load("/Users/tobe/temp2/data.json").show()

Loading a DataFrame from a CSV file

/* data.csv
   name,age,phone
   A,10,112233
   B,20,223311
   C,30,331122
 */
spark.read.option("header", true).csv("/Users/tobe/temp2/data.csv").show()
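
With only the header option set, every CSV column is read as a string. A minimal sketch of letting Spark guess the column types with the inferSchema option follows; it writes the same sample rows to a temporary file (an assumption made so the snippet does not depend on a fixed path).

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()

// Write the sample rows to a temporary file (hypothetical location chosen by the JVM)
val csvPath = Files.createTempFile("data", ".csv")
Files.write(csvPath, "name,age,phone\nA,10,112233\nB,20,223311\nC,30,331122\n".getBytes("UTF-8"))

val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // costs an extra pass over the data to guess column types
  .csv(csvPath.toString)

csvDf.printSchema()
```

When the types are known in advance, passing a StructType through .schema(...) avoids the extra pass that inference needs.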

Loading a DataFrame from a MySQL database

spark.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://127.0.0.1:3306/db")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "data")
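  .load().show()

Reading over JDBC requires the MySQL driver on the classpath, for example through this Maven dependency:

<dependency>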
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.6</version>
</dependency>

Converting an RDD to a DataFrame

val rdd = sc.textFile("file:///etc/hosts")

import spark.implicits._
rdd.toDF().show()
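
Calling toDF() on an RDD of strings yields a single column named "value". A minimal sketch of splitting each line into named columns first is shown here; the sample lines and the column names "ip" and "host" are hypothetical, and a parallelized collection stands in for the file.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

// Hypothetical whitespace-separated lines in the style of a hosts file
val lines = sc.parallelize(Seq("127.0.0.1 localhost", "192.168.0.2 devbox"))

// Split each line into an (ip, host) tuple, then name the columns explicitly
val hostsDf = lines
  .map(_.split("\\s+"))
  .map(parts => (parts(0), parts(1)))
  .toDF("ip", "host")

hostsDf.show()
```

A real hosts file also contains comments and blank lines, which would need to be filtered out before the split.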

Creating Timestamp data

Spark's TimestampType corresponds to Java's java.sql.Timestamp:

import java.sql.Timestamp
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val schema = StructType(Seq(StructField("createTime", TimestampType, false)))
val rows = List(
  Row(Timestamp.valueOf("2012-12-12 10:10:10.0")),
  Row(new Timestamp(System.currentTimeMillis())),
  Row(new Timestamp(20938))) // milliseconds since the Unix epoch
val df = spark.createDataFrame(sc.parallelize(rows), schema)

Creating DateType data

Spark's DateType corresponds to Java's java.sql.Date:

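import java.sql.Date
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val schema = StructType(Seq(StructField("createTime", DateType, false)))
// DateType rows must hold java.sql.Date values, not java.sql.Timestamp
val rows = List(
  Row(Date.valueOf("2012-12-12")),
  Row(new Date(System.currentTimeMillis())),
  Row(new Date(20938))) // milliseconds since the Unix epoch
val df = spark.createDataFrame(sc.parallelize(rows), schema)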
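
Timestamp and date columns can also be derived from a string column rather than built from Java objects. The following is a minimal sketch using the to_timestamp and to_date functions from org.apache.spark.sql.functions; the column names "raw", "createTime" and "createDate" and the sample value are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Hypothetical raw string column holding a date-time value
val raw = Seq("2012-12-12 10:10:10").toDF("raw")

val parsed = raw
  .withColumn("createTime", to_timestamp(col("raw"), "yyyy-MM-dd HH:mm:ss")) // TimestampType
  .withColumn("createDate", to_date(col("createTime")))                      // DateType

parsed.printSchema()
```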

    Original author: tobe
    Original article: https://zhuanlan.zhihu.com/p/77011190