본문 바로가기

빅데이터/Kafka

[KAFKA]commitSync() 사용시 rebalance발동시 offset variable을 초기화 해야하는 이유?

아래는 oreilly의 Kafka: The Definitive Guide(카프카 핵심가이드)의 commitSync()와 rebalanceListener를 사용하여 topic을 consume하는 예시 코드이다.

 

url : https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { 
    public void onPartitionsAssigned(Collection<TopicPartition>
        partitions) { 
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); 

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
                 customer = %s, country = %s\n",
                 record.topic(), record.partition(), record.offset(),
                 record.key(), record.value());
             currentOffsets.put(
                 new TopicPartition(record.topic(), record.partition()),
                 new OffsetAndMetadata(record.offset()+1, null));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}

상기코드는 kafka에 저장된 특정 토픽의 record들을 poll()을 통해 가져오는 코드이며 아래와 같은 특징을 가진다.

 

- currentOffsets이름의 HashMap에 partition/offset data 저장

- auto.commit.enable = false

- commitSync()와 commitAsync()의 혼합사용

- rebalancing이 일어날때, partition이 revoke되었을때 commitSync() 호출

- WakeupException 혹은 Exception 발생했을 경우 마지막으로 commitSync() 호출 이후 close()

 

상기 구문은 보면 이슈가 없어보이지만 실제로 코드를 실행하였을 경우 아래와 같은 동작에서 lag이 이상하게 나타나는 현상을 확인할 수 있다.

 

이상현상이 일어나는 Example

1) 2개 partition으로 이루어진 1개 topic 준비

2) 1개 consumer(A process)로 해당 topic consume 시작

3) 또다른 1개 consumer(B process)생성, 2)과 동일 groupId로 동일 topic에 대해 consume 시작

4) topic rebalance 발동

5) A process는 1개 partition과 연결, B process는 1개 partition과 연결

6) A process, B process poll()을 통해 데이터 처리 시작

7) B process에 연결된 partition의 lag수치 증가. 그런데 lag이 0도 포함되어 있음

 

burrow로 측정한 offset, lag 특정 partition의 lag과 offset 수치가 이상하다

이상현상이 일어나는 이유

처음에는 왜 저런 현상이 일어나는지 몰랐다. 내가 개발한 consumer가 잘못되었거나 혹은 burrow의 데이터를 수집할때 이상이 있다고 추측했지만, burrow는 정상적으로 작동하고 있었고, consumer는 oreilly의 책에 있는 그대로 작성하였기 때문에 크게 틀릴 부분이 없다고 생각했다.

 

그러나 로그를 찍어보고 알게된 사실은 바로 offset을 저장하는 변수에 함정이 있었다.

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

상기 변수는 consumer가 poll()한 이후에 현재 offset에 대해 Key-Value로 저장한다.

아래는 Key-Value저장값의 예시이다.

{test-topic-0=OffsetAndMetadata{offset=82898884658, leaderEpoch=null, metadata='no metadata'},
test-topic-1=OffsetAndMetadata{offset=82900361267, leaderEpoch=null, metadata='no metadata'}}

- TopicPartiton : 토픽이름과 파티션 저장 class

- OffsetAndMetaData : offset과 metadata 저장 class

 

예시에서 언급한 A process는 기존에 0번과 1번 partition을 consume하고 있었다. 그러므로 currentOffsets variable은 2개의 partition과 offset에 대한 정보를 지니고 있었을 것이다.

 

Rebalancing이 일어난 이후 A process는 더이상 1번 partition에 대해 consume하고 있지 않지만 currentOffsets 변수를 초기화하지 않았기 때문에 계속해서 1번 partition에 대해 offset을 commit시도하였고 이로 인해 kafka내부에는 이상한 offset 기록이 존재했던 것이다.

 

해결방법

해결방법은 간단하다 rebalancing이 일어날 때 currentOffsets 변수를 초기화 하면 된다. onPartitionsRevoked method에서 commitSync()를 수행한 이후 currentOffsets 변수를 초기화 하면, 이후 offset commit시에는 해당 process(혹은 thread)가 consume하는 offset만 commit하게 된다.

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { 
    public void onPartitionsAssigned(Collection<TopicPartition>
        partitions) { 
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 
        
        currentOffsets.clear() // 추가 코드
    }
}