hcatalog简介和使用

Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要Hive0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.Java

[java]view plaincopy

print?

publicString getDelegationToken(String owner, String renewerKerberosPrincipalName)throws

MetaException, TException {

if(localMetaStore) {

thrownewUnsupportedOperationException(“getDelegationToken() can be “+

“called only in thrift (non local) mode”);

}

returnclient.get_delegation_token(owner, renewerKerberosPrincipalName);

}

HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

[java]view plaincopy

print?

publicstaticvoidsetInput(Job job,

InputJobInfo inputJobInfo)throwsIOException;

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

[java]view plaincopy

print?

publicstaticHCatSchema getTableSchema(JobContext context)

throwsIOException;

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

HCatOutputFormat API:

[java]view plaincopy

print?

publicstaticvoidsetOutput(Job job, OutputJobInfo outputJobInfo)throwsIOException;

OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt=’2013-06-13′,country=’china’ ),可以这样写

[java]view plaincopy

print?

Map partitionValues =newHashMap();

partitionValues.put(“dt”,”2013-06-13″);

partitionValues.put(“country”,”china”);

HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);

HCatOutputFormat.setOutput(job, info);

[java]view plaincopy

print?

publicstaticHCatSchema getTableSchema(JobContext context)throwsIOException;

获取之前HCatOutputFormat.setOutput指定的table schema信息

[java]view plaincopy

print?

publicstaticvoidsetSchema(finalJob job,finalHCatSchema schema)throwsIOException;

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中

[java]view plaincopy

print?

importjava.io.IOException;

importjava.util.Iterator;

importorg.apache.hadoop.conf.Configuration;

importorg.apache.hadoop.conf.Configured;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.io.WritableComparable;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapreduce.Reducer;

importorg.apache.hadoop.util.Tool;

importorg.apache.hadoop.util.ToolRunner;

importorg.apache.hcatalog.data.DefaultHCatRecord;

importorg.apache.hcatalog.data.HCatRecord;

importorg.apache.hcatalog.data.schema.HCatSchema;

importorg.apache.hcatalog.mapreduce.HCatInputFormat;

importorg.apache.hcatalog.mapreduce.HCatOutputFormat;

importorg.apache.hcatalog.mapreduce.InputJobInfo;

importorg.apache.hcatalog.mapreduce.OutputJobInfo;

publicclassGroupByGuidextendsConfiguredimplementsTool {

@SuppressWarnings(“rawtypes”)

publicstaticclassMapextends

Mapper {

HCatSchema schema;

Text guid;

IntWritable one;

@Override

protectedvoidsetup(org.apache.hadoop.mapreduce.Mapper.Context context)

throwsIOException, InterruptedException {

guid =newText();

one =newIntWritable(1);

schema = HCatInputFormat.getTableSchema(context);

}

@Override

protectedvoidmap(WritableComparable key, HCatRecord value,

Context context)throwsIOException, InterruptedException {

guid.set(value.getString(“guid”, schema));

context.write(guid, one);

}

}

@SuppressWarnings(“rawtypes”)

publicstaticclassReduceextends

Reducer {

HCatSchema schema;

@Override

protectedvoidsetup(org.apache.hadoop.mapreduce.Reducer.Context context)

throwsIOException, InterruptedException {

schema = HCatOutputFormat.getTableSchema(context);

}

@Override

protectedvoidreduce(Text key, Iterable values,

Context context)throwsIOException, InterruptedException {

intsum =0;

Iterator iter = values.iterator();

while(iter.hasNext()) {

sum++;

iter.next();

}

HCatRecord record =newDefaultHCatRecord(2);

record.setString(“guid”, schema, key.toString());

record.setInteger(“count”, schema, sum);

context.write(null, record);

}

}

@Override

publicintrun(String[] args)throwsException {

Configuration conf = getConf();

String dbname = args[0];

String inputTable = args[1];

String filter = args[2];

String outputTable = args[3];

intreduceNum = Integer.parseInt(args[4]);

Job job =newJob(conf,

“GroupByGuid, Calculating every guid’s pageview”);

HCatInputFormat.setInput(job,

InputJobInfo.create(dbname, inputTable, filter));

job.setJarByClass(GroupByGuid.class);

job.setInputFormatClass(HCatInputFormat.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(WritableComparable.class);

job.setOutputValueClass(DefaultHCatRecord.class);

job.setNumReduceTasks(reduceNum);

HCatOutputFormat.setOutput(job,

OutputJobInfo.create(dbname, outputTable,null));

HCatSchema s = HCatOutputFormat.getTableSchema(job);

HCatOutputFormat.setSchema(job, s);

job.setOutputFormatClass(HCatOutputFormat.class);

return(job.waitForCompletion(true) ?0:1);

}

publicstaticvoidmain(String[] args)throwsException {

intexitCode = ToolRunner.run(newGroupByGuid(), args);

System.exit(exitCode);

}

}

其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了

本文转自 http://blog.csdn.net/lalaguozhe/article/details/9083905

作者:yukangkk

    原文作者:本宝宝天然萌
    原文地址: https://www.jianshu.com/p/ffd9fb0e9223
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞