avro(hdfs关联hive)

简介

  • avro是一种固定格式(schema),以文件为单位的 数据序列化系统(类似加密解密)
  • 支持二进制序列化方式,所以可以快速处理大量数据
  • 支持对数据流,javabean等 序列化 反序列化操作,传输等效率高
  • 就是由一个定义好的schema来读取的二进制文本文件。

个人业务分析

  • 大量数据需要落成文件,存放到HDFS并支持 hive外表关联查询(指定目录)
  • 数据入口:kafka
  • 数据出口:hive业务库

分析

  • kafka 实时接收数据落每日本地文件
  • 定时上传每日本地文件到 hive外表目录下
  • hive 外部表 关联(指定schema)

注意事项

  • avro 文件格式 [https://www.jianshu.com/p/a5c0cbfbf608]
  • 一个avro文件 文件由 header 和多个data block 组成,header由 消息包和指定 schema 以及压缩方式等组成
  • 所有一个文件只有一个schema,当写入数据前,首先根据当前记录匹配 header 的schema,如果不符合schema时,则会抛异常
  • 所以,对于单个文件来言,所有数据必须具有相同的schema
  • 写的schema中fields的数量 > 大于读的schema中fields数量时,那么将被忽略
  • 读的schema中fields数量 > 写的schema中fields的数量时,如果无默认值,则报错

Schema格式

{
“type”: “record”,
“name”: “LongList”,
“aliases”: [“LinkedLongs”], // old name for this
“fields” : [
{“name”: “value”, “type”: “long”}, // each element has a long
{“name”: “next”, “type”: [“null”, “LongList”]} // optional next element
]
}

代码

//根据传入的json串生成 schema串
public Map buildSchema(String arg) throws Exception {
        String prefix = "{\"type\":\"record\",\"name\":\"" + table_name + "\",\"fields\":[";
        String suffix = "]}";
        if (table_name == null) {
            throw new Exception("配置文件读取失败");
        }
        Map<String, String> hm = new HashMap<String, String>();
        JSONObject js_ob;
        try {
            js_ob = JSONObject.fromObject(arg);
        } catch (Exception e) {
            throw new Exception("参数有误,检查是否为Json格式");
        }
        Iterator it = js_ob.keys();
        String schemaPj = "";
        StringBuffer sbf = new StringBuffer();
        String finalSbf = "";

        while (it.hasNext()) {
            String key = (String) it.next();
            String value = js_ob.get(key).toString();
            schemaPj = "{\"name\":\"" + key + "\",\"type\":\"string\",\"default\":\"\"},";
            sbf.append(schemaPj);
            hm.put(key, value);
        }
        finalSbf = prefix + sbf.toString().substring(0, sbf.length() - 1) + suffix;
        schema = Schema.parse(finalSbf);

        return hm;
    }
/**生成本地 avro 文件
* @param data 数据流入(json 串)
**/
public void buildFile(String data) throws IOException {
        if (data != null) {
            // 根據入參獲取 schema
            HashMap<String, String> hm = null;
            try {
                hm = (HashMap) buildSchema(data);
            } catch (Exception e1) {
                e1.printStackTrace();
            }

            FileSystem fs = null;
            FilterOutputStream ps = null;

            // 允許 append
            conf.setBoolean("dfs.support.append", true);
            DataFileWriter<Record> writer = new DataFileWriter<Record>(new GenericDatumWriter<Record>(schema));
            InputStream in = null;
            try {
                Record tab = new GenericData.Record(schema);
                String dt = "";
    // 业务数据处理 区域-(根据传入数据时间日期生成日期目录,对应hive外部表partition)
                for (String key : hm.keySet()) {
                    if ("time".equals(key.trim())) {
                        String val = hm.get(key);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
                        dt = sdf.format(new Date(Long.valueOf(val)));
                        tab.put(key, val);
                        continue;
                    }
                    tab.put(key, hm.get(key));
                }
    // -----------------------------------------------------------
                // 本地路径落 avro 定时任务把文件传到 hive外部表路径
                String uri = localPath + folderName + dt;

                File file = new File(uri);
                if (!file.exists()) {
                    file.mkdir();
                }
                String file_url = uri + "/" + file_name;
                File newFile = new File(file_url);
                // File newFile = new File(file_url_new);
                // 如果存在路径 append 不存在 则创建
                if (!newFile.exists()) {
                    newFile.createNewFile();
                    newFile.setWritable(true, false);
                    writer.create(schema, newFile);
                } else {
//文件append 操作
                    writer.appendTo(newFile);
                }
                    writer.append(tab);

            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                IOUtils.closeStream(in);
                writer.close();
            }
        }
    }

hive

//建立外部 schema
CREATE EXTERNAL TABLE avro_test1
ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.avro.AvroSerDe’
STORED AS
INPUTFORMAT ‘org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat’
OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat’
LOCATION ‘/user/tmp’ (hdfs路径)
TBLPROPERTIES (
‘avro.schema.url’=’hdfs:///user/tmp/avsc/student.avsc’ (外部 schema文件)
);

hadoop fs -cat /user/tmp/avsc/student.avsc
{
“type”: “record”,
“name”: “student”,
“namespace”: “com.tiejia.avro”,
“fields”: [
{ “name”:”SID”, “type”:”string”,”default”:””},
{ “name”:”Name”, “type”:”string”,”default”:””},
{“name”:”Dept”, “type”: “string”,”default”:””},
{ “name”:”Phone”, “type”:”string”,”default”:””},
{“name”:”Age”, “type”: “string”,”default”:””},
{“name”:”Date”, “type”: “string”,”default”:””}
]}

ps:设置默认值,否则如果传入数据为 null 则报错

后记:由于传入数据schema 有七八种之多,原来使用的列式存储 Hbase,一种schema对应一个文件太过于麻烦,最后没有选型avro来实现,欢迎有想法的各位朋友、大神们指导下

点赞