Reading PostgreSQL with Python and processing the data with Spark

Here is a short program that reads data from PostgreSQL and processes it with Spark:


import psycopg2
from pyspark import SparkContext


def operator(x):
    # Encode the gender field (index 4) as a number: male -> 1, female -> 2, other -> 0
    print(x[1])
    if x[4] == 'male':
        x[4] = 1
    elif x[4] == 'female':
        x[4] = 2
    else:
        x[4] = 0
    print(x[4])
    return x


def tuple_convert_to_list(x):
    # psycopg2 returns each row as a tuple; convert it to a list so fields can be modified
    return list(x)


if __name__ == '__main__':
    conn = psycopg2.connect(host='192.168.0.1',
                            port=5000,
                            user='test',
                            password='test20180910',
                            database='test')
    cursor = conn.cursor()
    try:
        sql = "select * from unicom_2i_hold_schema.tb_user limit 100000"
        print("sql:" + sql)
        cursor.execute(sql)
        rows = cursor.fetchall()
        print(cursor.rowcount)
        print(rows)
        print(type(rows))

        sc = SparkContext(appName="test")
        # Distribute the fetched rows as an RDD, then convert tuples to lists and encode gender
        rdd = sc.parallelize(rows)
        print(type(rdd))
        rdd2 = rdd.map(tuple_convert_to_list)
        rdd3 = rdd2.map(operator)
        print(type(rdd2))
        print(rdd2.take(100))
        print(rdd3.take(1000))
    finally:
        cursor.close()
        conn.close()
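
As an alternative to fetching every row through psycopg2 on the driver, Spark can also read the table directly through its JDBC data source, which keeps the load distributed. Below is a minimal sketch, assuming the same host, port, credentials and table as above, and assuming the PostgreSQL JDBC driver jar is available to Spark (for example via spark.jars); adjust these to your environment.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# Read the table directly into a DataFrame via Spark's JDBC data source
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://192.168.0.1:5000/test")
      .option("dbtable", "unicom_2i_hold_schema.tb_user")
      .option("user", "test")
      .option("password", "test20180910")
      .option("driver", "org.postgresql.Driver")
      .load())

df.show(10)

With a DataFrame, the gender encoding done in operator() could also be expressed with when/otherwise column expressions instead of a map over an RDD.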

Original author: Tristan_Feng
Original post: https://www.jianshu.com/p/db28b1f6797f