(1)背景
目前使用Python读取Hive表,解析转换之后并发插入Redis,使用fetchone方法读取速度较慢,Python转换处理的速度也较慢。所以需要优化插入Redis的流程。
考虑使用SparkSQL读取数据插入Redis。
(2)优化思路步骤
1)首先使用collection_list批量处理每个字段。
测试的时候报错,数据量到达上限的时候,会OOM或者报连接Redis失败。后增加时间维度拆分数据,并使用SparkSQL的foreachPartition读取每个Partition内的数据,批量并发插入Redis。
也测试过使用GROUPING SETS处理数据,一次性处理的数据量过大,报OOM。
2)使用SparkSQL的特性
使用registerDataFrameAsTable方法将Hive表注册成表,并使用cacheTable方法将此表cache在内存。这样做的目的是减少每次连接Hive读取每个字段数据的时间。测试效果较好。
3)使用Redis的批量处理特性
对于无序集合set使用sadd方法,有序集合sorted set使用zadd方法。并使用了Redis的pipeline,pipeline是redis在提供单个请求中缓冲多条服务器命令的基类的子类,它通过减少服务器-客户端之间反复的TCP数据库包,从而大大提高了执行批量命令的功能。
(3)error列举
1)ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.6 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
增加spark.yarn.executor.memoryOverhead。
2)ConnectionError: Error 104 while writing to socket. Connection reset by peer.
数据量较大,分拆数据
3)py4j.protocol.Py4JJavaError: An error occurred while calling o21.cacheTable.
: java.lang.AssertionError: assertion failed: No plan for MetastoreRelation default
SQLContext读不到hive里面的数据,使用HiveContext将表cache在内存,并使用HiveContext读取
被cache的表。
4)在生产环境中执行作业时候,某个task会卡住,后发现每次作业在某个节点执行的时候都会卡住不动,并且不报错。使用–master=local[*] 测试不会卡住。后查出因为每个节点的Spark版本不同。