본문 바로가기

빅데이터/Kafka

enable.auto.commit 일 때 Kafka consumer close() method 동작분석

Kafka의 consumer를 사용할 때 offset에 대한 commit offset timing에 대해 여러가지 방법이 있다.

만약 enable.auto.commit=true 로 사용시 consumer에서 close()를 호출한다면 어떻게 kafka 내부에서 offset을 처리하는지 확인해보려고 한다.

(아래는 kafka consumer 2.1.0 기준으로 작성되었습니다)

 

1. close() 호출

@Override
public void close() {
    close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
}

Kafka의 consumer가 close될 때 timeout 시간 내에 consumer를 종료한다.(default는 30초)

만약 auto-commit=true 된 상태라면 현재 consumer의 offset을 commit 한다.

 

2. close(Duration timeout) 호출

@Override
public void close(Duration timeout) {
    if (timeout.toMillis() < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    acquire();
    try {
        if (!closed) {
            closed = true;
            close(timeout.toMillis(), false);
        }
    } finally {
        release();
    }
}

3. close(long timeoutMs, boolean swallowException) 호출

private void close(long timeoutMs, boolean swallowException) {
    log.trace("Closing the Kafka consumer");
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    try {
        if (coordinator != null)
            coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
    } catch (Throwable t) {
        firstException.compareAndSet(null, t);
        log.error("Failed to close coordinator", t);
    }
	... 중략
}

Kafka consumer client에서 close를 요청의 마지막 단계로 ConsumerCoordinator의 close method를 호출한다.

 

4. ConsumerCoordinator.close(final Timer timer) 호출

public void close(final Timer timer) {
    // we do not need to re-enable wakeups since we are closing already
    client.disableWakeups();
    try {
        maybeAutoCommitOffsetsSync(timer); // auto commit offset에 대한 method
        while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
            ensureCoordinatorReady(timer);
            client.poll(timer);
            invokeCompletedOffsetCommitCallbacks();
        }
    } finally {
        super.close(timer);
    }
}

5. ConsumerCoordinator.maybeAutoCommitOffsetsSync(Timer timer) 호출

private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}

상기 method는 autoCommitEnabled의 분기문을 타는 오직 autoCommitEnabled를 위한 method임을 알 수 있다.

 

(1) SubscriptionState의 allConsumed method를 통해 consumer의 모든 파티션의 status에 대해 확인

(2) ConsumerCoordinator의 commitOffsetsSync method를 통해 offset의 commmit의 synchronously하게 진행한다.

 

위 내용은 document에도 나와있다.

Tries to close the consumer cleanly within the specified timeout. This method waits up to timeout for the consumer to complete pending commits and leave the group. If auto-commit is enabled, this will commit the current offsets if possible within the timeout. If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed. Note that wakeup() cannot be used to interrupt close.