大数据 -【spark入门】

1. 简要说明

基于spark 2.3.1版本学习spark基础知识及整体框架。本文首先以python版为主进行描述,后期会主要针对scala版本进行详细讲解。

2. spark学习环境搭建

  • spark安装包下载地址
http://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/

作者使用的为spark-2.3.1版本为例进行测试与学习。(之所以不选择最新版本,大家都懂的,最新版本不稳定,会有很多坑要踩,索性选择相对稳定的版本)
  • 环境设置
1. 如想设置为全局环境变量,则可配置到bashrc_profile中
2. 仅为开发调试,直接进入到下载安装包spark-2.3.1-bin-hadoop2.7/bin下指定相关操作的命令即可。
  • 启动spark
    启动python版本spark 客户端命令(./pyspark)
Python 2.7.10 (default, Aug 17 2018, 19:45:58)
[GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
2019-02-11 17:57:49 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 2.7.10 (default, Aug 17 2018 19:45:58)
SparkSession available as 'spark'.
>>>
    

到此为止spark学习调试的环境基本搭建完成。

3. 核心概念介绍

  • 首先,
    每个spark应用都由一个驱动器程序发起集群上的各种并行操作。shell终端本身即为实际的驱动器程序。shell启动时自动创建了一个SparkContext对象,其变量叫sc,所以以下的操作都可以基于sc做操作。

  • 其次,驱动器一般管理多个执行器(executor)节点。即在集群模式下执行action操作时,不同的节点会统计不同部分的数据(计算结果)。由于我们在本地模式下执行操作,所以所有的执行任务都会在单节点上运行。

    《大数据 -【spark入门】》 spark集群模式执行过程.png

  • 最后,可通过向spark API传递函数,亦可操作相应的集群上。需要对lambda操作熟悉。如:
>>> lines = sc.textFile("README.md")
>>> lines.filter(lambda line: "Python" in line)
PythonRDD[4] at RDD at PythonRDD.scala:49
>>> lines.filter(lambda line: "Python" in line).count()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Python/2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError

报错原因为:spark默认是从hdfs上都文件的,想要读取本地文件需要增加file://前缀。即:

lambda形式:
lines = sc.textFile("file:///spark-2.3.1-bin-hadoop2.7/README.md")
pyline = lines.filter(lambda line: "Scala" in line)
pyline.count()

函数形式:

def hasScala(line):
    return "Scala" in line
pythonLines = lines.filter(hasPython)
  • 独立应用之运行方式

Java和Scala中,只需要添加Maven依赖,编辑器会自动下载依赖的包。但 python程序运行需要使用spark自带的spark-submit脚本来运行。(脚本中已经帮我们引入了python程序的spark依赖)
例如:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_


import logging
from pyspark import SparkConf, SparkContext

logging.basicConfig(level=logging.ERROR)

conf = SparkConf().setMaster("local").setAppName("myapp")
sc = SparkContext(conf=conf)

contents = sc.textFile("file://absfilepath")
res = contents.filter(lambda line: "Python" in line)
print "*" * 10,res.count()

sc.stop()

运行方式如:

spark-submit test.py

运行spark-submit时会出现很繁琐不易识别的INFO信息,如何过滤掉INFO信息呢?

注意:将rootCategory等级修改为WARN或者ERROR即可。

方法如下:

修改日志过滤等级:【conf/log4j.properties】

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    原文作者:小哥不才IT
    原文地址: https://www.jianshu.com/p/eb0bd5bd0390
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞