Spark RDD Exercises
The project's Maven pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.edu.hust</groupId>
    <artifactId>rdd</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.1</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
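With this pom.xml, running mvn clean package should compile the Scala sources with the scala-maven-plugin and, via the shade plugin bound to the package phase, produce a single runnable jar with the META-INF signature files stripped out.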
Aggregate Function Exercise
The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object aggregateDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 7, 8), 2)
    // aggregate's first argument is the initial (zero) value. It also takes two functions:
    // the first combines values within each partition, the second merges the partition results.
    val rdd2 = rdd1.aggregate(0)(_ + _, _ + _)
    println(rdd2)
    // With an initial value of 10 and two partitions, 10 is added three times:
    // once in each partition and once more when the partition results are merged.
    val rdd3 = rdd1.aggregate(10)(_ + _, _ + _)
    println(rdd3)
    // To sum the maxima of the two partitions, use:
    val rdd4 = rdd1.aggregate(0)(math.max(_, _), _ + _)
    println(rdd4)
    val rdd5 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"))
    // This concatenates all the strings. The result is not deterministic: the partitions
    // are computed in parallel, so either partition's result may come first in the merge.
    val rdd6 = rdd5.aggregate("")(_ + _, _ + _)
    println(rdd6)
    // Note the empty string "" at the end: its length is 0, so the last length comparison
    // in its partition yields "1" (the length of the previous intermediate result).
    val rdd7 = sc.parallelize(List("a1", "b342", "c234", "d5235", "e22", "f1", ""))
    val rdd8 = rdd7.aggregate("")((x: String, y: String) => math.max(x.length, y.length).toString, (x, y) => x + y)
    println(rdd8)
    // val rdd9 = rdd7.combineByKey(...) -- combineByKey needs a pair RDD; see the sketch below
  }
}
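The commented-out line above hints at combineByKey, which is defined on pair RDDs rather than on a plain RDD[String] like rdd7. Below is a minimal sketch of how it works; the (key, value) data and the object name combineByKeyDemo are illustrative, not part of the original exercise:
import org.apache.spark.{SparkConf, SparkContext}
object combineByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKeyDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // hypothetical pair RDD of (letter, number) pairs across 2 partitions
    val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), 2)
    // combineByKey takes three functions:
    //   createCombiner: turns a key's first value in a partition into an accumulator
    //   mergeValue: folds further values of that key into the partition-local accumulator
    //   mergeCombiners: merges accumulators of the same key across partitions
    val sums = pairs.combineByKey(
      (v: Int) => v,
      (acc: Int, v: Int) => acc + v,
      (a: Int, b: Int) => a + b
    )
    println(sums.collect().toBuffer) // ArrayBuffer((a,9), (b,6)), in some order
  }
}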
Computing Base Stations and Locations
The base-station data and location data are available here: https://pan.baidu.com/s/1CeCXsSxbVQk8aezKMK5hOw
The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object LocationDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("loc").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Parse the signal records into ((phone, stationId), time); records with
    // type "1" count as negative, so summing per key later yields a duration.
    val rdd1 = sc.textFile("/Users/youyujie/Documents/location").map(line => {
      val text = line.split(",")
      val time = if (text(3) == "1") -text(1).toLong else text(1).toLong
      ((text(0), text(2)), time)
    })
    // Parse the base-station file into 3-tuples (assumed stationId, longitude, latitude)
    val locRDD = sc.textFile("/Users/youyujie/Documents/loc").map(line => {
      val text = line.split(",")
      (text(0), text(1), text(2))
    })
    // Sum the times per (phone, stationId) key, then reshape to (stationId, phone, totalTime)
    val rdd2 = rdd1.reduceByKey(_ + _).map(x => {
      (x._1._2, x._1._1, x._2)
    })
    // rdd2.join(...) -- the join with the station coordinates is sketched below
    println(locRDD.collect().toBuffer)
  }
}
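The join itself is left commented out above. A minimal sketch of how it could be completed, assuming the loc file's columns are stationId, longitude, latitude as parsed into locRDD; these lines would go inside main, in place of the commented-out call:
    // key the per-station totals by station id: (stationId, (phone, totalTime))
    val byStation = rdd2.map(x => (x._1, (x._2, x._3)))
    // key the coordinates by station id: (stationId, (longitude, latitude))
    val byLoc = locRDD.map(x => (x._1, (x._2, x._3)))
    // inner join on stationId: (stationId, ((phone, totalTime), (longitude, latitude)))
    println(byStation.join(byLoc).collect().toBuffer)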
Running an Example in Local Mode
import org.apache.spark.{SparkConf, SparkContext}
object Test {
  def main(args: Array[String]): Unit = {
    // Configure Spark to run in local mode with two threads
    val conf = new SparkConf().setAppName("RDD").setMaster("local[2]")
    // Obtain the Spark context
    val sc = new SparkContext(conf)
    // Create an RDD with parallelize, using 2 partitions
    val rdd1 = sc.parallelize(List(12, 3, 4, 5, 6, 7, 8, 9), 2)
    val func1 = (index: Int, it: Iterator[Int]) => {
      // Tag each element with its partition number, then hand the list back as an iterator
      it.toList.map(x => "[partID:" + index + ", val: " + x + "]").toIterator
    }
    // mapPartitionsWithIndex takes a function of (partition index, element iterator);
    // see its definition for the exact signature.
    val rdd2 = rdd1.mapPartitionsWithIndex(func1)
    println(rdd2.collect().toBuffer)
  }
}
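For reference, parallelize splits the eight elements evenly across the two partitions, and collect preserves partition order, so the output should look like:
ArrayBuffer([partID:0, val: 12], [partID:0, val: 3], [partID:0, val: 4], [partID:0, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])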