pyspark学习--dataframe

参考文章:master苏:pyspark系列–pyspark读写dataframe

  1. 创建dataframe

1.1 从变量创建

from pyspark.sql import SparkSession

spark=SparkSession \
        .builder \
        .appName('my_first_app_name') \
        .getOrCreate()

# 生成以逗号分隔的数据
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])
# 指定模式, StructField(name,dataType,nullable)
# 其中:
# name: 该字段的名字,
# dataType:该字段的数据类型,
# nullable: 指示该字段的值是否为空
from pyspark.sql.types import StructType, StructField, LongType, StringType  # 导入类型

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# 对RDD应用该模式并且创建DataFrame
swimmers = spark.createDataFrame(stringCSVRDD,schema)

# 利用DataFrame创建一个临时视图
swimmers.registerTempTable("swimmers")

# 查看DataFrame的行数
swimmers.count()

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

从RDD、list或pandas.DataFrame创建一个DataFrame。

当schema是列名列表时,将从数据中推断出每个列的类型。

当schema为None时,它将尝试从数据中推断模式(列名和类型),数据应该是Row、namedtuple或dict的RDD。

1.2. 从变量创建

# 使用自动类型推断的方式创建dataframe

data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
df.show()
df.count()

1.3、从dataframe创建

# 如果不指定schema则用pandas的列名
df = pd.DataFrame(np.random.random((4,4)))
spark_df = spark.createDataFrame (df,schema=['a','b','c','d'])

2、 读取、写入json/csv/parquet/hive

# 读取json文件
json_file = r"文件路径"
df = spark.read.json(json_file)
df.show()

# 读取csv文件
monthlySales = spark.read.csv(csv_file, header=True, inferSchema=True)
monthlySales.show()

#写入csv文件
spark_df.write.csv(path=csv_file, header=True, sep=",", mode='overwrite')

#从列式存储的parquet读取
df=spark.read.parquet(parquet——file)
df.show()

#写入parquet
park_df.write.parquet(path=parquet——file,mode='overwrite')

# 如果已经配置spark连接hive的参数,可以直接读取hive数据
spark = SparkSession \
        .builder \
        .enableHiveSupport() \      
        .appName("my_first_app_name") \
        .getOrCreate()

df=spark.sql("select * from hive_tb_name")
df.show()

#写入hive表
# 打开动态分区
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# 使用普通的hive-sql写入分区表
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive 
    partition (saledate) 
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate 
    from szy_aipurchase_tmp_szy_dailysale distribute by saledate
    """)

# 或者使用每次重建分区表的方式
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# 不写分区表,只是简单的导入到hive表
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

#从hdfs读取

# 直接读取,不需要指定ip和port
data= spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)
data.show()

# 有些情况下是需要指定ip和端口的
data= spark.read.csv('hdfs://localhost:9000/tmp/_da_exdata_path/data.csv', header=True)
data.show()

# 数据写到hdfs,而且以csv格式保存
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")

    原文作者:刘叔
    原文地址: https://zhuanlan.zhihu.com/p/57743251
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞