Spark RDD Exercises

The Maven pom.xml used for these exercises:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    <modelVersion>4.0.0</modelVersion>  
  
    <groupId>cn.edu.hust</groupId>  
    <artifactId>rdd</artifactId>  
    <version>1.0-SNAPSHOT</version>  
  
    <properties>  
        <maven.compiler.source>1.7</maven.compiler.source>  
        <maven.compiler.target>1.7</maven.compiler.target>  
        <encoding>UTF-8</encoding>  
        <scala.version>2.10.6</scala.version>  
        <spark.version>1.6.1</spark.version>  
        <hadoop.version>2.6.4</hadoop.version>  
    </properties>  
  
    <dependencies>  
        <dependency>  
            <groupId>org.scala-lang</groupId>  
            <artifactId>scala-library</artifactId>  
            <version>${scala.version}</version>  
        </dependency>  
  
        <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-core_2.10</artifactId>  
            <version>${spark.version}</version>  
        </dependency>  
  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-client</artifactId>  
            <version>${hadoop.version}</version>  
        </dependency>  
    </dependencies>  
  
    <build>  
        <sourceDirectory>src/main/scala</sourceDirectory>  
        <testSourceDirectory>src/test/scala</testSourceDirectory>  
        <plugins>  
            <plugin>  
                <groupId>net.alchim31.maven</groupId>  
                <artifactId>scala-maven-plugin</artifactId>  
                <version>3.2.2</version>  
                <executions>  
                    <execution>  
                        <goals>  
                            <goal>compile</goal>  
                            <goal>testCompile</goal>  
                        </goals>  
                        <configuration>  
                            <args>  
                                <arg>-make:transitive</arg>  
                                <arg>-dependencyfile</arg>  
                                <arg>${project.build.directory}/.scala_dependencies</arg>  
                            </args>  
                        </configuration>  
                    </execution>  
                </executions>  
            </plugin>  
  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-shade-plugin</artifactId>  
                <version>2.4.3</version>  
                <executions>  
                    <execution>  
                        <phase>package</phase>  
                        <goals>  
                            <goal>shade</goal>  
                        </goals>  
                        <configuration>  
                            <filters>  
                                <filter>  
                                    <artifact>*:*</artifact>  
                                    <excludes>  
                                        <exclude>META-INF/*.SF</exclude>  
                                        <exclude>META-INF/*.DSA</exclude>  
                                        <exclude>META-INF/*.RSA</exclude>  
                                    </excludes>  
                                </filter>  
                            </filters>  
                        </configuration>  
                    </execution>  
                </executions>  
            </plugin>  
        </plugins>  
    </build>  
  
  
</project>  
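
A note on this pom: it pins Scala 2.10.6 against Spark 1.6.1 (hence the _2.10 suffix on the spark-core artifact), compiles the Scala sources under src/main/scala with the scala-maven-plugin, and uses the maven-shade-plugin to package everything into a single runnable jar; the META-INF/*.SF, *.DSA and *.RSA excludes strip signature files that would otherwise cause signature-verification errors when running the shaded jar.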

Aggregate Function Exercise

The code is as follows:

import org.apache.spark.{SparkConf, SparkContext}  
  
object aggregateDemo {  
  def main(args: Array[String]): Unit = {  
    val conf=new SparkConf().setAppName("aggregateDemo").setMaster("local[2]")  
    val sc=new SparkContext(conf)  
    val rdd1=sc.parallelize(Array(1,2,3,4,5,6,7,7,8),2)  
    // aggregate's first argument is the initial (zero) value; it then takes two functions: the first does the per-partition (local) computation, the second merges the partition results (global computation)
    val rdd2=rdd1.aggregate(0)(_+_,_+_)  
    println(rdd2)  
    // with an initial value of 10 and two partitions, the 10 is added three times: once per partition and once more when the partition results are merged, so the result is 43 + 3 * 10 = 73
    val rdd3=rdd1.aggregate(10)(_+_,_+_)  
    println(rdd3)  
    // to add together the maximum value of each partition, use the following
    val rdd4=rdd1.aggregate(0)(math.max(_,_),_+_)  
    println(rdd4)  
  
    val rdd5=sc.parallelize(List("a","b","c","d","e","f","g"))  
    // this concatenates all the strings; the result is not deterministic because the partitions are computed in parallel and either one may come back first
    val rdd6=rdd5.aggregate("")(_+_,_+_)  
    println(rdd6)  
    // note the empty string "": max(1, "".length) turns that partition's running result into "1", so the output is "41" or "14"
    val rdd7=sc.parallelize(List("a1","b342","c234","d5235","e22","f1",""))  
    val rdd8=rdd7.aggregate("")((x:String,y:String)=>math.max(x.length,y.length).toString,(x,y)=>x+y)  
    println(rdd8)  
  
    //val rddf9=rdd7.combineByKey()  
  }  
}  
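
The commented-out rdd7.combineByKey() would not compile as written: combineByKey is only defined for key-value RDDs, and rdd7 is an RDD[String]. Below is a minimal sketch of how combineByKey could be used, on hypothetical (String, Int) pairs.

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKeyDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // hypothetical key-value data; combineByKey needs an RDD of (K, V) pairs
    val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), 2)
    val sums = pairs.combineByKey(
      (v: Int) => v,                  // createCombiner: first value seen for a key within a partition
      (acc: Int, v: Int) => acc + v,  // mergeValue: fold further values into the partition-local accumulator
      (a: Int, b: Int) => a + b       // mergeCombiners: merge accumulators from different partitions
    )
    println(sums.collect().toBuffer)  // e.g. ArrayBuffer((a,9), (b,6))
    sc.stop()
  }
}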

Computing Base Stations and Locations

The base-station data and the location data are available at https://pan.baidu.com/s/1CeCXsSxbVQk8aezKMK5hOw. The code below treats each log line as comma-separated fields: a phone number, a timestamp, a base-station id, and an event flag; the timestamp is negated when the flag is 1, so that summing the values per (phone, station) key yields the time spent on each station.

The code is as follows:

import org.apache.spark.{SparkConf, SparkContext}  
  
object LocationDemo2 {  
  def main(args: Array[String]): Unit = {  
    val conf=new SparkConf().setAppName("loc").setMaster("local[2]");  
    val sc=new SparkContext(conf)  
    // compute the grouping: key each log record by (phone, station id)
    val rdd1=sc.textFile("/Users/youyujie/Documents/location").map(line=>{
      val text=line.split(",")  
      val time=if(text(3)=="1") -text(1).toLong else text(1).toLong  
      ((text(0),text(2)),time)  
    })  
    val locRDD=sc.textFile("/Users/youyujie/Documents/loc").map(line=>{  
      val text=line.split(",")  
      (text(0),text(1),text(2))  
    })  
    val rdd2=rdd1.reduceByKey(_+_).map(x=>{  
      (x._1._2,x._1._1,x._2)  
    })  
    //rdd2.join()  // joining rdd2 with locRDD, keyed by the station id, would attach the coordinates; see the sketch below
    println(locRDD.collect().toBuffer)
  }
}
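
The commented-out rdd2.join() points at the intended final step: key both RDDs by the base-station id and join them so that each (phone, total time) pair picks up its coordinates. Below is a minimal, self-contained sketch of that join; the station ids, phone numbers and coordinates are hypothetical placeholders, not values from the downloaded files.

import org.apache.spark.{SparkConf, SparkContext}

object LocationJoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("locJoinSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // (station, (phone, totalTime)) -- the shape rdd2 would have after re-keying by station id
    val byStation = sc.parallelize(List(
      ("station-A", ("phone-1", 50000L)),
      ("station-B", ("phone-2", 90000L))
    ))
    // (station, (x, y)) -- the shape locRDD would have once keyed by station id
    val locByStation = sc.parallelize(List(
      ("station-A", ("116.30", "40.03")),
      ("station-B", ("116.52", "39.92"))
    ))
    // join on the station id and flatten each result to (phone, station, time, x, y)
    val joined = byStation.join(locByStation).map {
      case (station, ((phone, time), (x, y))) => (phone, station, time, x, y)
    }
    println(joined.collect().toBuffer)
    sc.stop()
  }
}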

Running an Example in Local Mode

import org.apache.spark.{SparkConf, SparkContext}  
  
object Test {  
  def main(args: Array[String]): Unit = {  
    // set up the Spark configuration: local mode with two worker threads
    val conf=new SparkConf().setAppName("RDD").setMaster("local[2]")  
    // create the SparkContext
    val sc=new SparkContext(conf)  
    // create an RDD with parallelize, using 2 partitions
    val rdd1=sc.parallelize(List(12,3,4,5,6,7,8,9),2)  
  
    val func1=(index:Int,it:Iterator[(Int)])=>{  
      // a function that tags each element with its partition index and returns the list as an iterator
        it.toList.map(x => "[partID:" +  index + ", val: " + x + "]").toIterator  
    }  
    // as mapPartitionsWithIndex's definition shows, it takes a function of (partition index, iterator)
    val rdd2=rdd1.mapPartitionsWithIndex(func1)  
    println( rdd2.collect().toBuffer)  
  }  
}  
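
For comparison, a minimal sketch of the closely related mapPartitions, which receives only the element iterator without the partition index; with the same 8 elements split 4/4 across 2 partitions it should print the two per-partition sums.

import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitionsSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // same data and partitioning as the Test example above
    val rdd1 = sc.parallelize(List(12, 3, 4, 5, 6, 7, 8, 9), 2)
    // mapPartitions gets only the iterator; collapse each partition to its sum
    val sums = rdd1.mapPartitions(it => Iterator(it.sum))
    println(sums.collect().toBuffer) // expected: ArrayBuffer(24, 30) for the 4/4 split
    sc.stop()
  }
}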

    Original author: Oeljeklaus
    Original article: https://zhuanlan.zhihu.com/p/37619389