
MapReduce Log Parsing with Hive Metadata Storage and Avro Serialization


    I. Terminology

    Avro: a data serialization framework from the Apache project.

    Serialization: in HDFS, network bandwidth is a scarce resource, so transferring data efficiently requires packing objects into a compact binary stream.

    Avro vs. the alternatives:

    Avro provides functionality similar to systems such as Thrift and Protocol Buffers, but it differs in a few fundamental ways:

    1. Dynamic typing: Avro requires no code generation. The schema is stored together with the data, and because it travels with the data, the entire processing pipeline works without generated code or static data types. This makes it easier to build data-processing systems and language bindings.
    2. Untagged data: since the schema is known when the data is read, very little type information has to be encoded alongside the data, which keeps the serialized output small.
    3. No user-assigned field numbers: when a schema changes, both the old and the new schema are known at processing time, so differences are resolved by field name. (The short sketch after this list makes points 1 and 2 concrete.)
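
    To make the first two points concrete, here is a minimal, self-contained sketch (an illustration added here, not from the original article): it round-trips a record through Avro's binary encoding using nothing but a schema parsed at runtime, with no generated classes and no per-field tags in the output bytes.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class AvroDynamicTypingDemo {
        public static void main(String[] args) throws IOException {
            // The schema is plain data, parsed at runtime -- no code generation.
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Ex\",\"fields\":["
                  + "{\"name\":\"AppKey\",\"type\":\"string\"},"
                  + "{\"name\":\"SerialNumber\",\"type\":\"int\"}]}");

            GenericRecord rec = new GenericData.Record(schema);
            rec.put("AppKey", "demo-app");
            rec.put("SerialNumber", 42);

            // Serialize: the binary form carries only values, no field tags.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
            encoder.flush();

            // Deserialize with the same schema, known at read time.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            GenericRecord back = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
            System.out.println(back.get("AppKey") + " / " + back.get("SerialNumber"));
        }
    }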
    II. Implementation steps

    1. Define an Avro schema file, e.g. ex.avsc, describing the exception records extracted from the logs:
    {
        "type": "record",
        "name": "Ex",
        "namespace": "com.tianrandai.test",
        "fields": [
            {"name": "AppKey", "type": "string"},
            {"name": "SessionId", "type": "string"},
            {"name": "IsValidStart", "type": "string"},
            {"name": "SerialNumber", "type": "int"},
            {"name": "Strategy", "type": "string"},
            {"name": "IsSessionStop", "type": "boolean"},
            {"name": "SessionDuration", "type": "int"},
            {"name": "ClientTime", "type": "string"},
            {"name": "isCaught", "type": "boolean"},
            {"name": "ExceptionName", "type": "string"},
            {"name": "CustomExceptionName", "type": "string"},
            {"name": "Stack", "type": "string"}
        ]
    }
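
    Before uploading, it is worth confirming that the file actually parses; a quick local check (an illustrative addition, not in the original) could be:

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;

    public class SchemaCheck {
        public static void main(String[] args) throws IOException {
            // A malformed file would throw SchemaParseException here.
            Schema s = new Schema.Parser().parse(new File("ex.avsc"));
            System.out.println(s.toString(true));  // pretty-printed schema
            System.out.println(s.getFields());     // the twelve declared fields
        }
    }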
    2. Create the metadata table for the exception records. Because the compression stage uses the Avro serialization framework, the Hive side uses the Avro SerDe as well.
    The following shows how to create an external Hive table whose schema is loaded dynamically from the avsc file.
    a. Upload the prepared avsc file to HDFS:
    hadoop fs -put ex.avsc /test
    b. Create the table with a Hive statement:
    CREATE EXTERNAL TABLE Ex
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS
        INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
        LOCATION 'hdfs:///test/ex'
        TBLPROPERTIES (
    	'avro.schema.url'='hdfs:///test/ex.avsc'
        );
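
    Since the columns come entirely from avro.schema.url, a quick way to confirm that the table picked up the schema (a usage note added here, not in the original) is:

    DESCRIBE Ex;
    -- should list appkey, sessionid, ..., stack, with types taken from ex.avsc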

    3. Log analysis
    a. A custom input-file filter, so that only matching log files are fed to the job:
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class InputFileFilter extends Configured implements PathFilter {
        private Configuration conf = null;
        private FileSystem fs;

        public Configuration getConf() {
            return conf;
        }

        public void setConf(Configuration conf) {
            this.conf = conf;
            if (conf != null) {
                try {
                    fs = FileSystem.get(conf);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public boolean accept(Path path) {
            // ReaderUtils is the project's own constants class.
            String regex = conf.get(ReaderUtils.INPUT_FILTER_REGEX);
            if (regex == null) {
                // No filter configured: accept every path.
                return true;
            }
            try {
                // Directories must always be accepted; rejecting one would
                // prune everything beneath it during recursive listing.
                if (fs.isDirectory(path)) {
                    return true;
                } else {
                    return path.toString().matches(regex);
                }
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
    }
    
    b. Build a Pair of the record-type name and the Avro record; the String key is what later routes each record to the matching named output:
    Schema.Parser parser = new Schema.Parser();
    Schema exSchema = parser.parse(RecordBuilder.class.getResourceAsStream("/ex.avsc"));
    GenericRecord record = new GenericData.Record(exSchema);
    record.put("AppKey", common.getAppKey());
    record.put("IsValidStart", common.getIsValidStart());
    record.put("SerialNumber", common.getSerialNumber());
    record.put("SessionDuration", common.getSessionDuration());
    record.put("SessionId", common.getSessionId());
    record.put("Strategy", common.getStrategy());
    record.put("ClientTime", common.getClientTime());
    record.put("IsSessionStop", common.isSessionStop());
    // Exception-specific fields from ex.avsc (accessor names are assumed):
    record.put("isCaught", exception.isCaught());
    record.put("ExceptionName", exception.getExceptionName());
    record.put("CustomExceptionName", exception.getCustomExceptionName());
    record.put("Stack", exception.getStack());

    // The key names the record type, matching the named output "ex".
    Pair<String, GenericRecord> exPair = new ImmutablePair<String, GenericRecord>("ex", record);
    c. Map phase: wrap the pair in an AvroKey/AvroValue and emit it:
    AvroKey<String> outputKey = new AvroKey<String>();
    AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();
    outputKey.datum(pair.getKey());     // record-type name, e.g. "ex"
    outputValue.datum(pair.getValue()); // the GenericRecord built above
    context.write(outputKey, outputValue);
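
    For context, those lines live inside the mapper's map() method. A minimal skeleton consistent with the job setup below (the class name and the RecordBuilder.build parsing helper are assumptions, not from the original) might look like:

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LogParseMapper
            extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Hypothetical helper: parse one raw log line into
            // (record-type name, GenericRecord), or null if unparseable.
            Pair<String, GenericRecord> pair = RecordBuilder.build(value.toString());
            if (pair == null) {
                return;
            }
            context.write(new AvroKey<String>(pair.getKey()),
                    new AvroValue<GenericRecord>(pair.getValue()));
        }
    }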
    d. Reduce phase: write each record to its named output via AvroMultipleOutputs:
    private AvroMultipleOutputs avroMultipleOutputs;
    private AvroKey<GenericRecord> outputKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputKey = new AvroKey<GenericRecord>();
        avroMultipleOutputs = new AvroMultipleOutputs(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Must be closed, otherwise the named-output files are never flushed.
        avroMultipleOutputs.close();
    }

    @Override
    protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context)
            throws IOException, InterruptedException {
        for (AvroValue<GenericRecord> value : values) {
            outputKey.datum(value.datum());
            // Named output "ex", written under the base path ex/19000101
            // inside the job output directory (the date is hard-coded here).
            avroMultipleOutputs.write("ex", outputKey, NullWritable.get(), "ex" + "/" + "19000101");
        }
    }

    e. The runner:
    // Only accept *.log files (see InputFileFilter above) and compress
    // the Avro output with Snappy.
    conf.setIfUnset(ReaderUtils.INPUT_FILTER_REGEX, ".*\\.log");
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    Job job = Job.getInstance(conf, this.getClass().getName());
    ........

    job.setInputFormatClass(TextInputFormat.class);

    // The map output value can be any of the registered record types,
    // so the value schema is declared as their union.
    Schema union = Schema.createUnion(new ArrayList<Schema>(RecordBuilder.schemas.values()));
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputValueSchema(job, union);

    // One named output per record type, each with its own schema.
    for (Map.Entry<String, Schema> entry : RecordBuilder.schemas.entrySet()) {
        AvroMultipleOutputs.addNamedOutput(job, entry.getKey(), AvroKeyOutputFormat.class, entry.getValue(), null);
    }

    // args[0] may be a single input path or a comma-separated list.
    Path[] pathArr;
    if (args[0].contains(",")) {
        List<Path> listPath = new ArrayList<Path>();
        for (String path : args[0].split(",")) {
            listPath.add(new Path(path));
        }
        pathArr = listPath.toArray(new Path[listPath.size()]);
    } else {
        pathArr = new Path[]{new Path(args[0])};
    }

    // Clear any previous output, then wire up inputs, recursion and the filter.
    Path outputPath = new Path(args[1]);
    outputPath.getFileSystem(conf).delete(outputPath, true);
    FileInputFormat.setInputPaths(job, pathArr);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.setInputPathFilter(job, InputFileFilter.class);
    FileOutputFormat.setOutputPath(job, outputPath);
    return job.waitForCompletion(true) ? 0 : 1;
}
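
    RecordBuilder.schemas is used above but never shown. Presumably it maps each record-type name to its parsed schema, keyed the same way as the named outputs; a minimal sketch under that assumption (not from the original):

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.avro.Schema;

    public class RecordBuilder {
        // Record-type name -> parsed Avro schema (assumed structure).
        public static final Map<String, Schema> schemas = new LinkedHashMap<String, Schema>();

        static {
            try {
                Schema.Parser parser = new Schema.Parser();
                schemas.put("ex", parser.parse(RecordBuilder.class.getResourceAsStream("/ex.avsc")));
                // Further record types, e.g. schemas.put("pageview", ...), would go here.
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }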

    4. Load the files produced by the MR job into Hive:
    LOAD DATA INPATH 'hdfs:///test/output/ex' OVERWRITE INTO TABLE Ex;
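
    After the load, the table queries like any other Hive table. For example (an illustrative query, not from the original), to see which exceptions dominate:

    SELECT exceptionname, count(*) AS cnt
    FROM Ex
    GROUP BY exceptionname
    ORDER BY cnt DESC
    LIMIT 20;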