map, filter, flatMap operators
Video tutorials:
1. Youku
2. YouTube
1. map
map passes each element of the source JavaRDD into the call method one by one; each value returned by the method becomes an element of a new JavaRDD.
java:
package com.bean.spark.trans;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * @author RedBean
 * map
 */
public class TraMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> numberRDD = sc.parallelize(number);
        JavaRDD<Integer> results = numberRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer s) throws Exception {
                // multiply each element by 5
                return s * 5;
            }
        });
        System.out.println(results.collect());
    }
}
python:
# -*- coding:utf-8 -*-

from __future__ import print_function
from pyspark import SparkConf
from pyspark import SparkContext
import os

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('mapTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize([1, 2, 3, 4, 5, 6])

    def myMap(l):
        return l * 5

    print(data.map(myMap).collect())
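map also accepts an inline lambda. A minimal variant, assuming the same sc and data defined above; each element of [1, 2, 3, 4, 5, 6] is multiplied by 5:

# equivalent one-liner using a lambda instead of myMap
print(data.map(lambda x: x * 5).collect())
# [5, 10, 15, 20, 25, 30]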
2. filter
filter returns a new dataset made up of the original elements for which func returns true.
java:
package com.bean.spark.trans;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TraFilter {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("filter");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> numberRDD = sc.parallelize(number);
        JavaRDD<Integer> results = numberRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer s) throws Exception {
                // keep only the even numbers
                return s % 2 == 0;
            }
        });
        System.out.println(results.collect());
    }
}
python:
# -*- coding:utf-8 -*-

from __future__ import print_function
from pyspark import SparkConf
from pyspark import SparkContext
import os

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('filterTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize([1, 2, 3, 4, 5, 6])

    def filterFun(l):
        return l > 2

    print(data.filter(filterFun).collect())
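As with map, filter takes a lambda directly. A small sketch assuming the data [1, 2, 3, 4, 5, 6] above; only elements greater than 2 are kept:

# equivalent lambda form of filterFun
print(data.filter(lambda x: x > 2).collect())
# [3, 4, 5, 6]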
3. flatMap
flatMap applies the function you define to each record of the RDD and expands it into zero or more records, so one input record can produce multiple output records in the flattened result.
java:
package com.bean.spark.trans;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class TraFlatMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("FlatMap");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> context = sc.textFile("D:/tools/data/flatMap/flatMap.txt");
        JavaRDD<String> results = context.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                // split each line into words so one input record expands into several output records
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        System.out.println(results.collect());
    }
}
python:
# -*- coding:utf-8 -*-

from __future__ import print_function
from pyspark import SparkConf
from pyspark import SparkContext
import os

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('flatMapTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize(["Hello World", "Spark Hadoop Storm", "java python c"])

    def flatFun(l):
        return l.split(" ")

    print(data.flatMap(flatFun).collect())
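To see the difference between map and flatMap, compare the two calls below (a minimal sketch, assuming the same sc and data as in the example above): map keeps one output element per input line, so the result is a list of lists, while flatMap flattens the split words into a single list.

# map: one output element per input line -> nested lists
print(data.map(lambda l: l.split(" ")).collect())
# [['Hello', 'World'], ['Spark', 'Hadoop', 'Storm'], ['java', 'python', 'c']]

# flatMap: each line expands into several elements -> one flat list
print(data.flatMap(lambda l: l.split(" ")).collect())
# ['Hello', 'World', 'Spark', 'Hadoop', 'Storm', 'java', 'python', 'c']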