测试数据 sparkStu.text
zhangxs 24 chenxy wangYr 21 teacher wangx 26 teacher
sparksql
{ "name":"zhangxs","age":24,"job":"chengxy", "name":"li","age":21,"job":"teacher", "name":"tao","age":14,"job":"student" }
object CreateDataFream { //创建student对象 case class Student(name:String,age:BigInt,job:String); def main(args: Array[String]){ //初始化sparkSession 这个sparkSession要用val关键字修饰 val spark = SparkSession .builder() .appName("Spark SQL Example") .master("spark://服务器ip:7077") .getOrCreate(); // runDataSetCreate(spark); // runSarkOnFile(spark); // applySchema(spark); //loadParquet(spark); //jsonFile(spark); //销毁sparkSession spark.stop(); } }
//对指定的列进行查询 private def test1(spark :SparkSession){ //因为要使用变量,$符号,所以导入这个包 import spark.implicits._ //从hdfs上读取json数据文件并创建dataFream var dataFreamS= spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql"); //显示dataFream所有数据 dataFreamS.show(); //打印dataFrame结构 dataFreamS.printSchema(); //显示指定列的数据 dataFreamS.select("name").show() //查询指定的列,并修改数据 dataFreamS.select($"name", $"age"+1).show(); //查询年龄大于10的人 dataFreamS.select($"age" > 10).show(); //查看每个年龄段的人数 dataFreamS.groupBy("age").count(); //创建临时视图,如果这个视图已经存在就覆盖掉 dataFreamS.createOrReplaceTempView("zhangxsView"); }
//创建dataFrame并运行 private def runDataSetCreate(spark:SparkSession){ import spark.implicits._ //创建DataSets对象 类型是Student val dataStu = Seq(Student("Andy", 32,"baiLing")).toDS(); //显示数据集信息 dataStu.show(); //创建数据的dataSet var dataArr=Seq(1,2,3).toDS(); //显示数据集的信息 dataArr.show(); //对属性进行简单操作 print(dataArr.map (_ +1 ).collect()); //dataFrame能够被转换成自定义对象类型的dataSet, val dfStu=spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").as[Student]; dfStu.show(); //jsonFile支持嵌套表,读入并注册成表 spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").registerTempTable("student"); //根据sql查询注册的table val temsql=spark.sqlContext.sql("select name from student"); //显示name的value print(temsql.show()) }
//从hdfs上读取数据文件并转为student对象进行操作 private def runSarkOnFile(spark:SparkSession){ import spark.implicits._ //读取数据文件 并生成rdd var rdd=spark.read.textFile("hdfs://服务器ip:8020/tmp/dataTest/sparkStu.txt"); //对获取的rdd进行解析,并生成sutdent对象 var sturdd=rdd.map { x => x.split(" ")}.map { z => Student(z(0).toString(),z(1).toInt,z(2).toString())}; //显示student对象 sturdd.show(); //将sutdent对象注册成临时表 student sturdd.registerTempTable("student"); //查询临时表中的数据,并显示 var sqlDF=spark.sql("select t.name,t.age,t.job from friend t where t.age>14 and t.age<26"); sqlDF.show(); }
private def applySchema(spark:SparkSession){ import spark.implicits._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ //确定schema名称(列的名称) var schemaString="name,age,job"; //解析schemaString,并生成StructType对象数组 var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)}) //从hdfs上读取数据文件 var stuDS=spark.sparkContext.textFile(path); //使用Row对象,创建rowRdd var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2))) //创建schemaRDD var rowDF=spark.createDataFrame(sDS, schemaType); // var rowDF=spark.sqlContext.applySchema(sDS, schemaType); 这种方法已经过时 //打印schemaRDD的结构 rowDF.printSchema(); //注册Student table rowDF.createOrReplaceTempView("Student"); // rowDF.registerTempTable("Student"); 这种方法已经过时 //rowDF.collect().foreach {print(_) } //var resDS=spark.sql("select * from Student where age > 24"); var resDS=spark.sql("select name from Student"); resDS.show(); }
//使用parquet文件的方式 private def loadParquet(spark:SparkSession){ import spark.implicits._ //确定schema 列名称 var schemaString="name,age,job"; //解析schemaString,并生成StructType对象数组 var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)}) //创建rowRdd var stuDS=spark.sparkContext.textFile(path); var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2))) //将schemaRDD保存成parquet文件 var rowDF=spark.sqlContext.applySchema(sDS, schemaType); //将文件写到hdfs://服务器ip:8020/tmp/dataTest/ rowDF.write.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet"); ------------------------------------------------------------------- //读取parquet文件 var redParfile=spark.read.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet"); redParfile.createOrReplaceTempView("redParfilered"); var resultRdd=spark.sql("select * from redParfilered t where t.name='zhangxs'"); //DataFrame.rdd 可以将dataFrame转为RDD类型 resultRdd.rdd.map { x => "name"+x(0) }.collect().foreach { print(_) } } /** * spark可以自动的识别一个json模式并加载成数据集, * 这种转换可以使用SparkSession.read.json() 函数 * 这个数据集的来源可以是一个rdd,也可以是一个json文件 * */ private def jsonFile(spark:SparkSession){ var jsonRdd=spark.read.json("hdfs://192.168.177.124:8020/tmp/dataTest/sparksql"); jsonRdd.createOrReplaceTempView("student"); var jfRdd= spark.sql("select * from student t where t.age >24"); jfRdd.show();
/** * 使用Json类型的rdd加载json * * 如果加:: Nil,返回是一个char类型的rdd,加上则返回的是String类型的rdd */ var rdd=spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil); var rddre=spark.read.json(rdd); rddre.show(); }