hbase bulkload 踩坑

最近天天在实验室捣鼓一个大数据的项目,前期虽说数据量不是很大,但也算是跑了一个11台机器的集群。在实施之前就预料到了肯定是艰难险阻。毕竟万事开头难,然后中间难,最后结尾难 😢

本文记录下在进行 hbase 导入的时候,采用的策略和遇到的坑。也不单单是 hbase 的缘故,管理一个集群上遇到数据量比较大的情况,各种问题都有,着实应接不暇。

背景 :

源数据是以 oracle 的分库分表的形式存放,加之数据量很大,每天都是百万级别的增量。搞定 oracle 的过程先痛苦的按下不表了。

需求 :

根据源数据,最终实现一个大数据下实时预测的功能。

进度 :

已经将 oracle 中的源数据导出到文本,进入 hdfs 中,用 mr 已经进行了数据的过滤和抽取分析,最后分析结果都已入库。暂且流程化分析不讲。

现在因为要进行快速验证,需要对源数据依据某些抽取条件进行快速查找,吞吐量要高。传统的关系型数据库似乎没有很好的分布式的方案,上手成本低一点,同时我们也不需要强事务,就直接采用 hbase 来作为数据仓库。因此就需要将源数据稍作处理导入进 hbase 中。

方案 :

  1. 直接采用 Put 的方式,是采用 mr 的方式单独用 map 来 put 进库 (小规模数据可用)

  2. 采用 hbase 自带的 ImportTsv 工具 (不适用于 rowkey 变化组合的情况)

  3. 采用 bulkload 的方式,先生成 HFile 再用 completeBulkload 导入到 hbase 中 (符合我所要 rowkey 自定义和大规模数据快速导入的需求,但是不能增量进库)

实施过程 :

First :

直接 hbase 建立表,采用 mr 的方式生成 HFile。代码均在 github 上

  1. 遇到问题
2018-01-02 19:19:10,475 WARN [main] org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/meta-region-server

2018-01-02 19:19:10,475 INFO [main] org.apache.hadoop.hbase.util.RetryCounter: Sleeping 8000ms before retry #3...

2018-01-02 19:19:11,475 INFO [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)

2018-01-02 19:19:11,475 WARN [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)

at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)

at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

解决方案 :增加 hbase 的配置

this.getConf().set("hbase.master", "master:60000");
this.getConf().set("hbase.zookeeper.quorum","slave1,slave2,slave3");
  1. 遇到问题
    Shuffle 过程中 Java Heap Space oom
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
    at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:309)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:299)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:514)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:336)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193)

解决方案
shuffle 有部分需要占用map内存来进行,导致 oom,因此直接将 shuffle 到磁盘上做,降低shuffle占用container内存的比率

job.getConfiguration().setStrings("mapreduce.reduce.shuffle.input.buffer.percent", "0.1");
  1. 更改比率继续重新运行,遇到问题
Error: java.io.IOException: Spill failed
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.checkSpillException(MapTask.java:1562)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1085)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at io.github.zuston.ane.Ewb.EwbImporterMr$EwbImporterMapper.map(EwbImporterMr.java:63)
    at io.github.zuston.ane.Ewb.EwbImporterMr$EwbImporterMapper.map(EwbImporterMr.java:39)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_1514527612161_0048_m_000030_0_spill_5.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:402)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getSpillFileForWrite(YarnOutputFiles.java:159)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1592)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$900(MapTask.java:876)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1532)

思考:
这是 Could not find any valid local directory , 猜测可能因为 map shuffle 很大数据到一台服务器上,磁盘空间不够了。回web ui上一看,只要一个 reduce 在运行。显然将数据量集中在一台上了,为什么只有一个 reduce 呢 ?
尝试:
直接设置了 job.setNumReduceTasks(10),然而并不起作用,在方法外又设置了一遍,发现再跑报错直接显示 split region dont match,方法设置前后导致起作用了,说明在中间有方法已经有预设了reduce数目,从dont match 上来看,reduce的数目与region数目是相关的。
代码

// bulk load
    public static Job HbaseQuickImportJobGnerator(Tool tool, Configuration configuration, String[] args, HTable table) throws IOException {
        configuration.set("hbase.master", "master:60000");
        configuration.set("hbase.zookeeper.quorum","slave1,slave2,slave3");
        Job job = new Job(configuration);

        job.setJarByClass(tool.getClass());
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        job.setSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        table = new HTable(configuration, args[2]);
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        return job;
    }
}

有必要探究下 bulkload 的机理:
主要方法就是HFileOutputFormat2,看下它的reduce数目是依据什么来判断的

public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
        configureIncrementalLoad(job, table, HFileOutputFormat2.class);
    }
static void configureIncrementalLoad(Job job, HTable table, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
        Configuration conf = job.getConfiguration();
        // ImmutableBytesWritable.class 必须指定为不可变的,即是 rowkey
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        // 也可以是 PUT 对象
        job.setOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(cls);
        // 分 map 的输出类型进行 reduce sort, hbase 存储的顺序也是按照字典序排列,reduce 做的也就是输出有序文件。
        // 此处可以看到 map 输出类型分为 KeyValue, Put, Text 三种均可
        if(KeyValue.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if(Put.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(PutSortReducer.class);
        } else if(Text.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(TextSortReducer.class);
        } else {
            LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
        }

        conf.setStrings("io.serializations", new String[]{conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()});
        LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
        // 获取 region 数目,startKeys 即为每个region的起始位置
        List startKeys = getRegionStartKeys(table);
        LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
        // 将 job reduce 数目设置为 region 数目
        job.setNumReduceTasks(startKeys.size());
       // 按照 hbase 的存储格式写入HDFS 中,具体看下面方法
        configurePartitioner(job, startKeys);
        configureCompression(table, conf);
        configureBloomType(table, conf);
        // 块大小匹配 hbase 的 块大小
        configureBlockSize(table, conf);
        // 编码格式设置为一致
        configureDataBlockEncoding(table, conf);
        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initCredentials(job);
        LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
    }

static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        //  hbase 已有的分区文件暂存在 hdfs 
        Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
        fs.makeQualified(partitionsPath);
        fs.deleteOnExit(partitionsPath);
        // 写入hdfs,根据分区点写。对于每个分区点采用的是 TreeSet,将所有的分区点 startKeys 排序
        writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
        // 设定 partitioner 为全局有序,分区点即为 hbase 的分 region 文件,这样 reduce 的时候输出文件都是有序的,符合 hbase 的设计
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
    }

仅仅从代码中看,可以确定 reduce 数目是由 region 的数目而来的,可以解释一开始就是 1 个reduce的疑惑。并且刚开始建表确实是没有指定分region
进一步解决:那么扩大 reduce 的个数,将解决现有问题。那么就可以hbase 预分区来扩大

Second :

建表时候分区,分2个步骤

  1. 确定采样点
  2. 根据采样点均分,从而 region 确定各个 startKey 来建立表

采样的方法
方法有很多种,一般是确定采样数,或者是确定比例来采样。

  1. 确定采样数为 n,可以采用堆排序,给每条记录随机生成一个竞争值 KEY, 维持一个大根堆。(适合小数据量)
  2. 确定比例,就直接随机比例来采集样本数。(采样数据集大小不确定,只能估算)
    可以参考 : http://dongxicheng.org/data-mining/hadoop-sampling

根据分区点,建立好表之后,继续跑代码

遇到问题:

map reduce 过程中 reduce 一直被 KILL, 无法申请到资源,导致出现卡住的状况
job 状态出现 :ACCEPTED: waiting for AM container to be allocated, launched and register with RM

2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigning container Container: [ContainerId: container_1514527612161_0053_03_000018, NodeId: slave4:42105, NodeHttpAddress: slave4:8042, Resource: <memory:4096, vCores:1>, Priority: 5, Token: Token { kind: ContainerToken, service: 10.10.0.139:42105 }, ] to fast fail map
2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned from earlierFailedMaps
2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_1514527612161_0053_03_000018 to attempt_1514527612161_0053_m_000013_2000
2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Recalculating schedule, headroom=<memory:2048, vCores:1>
2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: completedMapPercent 0.32871974 totalResourceLimit:<memory:30720, vCores:8> finalMapResourceLimit:<memory:20622, vCores:6> finalReduceResourceLimit:<memory:10098, vCores:2> netScheduledMapResource:<memory:794624, vCores:194> netScheduledReduceResource:<memory:8192, vCores:1>
2018-01-08 02:32:22,816 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: After Scheduling: PendingReds:6 ScheduledMaps:187 ScheduledReds:1 AssignedMaps:7 AssignedReds:0 CompletedMaps:95 CompletedReds:4 ContAlloc:17 ContRel:0 HostLocal:7 RackLocal:0
2018-01-08 02:32:22,817 INFO [AsyncDispatcher event handler] org.apache.hadoop.yarn.util.RackResolver: Resolved slave4 to /default-rack

2018-01-08 02:32:22,817 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1514527612161_0053_m_000013_2000 TaskAttempt Transitioned from UNASSIGNED to ASSIGNED
2018-01-08 02:32:22,817 INFO [ContainerLauncher #1] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_LAUNCH for container container_1514527612161_0053_03_000018 taskAttempt attempt_1514527612161_0053_m_000013_2000
2018-01-08 02:32:22,817 INFO [ContainerLauncher #1] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Launching attempt_1514527612161_0053_m_000013_2000
2018-01-08 02:32:22,817 INFO [ContainerLauncher #1] org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: Opening proxy : slave4:42105
2018-01-08 02:32:22,825 INFO [ContainerLauncher #1] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Shuffle port returned by ContainerManager for attempt_1514527612161_0053_m_000013_2000 : 13562
2018-01-08 02:32:22,825 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: TaskAttempt: [attempt_1514527612161_0053_m_000013_2000] using containerId: [container_1514527612161_0053_03_000018 on NM: [slave4:42105]

部分 log 当时未留存,不是很全
查阅资料,看 log是推测 reduce和map互相竞争资源,互相 kill ,陷入死循环中,一直卡住

解决方案:
修改 reduce 启动时机,等待 map 全部完成之后再启动 reduce, 减小 reduce > copy 过程对于内存的占用

mapreduce.job.reduce.slowstart.completedmaps 设置为 1.0

参考资料 :
http://blog.csdn.net/lipeng_bigdata/article/details/51285687
https://www.cnblogs.com/yueweimian/p/4667888.html
https://community.hortonworks.com/questions/27334/mapreduce-job-hang-waiting-for-am-container-to-be.html

遇到问题:

修改参数过后又遇到问题,发现在将要成功之际,job完成度到 90%时候,reduce 全部被 kill 掉,又开始 map过程。

此时发现有一个节点挂掉了,同时 node 全部为 unhealty 状态

思考
在上述 HFileOutputFormat2 的代码中看到 reduce 过程要占用很大的磁盘空间,并且对全局排序,数据分布不均匀则导致单点执行时间过程。猜测是不是磁盘容量不够

解决方案

  1. 重启挂掉的节点
  2. 将节点的健康检查阈值降低,暂且挨过这一次处理

待补充

    原文作者:沙茶是个好名字
    原文地址: https://www.jianshu.com/p/fe9af752864f
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞