$spark-sql –help 查看帮助命令
$设置任务个数,在这里修改为20个
spark-sql>SET spark.sql.shuffle.partitions=20;
$选择数据库
spark-sql>use siat;
$查询数据表
spark-sql>select * from test;
$使用registerTempTable代替1.0版本的registerAsTable —注册临时表
$sql()将代替hql()来提交查询语句,统一了接口
使用registerTempTable注册表是一个临时表,生命周期只在所定义的sqlContext或hiveContext实例之中。换而言之,在一个sqlContext(或hiveContext)中registerTempTable的表不能在另一个sqlContext(或hiveContext)中使用。
$语法解析器选项spark.sql.dialect:sql语法解析器和hiveql语法解析器。
set spark.sql.dialect=value
SparkSQL1.1对数据的查询分成了2个分支:sqlContext 和 hiveContext。至于两者之间的关系,hiveSQL继承了sqlContext,所以拥有sqlContext的特性之外,还拥有自身的特性(最大的特性就是支持hive)。
Spark1.1.0开始提供了两种方式将RDD转换成SchemaRDD:
通过定义Case Class,使用反射推断Schema(case class方式)
通过可编程接口,定义Schema,并应用到RDD上(applySchema 方式)
前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。
对于Case Class方式,首先要定义Case Class,在RDD的Transform过程中使用Case Class可以隐式转化成SchemaRDD,然后再使用registerTempTable注册成表。注册成表后就可以在sqlContext对表进行操作,如select 、insert、join等。注意,case class可以是嵌套的,也可以使用类似Sequences 或 Arrays之类复杂的数据类型。
RDD(Resilient Distributed Datasets)[1] ,弹性分布式数据集, 是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
$hadoop fs -mkdir /class6
$hadoop fs -copyFromLocal /home/hadoop/upload/class6/people.* /class6
$hadoop fs -ls /
//sqlContext演示
scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
第三步 定义Person类,读入数据并注册为临时表
//RDD1演示
scala>case class Person(name:String,age:Int)
scala>val rddpeople=sc.textFile(“hdfs://hadoop1:9000/class6/people.txt”).map(_.split(“,”)).map(p=>Person(p(0),p(1).trim.toInt))
scala>rddpeople.registerTempTable(“rddTable”)
第四步 在查询年纪在13-19岁之间的人员
scala>sqlContext.sql(“SELECT name FROM rddTable WHERE age >= 13 AND age <= 19”).map(t => “Name: ” + t(0)).collect().foreach(println)
使用applySchema定义RDD演示
applySchema 方式比较复杂,通常有3步过程:
l从源RDD创建rowRDD
l创建与rowRDD匹配的Schema
l将Schema通过applySchema应用到rowRDD
第一步 导入包创建Schema
//导入SparkSQL的数据类型和Row
scala>import org.apache.spark.sql._
//创建于数据结构匹配的schema
scala>val schemaString = “name age”
scala>val schema =
StructType(
schemaString.split(” “).map(fieldName => StructField(fieldName, StringType, true)))
第二步 创建rowRDD并读入数据
//创建rowRDD
scala>val rowRDD = sc.textFile(“hdfs://hadoop1:9000/class6/people.txt”).map(_.split(“,”)).map(p => Row(p(0), p(1).trim))
//用applySchema将schema应用到rowRDD
scala>val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
scala>rddpeople2.registerTempTable(“rddTable2”)
第三步 查询获取数据
scala>sqlContext.sql(“SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19”).map(t => “Name: ” + t(0)).collect().foreach(println)
parquet演示
同样得,sqlContext可以读取parquet文件,由于parquet文件中保留了schema的信息,所以不需要使用case class来隐式转换。sqlContext读入parquet文件后直接转换成SchemaRDD,也可以将SchemaRDD保存成parquet文件格式。
第一步 保存成parquest格式文件
// 把上面步骤中的rddpeople保存为parquet格式文件到hdfs中
scala>rddpeople.saveAsParquetFile(“hdfs://hadoop1:9000/class6/people.parquet”)
第二步 读入parquest格式文件,注册表parquetTable
//parquet演示
scala>val parquetpeople = sqlContext.parquetFile(“hdfs://hadoop1:9000/class6/people.parquet”)
scala>parquetpeople.registerTempTable(“parquetTable”)
第三步 查询年龄大于等于25岁的人名
scala>sqlContext.sql(“SELECT name FROM parquetTable WHERE age >= 25”).map(t => “Name: ” + t(0)).collect().foreach(println)
json演示
sparkSQL1.1.0开始提供对json文件格式的支持,这意味着开发者可以使用更多的数据源,如鼎鼎大名的NOSQL数据库MongDB等。sqlContext可以从jsonFile或jsonRDD获取schema信息,来构建SchemaRDD,注册成表后就可以使用。
ljsonFile – 加载JSON文件目录中的数据,文件的每一行是一个JSON对象
ljsonRdd – 从现有的RDD加载数据,其中RDD的每个元素包含一个JSON对象的字符串
第一步 上传测试数据
第二步 读取数据并注册jsonTable表
//json演示
scala>val jsonpeople = sqlContext.jsonFile(“hdfs://hadoop1:9000/class6/people.json”)
jsonpeople.registerTempTable(“jsonTable”)
第三步 查询年龄大于等于25的人名
scala>sqlContext.sql(“SELECT name FROM jsonTable WHERE age >= 25”).map(t => “Name: ” + t(0)).collect().foreach(println)
Cache使用
sparkSQL的cache可以使用两种方法来实现:
lCacheTable()方法
lCACHE TABLE命令
千万不要先使用cache SchemaRDD,然后registerAsTable;使用RDD的cache()将使用原生态的cache,而不是针对SQL优化后的内存列存储。
第一步 对rddTable表进行缓存
//cache使用
scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
scala>case class Person(name:String,age:Int)
scala>val rddpeople=sc.textFile(“hdfs://hadoop1:9000/class6/people.txt”).map(_.split(“,”)).map(p=>Person(p(0),p(1).trim.toInt))
scala>rddpeople.registerTempTable(“rddTable”)
scala>sqlContext.cacheTable(“rddTable”)
scala>sqlContext.sql(“SELECT name FROM rddTable WHERE age >= 13 AND age <= 19”).map(t => “Name: ” + t(0)).collect().foreach(println)
第二步 对parquetTable表进行缓存
scala>val parquetpeople = sqlContext.parquetFile(“hdfs://hadoop1:9000/class6/people.parquet”)
scala>parquetpeople.registerTempTable(“parquetTable”)
scala>sqlContext.sql(“CACHE TABLE parquetTable”)
scala>sqlContext.sql(“SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19”).map(t => “Name: ” + t(0)).collect().foreach(println)
第三步 解除缓存
//uncache使用
scala>sqlContext.uncacheTable(“rddTable”)
scala>sqlContext.sql(“UNCACHE TABLE parquetTable”)
DSL演示
SparkSQL除了支持HiveQL和SQL-92语法外,还支持DSL(Domain Specific Language)。在DSL中,使用Scala符号’+标示符表示基础表中的列,Spark的execution engine会将这些标示符隐式转换成表达式。另外可以在API中找到很多DSL相关的方法,如where()、select()、limit()等等,详细资料可以查看Catalyst模块中的DSL子模块,下面为其中定义几种常用方法:
//DSL演示
scala>import sqlContext._
scala>val teenagers_dsl = rddpeople.where(‘age >= 10).where(‘age <= 19).select(‘name)
scala>teenagers_dsl.map(t => “Name: ” + t(0)).collect().foreach(println)
Spark运行架构
Application:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码;
Driver:Spark中的Driver即运行上述Application的main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Drive;
Executor:Application运行在Worker 节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于Hadoop MapReduce中的YarnChild。一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了;
Cluster Manager:指的是在集群上获取资源的外部服务,目前有:
Ø Standalone:Spark原生的资源管理,由Master负责资源的分配;
Ø Hadoop Yarn:由YARN中的ResourceManager负责资源的分配;
Worker:集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点;
作业(Job):包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation;
阶段(Stage):每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;
任务(Task): 被送到某个Executor上的工作任务;
Spark运行基本流程
1. 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2. 资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3. SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4. Task在Executor上运行,运行完毕释放所有资源。
RDD的特点
1.来源:一种是从持久存储获取数据,另一种是从其他RDD生成
2.只读:状态不可变,不能修改
3.分区:支持元素根据 Key 来分区 ( Partitioning ) ,保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统
4.路径:在 RDD 中叫世族或血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的
5.持久化:可以控制存储级别(内存、磁盘等)来进行持久化
6.操作:丰富的动作 ( Action ) ,如Count、Reduce、Collect和Save 等
RDD基础数据类型
目前有两种类型的基础RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。这两种类型的RDD都可以通过相同的方式进行操作,从而获得子RDD等一系列拓展,形成lineage血统关系图。
1. 并行化集合
并行化集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。
Hadoop数据集
Spark可以将任何Hadoop所支持的存储资源转化成RDD,如本地文件(需要网络文件系统,所有的节点都必须能访问到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
转换与操作
对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)
l转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
l操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
转换
reduce(func) | 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行 |
collect() | 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM |
count() | 返回数据集的元素个数 |
take(n) | 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用) |
first() | 返回数据集的第一个元素(类似于take(1) |
saveAsTextFile(path) | 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本 |
saveAsSequenceFile(path) | 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等) |
foreach(func) | 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互 |
操作
map(func) | 返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 |
filter(func) | 返回一个新的数据集,由经过func函数后返回值为true的原元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) |
flatMap(func) | 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) |
sample(withReplacement, frac, seed) | 根据给定的随机种子seed,随机抽样出数量为frac的数据 |
union(otherDataset) | 返回一个新的数据集,由原数据集和参数联合而成 |
groupByKey([numTasks]) | 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task |
reduceByKey(func, [numTasks]) | 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 |
groupWith(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup |
cartesian(otherDataset) | 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。 |
flatMap(func) | 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素 |
SparkSQL调优
Spark是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。
木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。
Amdahl定理,一个计算机科学界的经验法则,因吉恩·阿姆达尔而得名。它代表了处理器平行运算之后效率提升的能力。并行计算中的加速比是用并行前的执行速度和并行后的执行速度之比来表示的,它表示了在并行化之后的效率提升情况。阿姆达尔定律是固定负载(计算总量不变时)时的量化标准。可用公式:来表示。式中分别表示问题规模的串行分量(问题中不能并行化的那一部分)和并行分量,p表示处理器数量。当时,上式的极限是,其中。这意味着无论我们如何增大处理器数目,加速比是无法高于这个数的。
SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。
并行性
SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度。shuffle的时候使用了缺省的spark.sql.shuffle.partitions,即200个partition,也就是200个Task.
而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running.
这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。
对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;
l集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的–total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。
综上所述,SparkSQL的查询并行度主要和集群的core数量相关,合理配置每个节点的core可以提高集群的并行度,提高查询的效率。
高效的数据格式
高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:
数据本地性
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
下面是Spark webUI监控Stage的一个图:
PROCESS_LOCAL是指读取缓存在本地节点的数据
NODE_LOCAL是指读取本地节点硬盘数据
ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。
合适的数据类型
对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。
合适的数据列
对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。
优的数据存储格式
在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看SparkSQL的Stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持 textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高SparkSQL的查询效率。
内存的使用
spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:
SPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;
executor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;
spark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
spark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例
在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:
对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
对于join操作,优先缓存较小的表;
要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
要多注意Storage的监控,多思考如何才能Fraction cached的比例更多
合适的Task
对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。
其他的一些建议
优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提:
想要获取更好的表达式查询速度,可以将spark.sql.codegen设置为Ture;
对于大数据集的计算结果,不要使用collect() ,collect()就结果返回给driver,很容易撑爆driver的内存;一般直接输出到分布式文件系统中;
对于Worker倾斜,设置spark.speculation=true 将持续不给力的节点去掉;
对于数据倾斜,采用加入部分中间步骤,如聚合后cache,具体情况具体分析;
适当的使用序化方案以及压缩方案;
善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态;
善于解决重点矛盾,多观察Stage中的Task,查看最耗时的Task,查找原因并改善;
import java.io.IOException import org.apache.spark.{SparkConf, SparkContext} object SparkSQL { // 创建一个表示客户的自定义类 case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String) def main(args:Array[String]):Unit={ System.setProperty("hadoop.home.dir", "D:\\hadoop") //加载hadoop组件 val conf = new SparkConf().setAppName("SQLApp").setMaster("spark://192.168.66.66:7077") .set("spark.executor.memory", "1g") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setJars(Seq("D:\\workspace\\scala\\out\\scala.jar"))//加载远程spark val sparkcontext = new SparkContext(conf); try{ val sqlContext = new org.apache.spark.sql.SQLContext(sparkcontext) // 导入语句,可以隐式地将RDD转化成DataFrame import sqlContext.implicits._ //用数据集文本文件创建一个Customer对象的DataFrame val dfCustomers = sparkcontext.textFile("/usr/app/spark/customer.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF() // 将DataFrame注册为一个表 dfCustomers.registerTempTable("customers") // 显示DataFrame的内容 dfCustomers.show() // 打印DF模式 dfCustomers.printSchema() // 选择客户名称列 dfCustomers.select("name").show() // 选择客户名称和城市列 dfCustomers.select("name", "city").show() // 根据id选择客户 dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show() // 根据邮政编码统计客户数量 dfCustomers.groupBy("zip_code").count().show() // 用sqlContext对象提供的sql方法执行SQL语句。 val custNames = sqlContext.sql("SELECT name FROM customers") // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。 // 可以按照顺序访问结果行的各个列。 custNames.map(t => "Name: " + t(0)).collect().foreach(println) // 用sqlContext对象提供的sql方法执行SQL语句。 val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code") // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。 // 可以按照顺序访问结果行的各个列。 customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println) sparkcontext.stop() } catch { case ex: IOException => println("IO Exception") } finally { println("finally"); } } }