Mashibing (马士兵) Spark learning notes

Documentation

http://mashibing.com/wiki/Spark

Upload the installation package

Extract

#cd training
#tar -xvf spark-2.1.0-bin-hadoop2.7.tgz
# rm -rf spark-2.1.0-bin-hadoop2.7.tgz
#mv spark-2.1.0-bin-hadoop2.7 spark
#cd spark
#vi /etc/profile
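
The notes do not show what was added to /etc/profile; a typical addition would be the two lines below, followed by source /etc/profile to apply them (the install path /root/training/spark is an assumption based on the directories used above):

export SPARK_HOME=/root/training/spark   # assumed install location
export PATH=$PATH:$SPARK_HOME/bin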

#./bin/spark-submit --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.11-2.1.0.jar 10000
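
The final argument (10000) is the number of slices the SparkPi example spreads its random sampling over. As a rough sketch of the same Monte Carlo idea, something like the following can be typed into the spark-shell (the variable names and slice count here are illustrative, not taken from the original notes):

val slices = 100
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { _ =>
  val x = math.random * 2 - 1            // random point in the 2x2 square around the origin
  val y = math.random * 2 - 1
  if (x * x + y * y <= 1) 1 else 0       // 1 if the point falls inside the unit circle
}.reduce(_ + _)
println(s"Pi is roughly ${4.0 * count / n}")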

In a browser:

192.168.56.200:4040

Once the Spark job finishes, this page refuses connections: the web UI on port 4040 is served by the driver only while the application is running.

Interactive submission with spark-shell

Create a test file /root/hello.txt first, then start the interactive shell:

#vi /root/hello.txt
hello java
hello java c java c
hello java
hello java
hello c++
hello python
hello c++
hello java
#./bin/spark-shell
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@77648321
scala> sc.textFile("/root/hello.txt")
res1: org.apache.spark.rdd.RDD[String] = /root/hello.txt MapPartitionsRDD[1] at textFile at <console>:25
scala> val lineRDD = sc.textFile("/root/hello.txt")
lineRDD: org.apache.spark.rdd.RDD[String] = /root/hello.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> lineRDD.foreach(println)
hello java
hello java c java c
hello java
hello java
hello c++
hello python
hello c++
hello java
scala> lineRDD.collect
res6: Array[String] = Array(hello java, hello java c java c, hello java, hello java, hello c++, hello python, hello c++, hello java, "")
scala> val wordRDD = lineRDD.map(line =>line.split(" "))
wordRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:26
scala> wordRDD.collect
res7: Array[Array[String]] = Array(Array(hello, java), Array(hello, java, c, java, c), Array(hello, java), Array(hello, java), Array(hello, c++), Array(hello, python), Array(hello, c++), Array(hello, java), Array(""))
Unlike map, flatMap flattens the per-line arrays into a single RDD of words:
scala> val wordRDD = lineRDD.flatMap(line => line.split(" "))
wordRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at flatMap at <console>:26
scala> wordRDD.collect
res8: Array[String] = Array(hello, java, hello, java, c, java, c, hello, java, hello, java, hello, c++, hello, python, hello, c++, hello, java, "")
scala> wordRDD.foreach(println)
hello
java
hello
java
c
java
c
hello
java
hello
java
hello
c++
hello
python
hello
c++
hello
java
scala> val wordCountRDD = wordRDD.map(word => (word,1))
wordCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:28
scala> wordCountRDD.collect
res10: Array[(String, Int)] = Array((hello,1), (java,1), (hello,1), (java,1), (c,1), (java,1), (c,1), (hello,1), (java,1), (hello,1), (java,1), (hello,1), (c++,1), (hello,1), (python,1), (hello,1), (c++,1), (hello,1), (java,1), ("",1))
scala> val resultRDD = wordCountRDD.reduceByKey((x,y) =>x + y)
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:30
scala> resultRDD.collect
res11: Array[(String, Int)] = Array((c++,2), (python,1), ("",1), (hello,8), (java,6), (c,2))
scala> val orderedRDD = resultRDD.sortByKey()
orderedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at sortByKey at <console>:32
scala> orderedRDD.collect
res12: Array[(String, Int)] = Array(("",1), (c,2), (c++,2), (hello,8), (java,6), (python,1))
The whole pipeline can also be chained into one line, sorting by key in descending order and saving the result to a directory:
scala> sc.textFile("/root/hello.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(false).saveAsTextFile("/root/result")
Quit the shell with :quit, go back to /root, and check the output:
#ll
total 839228
-rw-------. 1 root root      1020 Dec  1 03:12 anaconda-ks.cfg
-rw-r--r--. 1 root root 149756462 Dec  7 02:07 apache-hive-2.1.1-bin.tar.gz
-rw-r--r--. 1 root root 214092195 Dec  7 02:06 hadoop-2.7.3.tar.gz
-rw-r--r--. 1 root root 104659474 Dec  7 02:07 hbase-1.2.6-bin.tar.gz
-rw-r--r--. 1 root root        98 Dec  7 06:21 hello.txt
-rw-r--r--. 1 root root 160162581 Dec  7 02:06 jdk-8u91-linux-x64.rpm
drwxr-xr-x. 2 root root        80 Dec  7 06:51 result
-rw-r--r--. 1 root root 195636829 Dec  7 03:00 spark-2.1.0-bin-hadoop2.7.tgz
-rw-r--r--. 1 root root  35042811 Dec  7 02:06 zookeeper-3.4.10.tar.gz
#cd result
#ll
total 4
-rw-r--r--. 1 root root 49 Dec  7 06:51 part-00000
-rw-r--r--. 1 root root  0 Dec  7 06:51 _SUCCESS

# more part-00000
(python,1)
(java,6)
(hello,8)
(c++,2)
(c,2)
(,1)
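
The interactive session above can also be packaged as a standalone application and submitted with spark-submit. A minimal sketch (the object name, master setting and output path are placeholders, not from the original notes):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[*] is only for a quick local test; in practice pass --master to spark-submit instead
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.textFile("/root/hello.txt")         // one element per line
      .flatMap(_.split(" "))               // split every line into words
      .map((_, 1))                         // pair each word with a count of 1
      .reduceByKey(_ + _)                  // sum the counts per word
      .sortByKey(false)                    // sort by word, descending, like the shell example
      .saveAsTextFile("/root/result2")     // the output directory must not already exist
    sc.stop()
  }
}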

    Original author: lehuai
    Original article: https://www.jianshu.com/p/1d8288ce97f4