https://books.google.co.kr/books?id=a3wzDwAAQBAJ&pg=PA77&lpg=PA77
참고 출처 - Kafka Definitive guide(카프카 핵심가이드) - O'reilly
Automatic commit
카프카에서는 enable.auto.commit=true 를 사용하여 offset의 commit을 일정간격으로 찍도록 설정할 수 있다.
- enable.auto.commit : commit 여부
- auto.commit.interval.ms : commit interval 시간
auto commit이 true이면 매번 폴링 즉, poll()을 호출할 때마다 commit할 시간이 되었는지 확인한다.
KafkaConsumer.poll()
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
....
....
KafkaConsumer.updateAssignmentMetadataIfNeeded()
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (!coordinator.poll(timer)) {
return false;
}
return updateFetchPositions(timer);
}
ConsumerCoordinator.poll()
public boolean poll(Timer timer) {
invokeCompletedOffsetCommitCallbacks();
...
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
ConsumerCoordinator.maybeAutoCommitOffsetsAsync()
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
ConsumerCoordinator의 maybeAutoCommitOffsetAsync method에서 autoCommitEnabled일 경우 timer가 expire되었는지 여부를 확인하고 시간이 되면 async로 topic의 aprtition offset을 commit한다.
Auto commit에서 중복발생
- enable.auto.commit = true
- auto.commit.interval.ms = 5000ms
상기와 같이 설정했을 경우 5초마다 poll이 호출되면 확인하여 commit이 수행된다.
아래와 같은 경우가 생길 수 있다.
1) poll()호출을 통해 record 100개 가져옴(→ 이때 offset commit)
2) record 100개 중 30개 처리 완료(ex. 데이터 저장완료)
3) 갑자기! 어떤 이유(topic partition개수 증가 혹은 consumer 개수 증감)로 rebalancing 시작
4) consumer들이 re-assign됨
5) consumer는 1)에서 commit된 offset부터 다시 데이터를 polling
6) 다시가져온 record를 처리 수행(중복발생)
위와 같은 발생을 제거하기 위해 poll() 다음에 close()를 호출하는 방법이 있을 수도 있다.
(close()를 호출하면 ConsumerCoorinator.maybeAutoCommitOffsetsSync()을 호출)
ConsumerCoordinator.maybeAutoCommitOffsetsSync()
private void maybeAutoCommitOffsetsSync(Timer timer) {
if (autoCommitEnabled) {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
try {
log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
if (!commitOffsetsSync(allConsumedOffsets, timer))
log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
} catch (WakeupException | InterruptException e) {
log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
}
}
}
그러나, polling roop에서 예외를 처리할때 주의해야한다.(중복이 발생할 수 있다.)
Automatic commits are convenient, but they don't give developers enough control to avoid duplicate messages.
- O'reilly
'빅데이터 > Kafka' 카테고리의 다른 글
KSQL - Docker을 사용한 KSQL server, cli 설치 및 실행 (0) | 2019.10.08 |
---|---|
[빅데이터]Kafka stream과 KSQL 소개 및 설명, 차이점 (3) | 2019.10.08 |
[KAFKA]commitSync() 사용시 rebalance발동시 offset variable을 초기화 해야하는 이유? (0) | 2019.09.30 |
enable.auto.commit 일 때 Kafka consumer close() method 동작분석 (0) | 2019.09.02 |
[confluent]Kafka에 대한 상식 퀴즈 14개 (0) | 2019.08.30 |
Kafka burrow http endpoint 정리 (276) | 2019.08.02 |