参考文章:master苏:pyspark系列–pyspark读写dataframe
- 创建dataframe
1.1 从变量创建
from pyspark.sql import SparkSession
spark=SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
# 生成以逗号分隔的数据
stringCSVRDD = spark.sparkContext.parallelize([
(123, "Katie", 19, "brown"),
(234, "Michael", 22, "green"),
(345, "Simone", 23, "blue")
])
# 指定模式, StructField(name,dataType,nullable)
# 其中:
# name: 该字段的名字,
# dataType:该字段的数据类型,
# nullable: 指示该字段的值是否为空
from pyspark.sql.types import StructType, StructField, LongType, StringType # 导入类型
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])
# 对RDD应用该模式并且创建DataFrame
swimmers = spark.createDataFrame(stringCSVRDD,schema)
# 利用DataFrame创建一个临时视图
swimmers.registerTempTable("swimmers")
# 查看DataFrame的行数
swimmers.count()
createDataFrame
(data, schema=None, samplingRatio=None, verifySchema=True)
从RDD、list或pandas.DataFrame创建一个DataFrame。
当schema是列名列表时,将从数据中推断出每个列的类型。
当schema为None时,它将尝试从数据中推断模式(列名和类型),数据应该是Row、namedtuple或dict的RDD。
1.2. 从变量创建
# 使用自动类型推断的方式创建dataframe
data = [(123, "Katie", 19, "brown"),
(234, "Michael", 22, "green"),
(345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
df.show()
df.count()
1.3、从dataframe创建
# 如果不指定schema则用pandas的列名
df = pd.DataFrame(np.random.random((4,4)))
spark_df = spark.createDataFrame (df,schema=['a','b','c','d'])
2、 读取、写入json/csv/parquet/hive
# 读取json文件
json_file = r"文件路径"
df = spark.read.json(json_file)
df.show()
# 读取csv文件
monthlySales = spark.read.csv(csv_file, header=True, inferSchema=True)
monthlySales.show()
#写入csv文件
spark_df.write.csv(path=csv_file, header=True, sep=",", mode='overwrite')
#从列式存储的parquet读取
df=spark.read.parquet(parquet——file)
df.show()
#写入parquet
park_df.write.parquet(path=parquet——file,mode='overwrite')
# 如果已经配置spark连接hive的参数,可以直接读取hive数据
spark = SparkSession \
.builder \
.enableHiveSupport() \
.appName("my_first_app_name") \
.getOrCreate()
df=spark.sql("select * from hive_tb_name")
df.show()
#写入hive表
# 打开动态分区
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
# 使用普通的hive-sql写入分区表
spark.sql("""
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
""")
# 或者使用每次重建分区表的方式
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')
# 不写分区表,只是简单的导入到hive表
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
#从hdfs读取
# 直接读取,不需要指定ip和port
data= spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)
data.show()
# 有些情况下是需要指定ip和端口的
data= spark.read.csv('hdfs://localhost:9000/tmp/_da_exdata_path/data.csv', header=True)
data.show()
# 数据写到hdfs,而且以csv格式保存
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")