본문 바로가기

빅데이터/Kafka

토픽의 메시지 값을 직렬화/역직렬화가 정상적으로 이루어지지 않는 경우 테스트

토픽의 메시지 값 또는 메시지 키는 직렬화하여 저장되어 있다. 만약 직렬화와 역직렬화 포맷이 다르면 어떻게 될까?

 

- test 토픽

- 직렬화 : StringStringSerializer

- 역직렬화 : UUIDDeserializer

 

의도적으로 직렬화 포맷과 역직렬화 포맷을 다르게 하여 테스트를 진행합니다.

 

1. 프로듀서

public class SimpleProducer {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {


        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = index + "";
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
            try {
                producer.send(record, (recordMetadata, e) -> System.out.println(recordMetadata.toString()));

                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
            } catch (Exception e) {
                System.out.println(e);
            }
        }

        producer.flush();
        producer.close();
    }
}

 

2. 컨슈머

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-consumer-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class.getName());

        KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, Integer> record : records) {
                logger.info("{}",record);
            }
        }
    }
}

결과

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-0 at offset 139. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error parsing data into UUID
Caused by: java.lang.IllegalArgumentException: Invalid UUID string: 0
	at java.util.UUID.fromString(UUID.java:194)
	at org.apache.kafka.common.serialization.UUIDDeserializer.deserialize(UUIDDeserializer.java:48)
	at org.apache.kafka.common.serialization.UUIDDeserializer.deserialize(UUIDDeserializer.java:29)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at com.example.SimpleConsumer.main(SimpleConsumer.java:37)

Kafka에 속한 SerializationException이 발생했습니다. UUID 내부에서 IllegarArgumentException이 발생하는건 그럴듯하지만 왜 DeserializationException이 아니라 SerializationException이 발생한걸까요?

구글링해보니 내부적으로 해당 오류에 대해 논의중인것으로 보입니다. 

issues.apache.org/jira/browse/KAFKA-4740

 

[KAFKA-4740] Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop - ASF JIRA

 

issues.apache.org

어쨋든! 포맷이 맞지 않아서 deserialization이 정상적으로 되지 않았을때, 장애가 발생할 수 있습니다.

흠. 그러면 어떻게 해소해야할까요,

try {
    while (true) {
        ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, Integer> record : records) {
            logger.info("{}", record);
        }
    }

} catch (SerializationException e) {
    logger.info(e.getMessage(), e);
} finally {
    consumer.close();
}

위와같이 SerializationException으로 구성했지만 polling시점에서 에러가 발생하기 때문에 사실상 추가로 할수있는 일 없이 consumer가 종료됩니다. 

 

이와 같은 장애에서는 해당 컨슈머 그룹의 커슈머오프셋을 수동으로 옮겨주는수 밖에 없을것 같아요.

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

Consumer group 'test-consumer-group' has no active members.

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-consumer-group test            0          163             164             1               -               -               -
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-consumer-group --topic test:0 --reset-offsets --to-offset 164 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
test-consumer-group            test                           0          164
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

Consumer group 'test-consumer-group' has no active members.

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-consumer-group test            0          164             164             0               -               -               -
반응형