본문 바로가기

빅데이터/Kafka

Kafka consumer의 Automatic Commit은 중복이 생길 수 있다

https://books.google.co.kr/books?id=a3wzDwAAQBAJ&pg=PA77&lpg=PA77

 

Kafka: The Definitive Guide

Every enterprise application creates data, whether it’s log messages, metrics, user activity, outgoing messages, or something else. And how to move all of this data becomes nearly as important as the data itself. If you’re an application architect, develop

books.google.co.jp

참고 출처 - Kafka Definitive guide(카프카 핵심가이드) - O'reilly


Automatic commit

카프카에서는 enable.auto.commit=true 를 사용하여 offset의 commit을 일정간격으로 찍도록 설정할 수 있다.

- enable.auto.commit : commit 여부

- auto.commit.interval.ms : commit interval 시간

auto commit이 true이면 매번 폴링 즉, poll()을 호출할 때마다 commit할 시간이 되었는지 확인한다.

 

KafkaConsumer.poll()

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }
            ....
    ....

KafkaConsumer.updateAssignmentMetadataIfNeeded()

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (!coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}

ConsumerCoordinator.poll()

public boolean poll(Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    ...

    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}

ConsumerCoordinator의 maybeAutoCommitOffsetAsync method에서 autoCommitEnabled일 경우 timer가 expire되었는지 여부를 확인하고 시간이 되면 async로 topic의 aprtition offset을 commit한다.

Auto commit에서 중복발생

- enable.auto.commit = true

- auto.commit.interval.ms = 5000ms

상기와 같이 설정했을 경우 5초마다 poll이 호출되면 확인하여 commit이 수행된다.

 

아래와 같은 경우가 생길 수 있다.

 

1) poll()호출을 통해 record 100개 가져옴(→ 이때 offset commit)

2) record 100개 중 30개 처리 완료(ex. 데이터 저장완료)

3) 갑자기! 어떤 이유(topic partition개수 증가 혹은 consumer 개수 증감)로 rebalancing 시작

4) consumer들이 re-assign됨

5) consumer는 1)에서 commit된 offset부터 다시 데이터를 polling

6) 다시가져온 record를 처리 수행(중복발생)

 

위와 같은 발생을 제거하기 위해 poll() 다음에 close()를 호출하는 방법이 있을 수도 있다.

(close()를 호출하면 ConsumerCoorinator.maybeAutoCommitOffsetsSync()을 호출)

 

ConsumerCoordinator.maybeAutoCommitOffsetsSync()

private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}

그러나, polling roop에서 예외를 처리할때 주의해야한다.(중복이 발생할 수 있다.)

 

Automatic commits are convenient, but they don't give developers enough control to avoid duplicate messages.
- O'reilly