토픽의 메시지 값 또는 메시지 키는 직렬화하여 저장되어 있다. 만약 직렬화와 역직렬화 포맷이 다르면 어떻게 될까?
- test 토픽
- 직렬화 : StringStringSerializer
- 역직렬화 : UUIDDeserializer
의도적으로 직렬화 포맷과 역직렬화 포맷을 다르게 하여 테스트를 진행합니다.
1. 프로듀서
public class SimpleProducer {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = index + "";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record, (recordMetadata, e) -> System.out.println(recordMetadata.toString()));
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
} catch (Exception e) {
System.out.println(e);
}
}
producer.flush();
producer.close();
}
}
2. 컨슈머
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-consumer-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class.getName());
KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, Integer> record : records) {
logger.info("{}",record);
}
}
}
}
결과
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-0 at offset 139. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error parsing data into UUID
Caused by: java.lang.IllegalArgumentException: Invalid UUID string: 0
at java.util.UUID.fromString(UUID.java:194)
at org.apache.kafka.common.serialization.UUIDDeserializer.deserialize(UUIDDeserializer.java:48)
at org.apache.kafka.common.serialization.UUIDDeserializer.deserialize(UUIDDeserializer.java:29)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at com.example.SimpleConsumer.main(SimpleConsumer.java:37)
Kafka에 속한 SerializationException이 발생했습니다. UUID 내부에서 IllegarArgumentException이 발생하는건 그럴듯하지만 왜 DeserializationException이 아니라 SerializationException이 발생한걸까요?
구글링해보니 내부적으로 해당 오류에 대해 논의중인것으로 보입니다.
issues.apache.org/jira/browse/KAFKA-4740
어쨋든! 포맷이 맞지 않아서 deserialization이 정상적으로 되지 않았을때, 장애가 발생할 수 있습니다.
흠. 그러면 어떻게 해소해야할까요,
try {
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, Integer> record : records) {
logger.info("{}", record);
}
}
} catch (SerializationException e) {
logger.info(e.getMessage(), e);
} finally {
consumer.close();
}
위와같이 SerializationException으로 구성했지만 polling시점에서 에러가 발생하기 때문에 사실상 추가로 할수있는 일 없이 consumer가 종료됩니다.
이와 같은 장애에서는 해당 컨슈머 그룹의 커슈머오프셋을 수동으로 옮겨주는수 밖에 없을것 같아요.
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
Consumer group 'test-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test 0 163 164 1 - - -
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-consumer-group --topic test:0 --reset-offsets --to-offset 164 --execute
GROUP TOPIC PARTITION NEW-OFFSET
test-consumer-group test 0 164
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
Consumer group 'test-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test 0 164 164 0 - - -
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 커넥터 빌드시 JDK11이 아닌 JDK8로 그래들 빌드해야합니다. (2) | 2020.11.02 |
---|---|
failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256 (0) | 2020.10.23 |
스프링 카프카 호환표 (1) | 2020.10.16 |
정말정말 간단한 스프링 카프카 컨슈머 애플리케이션 예제 (0) | 2020.10.13 |
Reactive를 품은 스프링 카프카 시청 정리 자료 (1) | 2020.09.26 |
카프카 커넥트 JMX + 로그스태시로 모니터링 하기 (0) | 2020.09.24 |