Creating a SparkSession and SparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext
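The builder also accepts an application name and arbitrary config entries. A minimal sketch of a more explicit setup (the name and config value here are illustrative, not from the original):

val spark = SparkSession.builder
  .master("local")
  .appName("dataframe-examples")  // illustrative name
  .config("spark.sql.shuffle.partitions", "4")  // illustrative config entry
  .getOrCreate()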
Creating a DataFrame from a Range of Numbers
spark.range(1000).toDF("number").show()
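To build a DataFrame from an in-memory collection rather than a generated range, a minimal sketch using toDF (assuming the spark session created above):

// Build a DataFrame from a local Seq; implicits enable toDF on Scala collections
import spark.implicits._

Seq(1, 2, 3).toDF("number").show()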
Creating a DataFrame with an Explicit Schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val data = Seq(
  Row("A", 10, 112233),
  Row("B", 20, 223311),
  Row("C", 30, 331122))
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("phone", IntegerType)))
spark.createDataFrame(sc.makeRDD(data), schema).show()
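To confirm the column types, you can call printSchema on the result; a quick sketch reusing data and schema from above (StructField defaults to nullable = true):

val df = spark.createDataFrame(sc.makeRDD(data), schema)
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)
//  |-- phone: integer (nullable = true)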
Loading a DataFrame from a JSON File
/* data.json
{"name":"A","age":10,"phone":112233}
{"name":"B", "age":20,"phone":223311}
{"name":"C", "age":30,"phone":331122}
*/
spark.read.format("json").load("/Users/tobe/temp2/data.json").show()
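The json shortcut on DataFrameReader is equivalent to format("json").load; a sketch assuming the same file path as above:

// Equivalent shorthand; Spark infers the schema by scanning the JSON lines
spark.read.json("/Users/tobe/temp2/data.json").show()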
Loading a DataFrame from a CSV File
/* data.csv
name,age,phone
A,10,112233
B,20,223311
C,30,331122
*/
spark.read.option("header", true).csv("/Users/tobe/temp2/data.csv").show()
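By default every CSV column is read back as a string; the inferSchema option asks Spark to scan the data and pick numeric types instead. A sketch against the same file as above:

// Without inferSchema all CSV columns come back as strings
spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("/Users/tobe/temp2/data.csv")
  .printSchema()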
Loading a DataFrame from a MySQL Database
spark.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://127.0.0.1:3306/db")
.option("user", "root")
.option("password", "root")
.option("dbtable", "data")
.load().show()
This requires the MySQL JDBC driver on the classpath, for example via Maven:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>
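For larger tables, the JDBC source can read in parallel if you give it a numeric partition column. A sketch assuming the data table has a numeric id column (the column name, bounds, and partition count here are assumptions for illustration):

// Parallel JDBC read: Spark issues one query per partition over the id range
// (`id`, the bounds, and numPartitions are hypothetical values)
spark.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://127.0.0.1:3306/db")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "data")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  .load().show()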
Converting an RDD to a DataFrame
val rdd = sc.textFile("file:///etc/hosts")
import spark.implicits._
rdd.toDF().show()
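toDF on an RDD of strings yields a single value column; mapping into a case class first gives named, typed columns. A sketch with a hypothetical Person case class (reusing the implicits import above):

// Case-class fields become column names (Person is a hypothetical example type)
case class Person(name: String, age: Int)

val peopleDF = sc.parallelize(Seq(Person("A", 10), Person("B", 20))).toDF()
peopleDF.show()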
Creating Timestamp Data
Spark's TimestampType corresponds to Java's java.sql.Timestamp, so rows in a TimestampType column are built from java.sql.Timestamp values:
import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val schema = StructType(Seq(StructField("createTime", TimestampType, false)))
val rows = List(
  Row(Timestamp.valueOf("2012-12-12 10:10:10.0")),
  Row(new Timestamp(System.currentTimeMillis())),
  Row(new Timestamp(20938)))  // epoch milliseconds
val df = spark.createDataFrame(sc.parallelize(rows), schema)
df.show()
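Timestamps can also be generated on the Spark side; a sketch using the built-in current_timestamp function, assuming the df above:

// Add a column holding the current timestamp, computed by Spark
import org.apache.spark.sql.functions.current_timestamp

df.withColumn("now", current_timestamp()).show(truncate = false)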
Creating DateType Data
Spark's DateType corresponds to Java's java.sql.Date, so rows in a DateType column must be built from java.sql.Date values (not Timestamp):
import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DateType, StructField, StructType}

val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val schema = StructType(Seq(StructField("createTime", DateType, false)))
val rows = List(
  Row(Date.valueOf("2012-12-12")),
  Row(new Date(System.currentTimeMillis())),
  Row(new Date(20938)))  // epoch milliseconds
val df = spark.createDataFrame(sc.parallelize(rows), schema)
df.show()
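Dates can likewise be produced and transformed with built-in functions; a sketch using current_date and date_add, assuming the df above:

// Derive new date columns from the existing one with built-in functions
import org.apache.spark.sql.functions.{current_date, date_add}
import spark.implicits._

df.withColumn("today", current_date())
  .withColumn("nextWeek", date_add($"createTime", 7))
  .show()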