ubuntu pyspark

Goal: jieba + python + spark + kafka + streaming

Materials….


sudo gedit /etc/profile

# add jdk
export JAVA_HOME=/home/kean/app/jdk1.8.0_121
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

# added by Anaconda3 installer
export ANACONDA_ROOT=/home/kean/app/anaconda3
export PATH=${ANACONDA_ROOT}/bin:$PATH

# set pyspark driver
export PYSPARK_DRIVER_PYTHON=$ANACONDA_ROOT/bin/ipython
export PYSPARK_PYTHON=$ANACONDA_ROOT/bin/python
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
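
After saving, run source /etc/profile (or log out and back in) so the new variables take effect. With PYSPARK_DRIVER_PYTHON_OPTS set to "notebook", launching pyspark will start the driver inside an IPython/Jupyter notebook.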


That completes step one!
Reference: blog.csdn.net/duxu24/article/details/53587451
Once pyspark has started, try typing in a word count.
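
For example, a minimal sketch (text.txt stands in for any local text file; sc is the SparkContext the pyspark shell creates for you):

rdd = sc.textFile("text.txt")  # each element is one line of the file
counts = rdd.flatMap(lambda line: line.split()) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)  # sum the 1s per word
print(counts.take(10))  # first 10 (word, count) pairs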

Next, build the local distributed setup.

# add scala
export SCALA_HOME=/home/kean/app/scala-2.11.8
export PATH=${SCALA_HOME}/bin:$PATH


# add spark
export SPARK_HOME=/home/kean/app/spark-2.2.1-bin-hadoop2.7
export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH

# add pyspark's python to PYTHONPATH
export PYTHONPATH=${SPARK_HOME}/python
export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/py4j-0.10.4-src.zip:$PYTHONPATH # I suspect the py4j entry can be dropped

Testing the py4j error:


Fix: either pip install py4j, or look under SPARK_HOME/python/lib, where there is an archive named py4j-0.10.4-src.zip; unzip it into the SPARK_HOME/python/ directory and the error goes away.
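
A quick way to confirm the fix worked (a sketch; the path matches the Spark install used above):

import sys
sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python")  # where py4j was unzipped
from py4j.java_gateway import JavaGateway  # should now import without error
print("py4j imported OK")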


Using it in PyCharm:

When creating a project, choose anaconda/bin/python as the interpreter. There are also two checkboxes below the interpreter field; tick both so the project can directly use the modules already installed in Anaconda.
http://blog.csdn.net/u010171031/article/details/51849562

# encoding: utf-8
"""
@version: python3.6
@author: 'kean'
@contact: 
@site: 
@software: PyCharm
@file: xixi.py
@time: 18-1-21 4:01 PM
"""

import os
import sys



# Path for spark source folder
# os.environ['SPARK_HOME'] = "/home/kean/app/spark-2.2.1-bin-hadoop2.7"
# You might need to enter your local IP
# os.environ['SPARK_LOCAL_IP']="192.168.2.138"

# Path for pyspark and py4j
# sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python")
# sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf

    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Cannot import Spark modules:", e)
    sys.exit(1)

sc = SparkContext('local')  # single-threaded local mode is enough for this test
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka"])
print(words.count())
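
If everything is wired up correctly, this prints "Successfully imported Spark Modules" followed by 5.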

Hadoop is not used here, because my test of reading data from Kafka via streaming does not need it yet.

Reading data from Kafka

If you want to run the Kafka-reading script in PyCharm, you need to drop the jar below into SPARK_HOME/jars; alternatively, submit the script with spark-submit and attach the jar:
spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
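
One way to attach the jar from inside PyCharm is to set PYSPARK_SUBMIT_ARGS before the SparkContext is created (a sketch; the jar path is an assumption, point it at wherever you saved the assembly):

import os
# read when the SparkContext starts; the value must end with "pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /home/kean/app/spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar '
    'pyspark-shell'
)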

# encoding: utf-8
"""
@version: python3.6
@author: 'kean'
@contact: 
@site: 
@software: PyCharm
@file: play.py
@time: 18-1-21 4:39 PM
"""

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# from pysparktest import myFunc

'''
offsetRanges = []  
def storeOffsetRanges(rdd):  
    global offsetRanges  
    offsetRanges = rdd.offsetRanges()  
    return rdd  

def printOffsetRanges(rdd):  
    print(rdd.count())  
    for o in offsetRanges:  
        print("__________________________________")
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
        print("__________________________________")
'''


def start():
    sc = SparkContext("local[2]", "KafkaConsumerTest")  # at least two local threads: one to receive, one to process
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    zookeeper = "192.168.1.xxx:2181"
    # broker = "192.168.1.xxx:9092"
    topic = "info-news"
    partition = 0
    groupid = "1655"
    start = 0
    topicPartition = TopicAndPartition(topic, partition)
    # fromOffset = {topicPartition: int(start)}

    # receiver-based stream via ZooKeeper; 'smallest' starts from the earliest offset
    kafkaStream = KafkaUtils.createStream(ssc, zookeeper, groupid, {topic: 1}, {'auto.offset.reset': 'smallest'})
    # kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": broker}, fromOffsets=fromOffset)
    lines = kafkaStream.map(lambda x: x[1])  # each element is (key, message); keep only the message
    lines.pprint()
    # kafkaStream.pprint()
    ssc.start()
    ssc.awaitTermination()
    '''
    directKafkaStream\
    .transform(storeOffsetRanges)\
    .foreachRDD(printOffsetRanges)
    '''


if __name__ == '__main__':
    start()
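
To feed the stream some test data, any Kafka producer will do; for instance, a sketch using the kafka-python package (an assumption: pip install kafka-python; the broker address mirrors the redacted one above):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.1.xxx:9092')
for i in range(10):
    # send a few plain-text messages to the topic the stream consumes
    producer.send('info-news', ('message %d' % i).encode('utf-8'))
producer.flush()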


    Original author: Kean_L_C
    Original source: https://www.jianshu.com/p/fa5837aff7fe