본문 바로가기

빅데이터

AvroFlumeEvent 포멧 java Decoding source

AvroFlumeEvent포멧은 아래와 같은 특징을 가진다.

- Header : Map<CharSequence, CharSequence>

- Body : ByteBuffer

 

AvroFlumeEvent포멧을 사용하기 위해서 필요한 dependency

dependencies {
    compile 'org.apache.flume:flume-ng-core:1.9.0'
}

DeserializeValue method : 

private static Event deserializeValue(byte[] value) throws IOException {
    Event e;
    DatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<>(AvroFlumeEvent.class);
    ByteArrayInputStream in = new ByteArrayInputStream(value);
    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
    AvroFlumeEvent event = reader.read(null, decoder);
    e = EventBuilder.withBody(event.getBody().array(), toStringMap(event.getHeaders()));
    return e;
}

private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
    Map<String, String> stringMap = new HashMap<String, String>();
    for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
        stringMap.put(entry.getKey().toString(), entry.getValue().toString());
    }
    return stringMap;
}

Use case :

Event event = deserializeValue(record.value());
Map<String, String> headers = event.getHeaders();
String body = event.getBody();

System.out.println("head : " + headers.toString());
System.out.println("body : " + new String(body));

Result : 

=============
head : {version=v1, table=click_test}
body : test_data
=============
head : {version=v1, table=log_test}
body : log_data
=============

태그