文章地址:http://www.haha174.top/article/details/253683
项目源码:https://github.com/haha174/spark.git
1简介
对于saprksql 来说无论是从什么数据源创建出来的DataFrame,都有一些共同的load 和save 操作。load 操作主要是用于加载数据,创建出DataFrame;save 操作主要将Dataframe 中的数据保存到文件夹中。
下面给出java 示例:
public class GenericLoadSave {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("DataFrameStudy");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
DataFrameReader reader=sqlContext.read();
Dataset ds= reader.load("hdfs://hadoop:8020/data/students.parquet");
ds.select("name","age").write().save("hdfs://hadoop:8020/data/studentsSave.parquet");
sc.close();
}
}
下面给出scala 示例
object GenericLoadSave {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameStudy")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val reader = sqlContext.read
val ds = reader.load("hdfs://hadoop:8020/data/students.parquet")
ds.select("name", "age").write.save("hdfs://hadoop:8020/data/studentsSave.parquet")
}
}
2.手动制定数据源的类型
也可以手动指定数据源的类型,通常需要全名来指定如org.apache.spark.sql.parquet.但是sparkSql 内置了一些数据类型,比如json,parquet,jdbc.
下面给出java 示例
public class ManuallySpecifyOptions {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("DataFrameStudy").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
DataFrameReader reader=sqlContext.read();
Dataset ds= reader.format("json").load("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.json");
ds.show();
ds.write().save("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.parquet");
sc.close();
}
}
下面给出scala 示例
object ManuallySpecifyOptions {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameStudy").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val reader = sqlContext.read
val ds = reader.format("json").load("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.json")
ds.show()
ds.write.save("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.parquet")
}
}
3save model
Spark SQL 对于save 操作,提供了不同的save mode 主要来处理,当前目标位置已经有数据时,应当如何操作,而且save 不会执行锁操作并不是原子的,友谊有一定的风险出现脏数据
SaveModel.ErrorIfExists(默认) 如果目标位置已经存在数据,就抛出一个异常,
SaveModel.Append 如果目标位置已经存在则追加,
SaveModel.Overwrite 如果目标位置已经存在则覆盖
SaveModel.ignore 如果目标位置已经存在数据则不做任何操作
下面给出java 示例
public class SaveModelTest
{
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("DataFrameStudy").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
DataFrameReader reader=sqlContext.read();
Dataset ds= reader.format("json").load("hdfs://hadoop:8020/data/students.json");
ds.show();
//ds.write().format("json").mode(SaveMode.ErrorIfExists).save("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.json");
// ds.write().format("json").mode(SaveMode.Append).save("hdfs://hadoop:8020/data/students.json");
// ds.write().format("json").mode(SaveMode.Overwrite).save("hdfs://hadoop:8020/data/students.json");
ds.write().format("json").mode(SaveMode.Ignore).save("hdfs://hadoop:8020/data/students.json");
sc.close();
}
}
下面给出scala 示例:
object SaveModelTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameStudy").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val reader = sqlContext.read
val ds = reader.format("json").load("hdfs://hadoop:8020/data/students.json")
ds.show()
//ds.write().format("json").mode(SaveMode.ErrorIfExists).save("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.json");
// ds.write().format("json").mode(SaveMode.Append).save("hdfs://hadoop:8020/data/students.json");
// ds.write().format("json").mode(SaveMode.Overwrite).save("hdfs://hadoop:8020/data/students.json");
ds.write.format("json").mode(SaveMode.Ignore).save("hdfs://hadoop:8020/data/students.json")
}
}
欢迎关注,更多福利
这里写图片描述