본문 바로가기

빅데이터/Kafka

enable.auto.commit 일 때 Kafka consumer close() method 동작분석

Kafka의 consumer를 사용할 때 offset에 대한 commit offset timing에 대해 여러가지 방법이 있다.

만약 enable.auto.commit=true 로 사용시 consumer에서 close()를 호출한다면 어떻게 kafka 내부에서 offset을 처리하는지 확인해보려고 한다.

(아래는 kafka consumer 2.1.0 기준으로 작성되었습니다)

 

1. close() 호출

@Override
public void close() {
    close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
}

Kafka의 consumer가 close될 때 timeout 시간 내에 consumer를 종료한다.(default는 30초)

만약 auto-commit=true 된 상태라면 현재 consumer의 offset을 commit 한다.

 

2. close(Duration timeout) 호출

@Override
public void close(Duration timeout) {
    if (timeout.toMillis() < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    acquire();
    try {
        if (!closed) {
            closed = true;
            close(timeout.toMillis(), false);
        }
    } finally {
        release();
    }
}

3. close(long timeoutMs, boolean swallowException) 호출

private void close(long timeoutMs, boolean swallowException) {
    log.trace("Closing the Kafka consumer");
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    try {
        if (coordinator != null)
            coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
    } catch (Throwable t) {
        firstException.compareAndSet(null, t);
        log.error("Failed to close coordinator", t);
    }
	... 중략
}

Kafka consumer client에서 close를 요청의 마지막 단계로 ConsumerCoordinator의 close method를 호출한다.

 

4. ConsumerCoordinator.close(final Timer timer) 호출

public void close(final Timer timer) {
    // we do not need to re-enable wakeups since we are closing already
    client.disableWakeups();
    try {
        maybeAutoCommitOffsetsSync(timer); // auto commit offset에 대한 method
        while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
            ensureCoordinatorReady(timer);
            client.poll(timer);
            invokeCompletedOffsetCommitCallbacks();
        }
    } finally {
        super.close(timer);
    }
}

5. ConsumerCoordinator.maybeAutoCommitOffsetsSync(Timer timer) 호출

private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}

상기 method는 autoCommitEnabled의 분기문을 타는 오직 autoCommitEnabled를 위한 method임을 알 수 있다.

 

(1) SubscriptionState의 allConsumed method를 통해 consumer의 모든 파티션의 status에 대해 확인

(2) ConsumerCoordinator의 commitOffsetsSync method를 통해 offset의 commmit의 synchronously하게 진행한다.