批量加载-Bulk Load
在工作过程中有个需求,需要将DataFrame的数据保存进Hbase,并且在Spark集群并没有安装Hbase,此时对于常规的使用put将DataFrame加载进Hbase的方式不在适用,一方面是没有Hbase,另一方面是数据量比较大,通过Put加载数据太慢。
为了实现自己的需求,测试了两个方案
通过hive关联Hbase将数据加载进去
大致步骤
在之前已经有一篇博文是关于hive关联Hbase的,再次仅仅简述处理过程
1.首先将DataFrame进行处理后保存成文件resultFile
2.根据DataFrame字段建立一张Hive临时表tmp_table1
3.将resultFile通过load data 导入临时表tmp_table1
4.建立一张关联Hbase的表hive_hbase_table
5.通过insert into 将tmp_table1的数据插入hive_hbase_table
此时,实现了将DataFrame存入Hbase的目的,如果Spark与Hbase不在同一个集群,那么两个集群之间只需要传输hive底层文件即可。
此方式的优点和缺点
优点:
1.直接保存为hive底层文件(即DataFrame直接写文件,对于里面的字段不需要过多处理)
2.由于hive表关联了Hbase,我们可以通过hive直接查询Hbase的数据(方便对数据进行二次加工)
缺点:
1.需要进行两次数据加载:
第一次加载是根据因为是load速度还很快,基本可以忽略不计;
但是第二第insert into 会触发mapreduce,执行时间与数据量的大小关系很大。
2.耗费额外的空间:
临时表tmp_table1仅仅是作为一个中转站,但是还是会消耗磁盘空间,这种方式消耗磁盘空间的大小接近最终结果的两倍。
具体参考此篇文章
通过HFile 直接将数据导入Hbase
由于客户对数据导入的速度有一定要求,所以第一个方案成为备选方案。于是在查询资料的情况下,找到了第二个方案。
将DataFrame先保存成HFile,然后通过命令直接将文件HFile load进对应的Hbase表。
接下来将详细记录我在这个过程中的踩坑,脱坑历程。
1.第一坑:包找不到
参考了众多博文(其实都是一样的),有几个方案本想来参考一下的,但是自己用的时候发现有些包根本就找不到。于是又花了点时间去看看对应的类在哪个包,依赖是啥。
在此处给出我所使用的相关包依赖(sbt的,maven对应改一下基本上就可以了)
//hbase的几个包必须要有,否则有些关键的类用不了
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
2.第二坑:需不需要配置Hbase参数?
因为我们有三个环境,开发环境是有Hbase的,生产没有,而最终数据又要放到第三个集群,所以Hbase 的有无对结果的影响还是比较纠结。
经过测试,如果单纯的在Spark里面生成HFile,则不需要配置额外的信息,直接默认即可,
但是如果在Spark生成HFile向直接导入Hbase则需要配置相关信息。详细的情况后续说明。
3.第三坑:HFile在代码里面load不进去?
一开始我想的是在生成HFile后直接导入Hbase(单纯想测试的),可是HFile死活load不进去,一直出现一下的信息:
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/5eab4cb95552481faf538552f848b038 first=752-\xE6\x83\xA0\xE5\xB7\x9E last=752-\xE6\x83\xA0\xE5\xB7\x9E
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/5b39a5e9779e499bb6817affc94c51a9 first=662-\xE9\x98\xB3\xE6\xB1\x9F last=662-\xE9\x98\xB3\xE6\xB1\x9F
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/2e3334a0a8094b899fb6cef2195becd5 first=751-\xE9\x9F\xB6\xE5\x85\xB3 last=751-\xE9\x9F\xB6\xE5\x85\xB3
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/e1ebb7ab430e47dfbc05938e037c00de first=759-\xE6\xB9\x9B\xE6\xB1\x9F last=759-\xE6\xB9\x9B\xE6\xB1\x9F
18/10/13 15:23:55 INFO client.RpcRetryingCaller: Call exception, tries=10, retries=35, started=673838 ms ago, cancelled=false, msg=row '' on table 'iptv:spark_test' at region=iptv:spark_test,,1539413066165.047ff19285b4360a8cf82bdade12a465., hostname=iptve2e06,60020,1539323044523, seqNum=2
18/10/13 15:25:10 INFO client.RpcRetryingCaller: Call exception, tries=11, retries=35, started=748954 ms ago, cancelled=false, msg=row '' on table 'iptv:spark_test' at region=iptv:spark_test,,1539413066165.047ff19285b4360a8cf82bdade12a465., hostname=iptve2e06,60020,1539323044523, seqNum=2
找了很久也没有找到解决方法(我猜测可能是读写权限问题,因为我读取数据是没问题的),最后只能在外面执行命令导入数据了。
虽然这个直接在代码处理是不符合我的需求的,但是这个问题没搞定很不爽(后期找到解决方案在发出来)
4.第四坑:只能处理一列数据
我的DataFrame最终的存储目标是一个列族,多个列的,但是目前在网上和官网的都是只能处理单个列族,单个列,与我自己的需求相差甚远。在网上找到一个方案可以实现一列族、多列但是需要修改源代码,这个由于时间关系就放弃。
最后采取一个折中的办法,就是每次处理一列最后合并一下,很蠢的办法,但是目前只想到这个。
在我的另一篇博文Spark:DataFrame写HFile (Hbase)一个列族、一个列扩展一个列族、多个列中实现了多列处理
5.第五坑:用错导入数据的命令
在直接在代码load数据失败后,就寻求其他的办法来load数据
在看了hbase的官方文档后,知道还可以在外部用hbase的命令将数据load进去
//在Hbase官方文档找到的命令
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test
//在某篇博文里面找到的命令
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test
当时用了官方文档的命令 直接报找不到包,整个人瞬间不好了。
后面再去搜相关资料看到别人用的命令是下面这个,试了一下居然成功了,在Hbase也能查到对应的数据了,至此,才算是完成了整个流程测试。
实现代码
1.直接在代码里面处理load数据流程
注:这个最后一步我没有走通,问题还在找。
package com.iptv.job.basedata
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author 利伊奥克儿-lillcol
* 2018/10/12-15:58
*
*/
object HbaseHFileTest {
var hdfsPath: String = ""
var proPath: String = ""
var DATE: String = ""
var conf: Configuration = null
val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext: SQLContext = getSQLContext(sc)
import sqlContext.implicits._
def main(args: Array[String]): Unit = {
val date: Date = getCommandParam(args)
hdfsPath = args(0)
proPath = args(1)
DATE = dateHelper.getDateFormatStr(date, DatePattern.DATE_YMD)
//获取测试DataFrame
val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)
val resultDataFrame: DataFrame = dim_sys_city_dict
.select(concat($"city_id", lit("-"), $"city_name").as("key"), $"city_id", $"city_name")
//hbase的表名
val tableName = "iptv:spark_test"
//hbase的参数配置
conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "n1,n2,n3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.master", "n1:60000")
conf.set("hbase.zookeeper.master", "n1:60000")
lazy val job = Job.getInstance(conf)
val con: Connection = ConnectionFactory.createConnection(conf)
//设置MapOutput Key Value 的数据类型
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
// val table: Table =new HTable(conf,tableName) //这个方法被 deprecated 采用下面的方式
val table: Table = con.getTable(TableName.valueOf(tableName))
val cf: Array[Byte] = "cf_info".getBytes //列族
//对第一个列处理
val result1: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
.map(row => {
//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
val clounmVale: Array[Byte] = "city_id".getBytes //列的名称
val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_id")) //列的值
val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value
(new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
})
//对第而个列处理
val result2: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
.map(row => {
//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
val clounmVale: Array[Byte] = "city_name".getBytes //列的名称
val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_name")) //列的值
val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value
(new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
})
//保存 到hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile"
//union两个数据
result1
.union(result2)
.sortBy(x => x._1, true) //要保持 key 整理有序
.saveAsNewAPIHadoopFile("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile",
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration)
// 将保存在临时文件夹的hfile数据保存到hbase中
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(new Path("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile"), table.asInstanceOf[HTable])
}
}
2.在外面通过命令load数据
package com.iptv.job.basedata
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author 利伊奥克儿-lillcol
* 2018/10/12-15:58
*
*/
object HbaseHFileTest {
var hdfsPath: String = ""
var proPath: String = ""
var DATE: String = ""
var conf: Configuration = null
val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext: SQLContext = getSQLContext(sc)
import sqlContext.implicits._
def main(args: Array[String]): Unit = {
val date: Date = getCommandParam(args)
hdfsPath = args(0)
proPath = args(1)
DATE = dateHelper.getDateFormatStr(date, DatePattern.DATE_YMD)
//获取测试DataFrame
val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)
val resultDataFrame: DataFrame = dim_sys_city_dict
.select(concat($"city_id", lit("-"), $"city_name").as("key"), $"city_id", $"city_name")
//hbase的表名
val tableName = "iptv:spark_test"
//hbase的参数配置
conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
lazy val job = Job.getInstance(conf)
val con: Connection = ConnectionFactory.createConnection(conf)
//设置MapOutput Key Value 的数据类型
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
// val table: Table =new HTable(conf,tableName) //这个方法被 deprecated 采用下面的方式
val table: Table = con.getTable(TableName.valueOf(tableName))
val cf: Array[Byte] = "cf_info".getBytes //列族
//对第一个列处理
val result1: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
.map(row => {
//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
val clounmVale: Array[Byte] = "city_id".getBytes //列的名称
val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_id")) //列的值
val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value
(new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
})
//对第而个列处理
val result2: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
.map(row => {
//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
val clounmVale: Array[Byte] = "city_name".getBytes //列的名称
val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_name")) //列的值
val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value
(new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
})
//s"保存 到hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile")
//union两个数据
result1
.union(result2)
.sortBy(x => x._1, true) //要保持 key 整理有序
.saveAsNewAPIHadoopFile("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile",
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration)
}
}
//通过下面的命令将数据loadhbase
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test
//生成的HFile
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile
0 0 hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/_SUCCESS
20.3 K 40.6 K hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info
//数据load 进hbase后
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile
0 0 hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/_SUCCESS
0 0 hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info
//在hbase下的文件结构
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/hbase/data/iptv/spark_test
290 580 hdfs://ns1/hbase/data/iptv/spark_test/.tabledesc
0 0 hdfs://ns1/hbase/data/iptv/spark_test/.tmp
32.4 K 64.8 K hdfs://ns1/hbase/data/iptv/spark_test/047ff19285b4360a8cf82bdade12a465
//Hbase 数据情况
hbase(main):005:0> scan 'iptv:spark_test' ,{LIMIT=>5}
ROW COLUMN+CELL
200-\xE5\xB9\xBF\xE5\xB7\x9E column=cf_info:city_id, timestamp=1539496131471, value=200
200-\xE5\xB9\xBF\xE5\xB7\x9E column=cf_info:city_name, timestamp=1539496131471, value=\xE5\xB9\xBF\xE5\xB7\x9E
660-\xE6\xB1\x95\xE5\xB0\xBE column=cf_info:city_id, timestamp=1539496131471, value=660
660-\xE6\xB1\x95\xE5\xB0\xBE column=cf_info:city_name, timestamp=1539496131471, value=\xE6\xB1\x95\xE5\xB0\xBE
662-\xE9\x98\xB3\xE6\xB1\x9F column=cf_info:city_id, timestamp=1539496131471, value=662
662-\xE9\x98\xB3\xE6\xB1\x9F column=cf_info:city_name, timestamp=1539496131471, value=\xE9\x98\xB3\xE6\xB1\x9F
663-\xE6\x8F\xAD\xE9\x98\xB3 column=cf_info:city_id, timestamp=1539496131471, value=663
663-\xE6\x8F\xAD\xE9\x98\xB3 column=cf_info:city_name, timestamp=1539496131471, value=\xE6\x8F\xAD\xE9\x98\xB3
668-\xE8\x8C\x82\xE5\x90\x8D column=cf_info:city_id, timestamp=1539496131994, value=668
668-\xE8\x8C\x82\xE5\x90\x8D column=cf_info:city_name, timestamp=1539496131994, value=\xE8\x8C\x82\xE5\x90\x8D
5 row(s) in 0.0590 seconds
hbase(main):006:0>
至此,将数据通过HFile批量导入到Hbase的目的实现
通过生成HFilede的方式将load Hbase的方式速度是真的快,给人感觉是仅仅是移动了文件
刚接触Hbase 发现还是有点意思的