일반적으로 사용되는 아파치 카프카는 이벤트 스트림으로 파티션단위로 데이터를 처리하기 때문에 컨슈머 개수를 파티션 개수만큼 실행시켜 운영하는 것이 일반적이다. Queues for Kafka는 이와 다르게 파티션 개수보다 더 많은 컨슈머를 운영하기 위한 기능이다.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
KIP-932: Queues for Kafka - Apache Kafka - Apache Software Foundation
Status Current state: Accepted Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq JIRA: https://issues.apache.org/jira/browse/KAFKA-16092 Please keep the discussion on the mailing list rather than commenting on the wiki (
cwiki.apache.org
Queues for Kafka (KIP-932) - Preview Release Notes - Apache Kafka - Apache Software Foundation
Apache Kafka 4.1.0 is shipped with a preview release of KIP-932: Queues for Kafka. This feature is not yet recommended for use on production clusters, but it is ready for evaluation and testing. This KIP introduces the concept of a share group as a way of
cwiki.apache.org
4.1.0이 릴리즈되면서 Queues for Kafka는 이제 early access에서 preview로 바뀌었고, 상용환경에서도 위험을 감수하고 사용할만큼 보완되었다.
KafkaShareConsumer 사용 방법
1) cluster 실행 및 설정
version: "3.8"
services:
kafka:
image: apache/kafka:4.1.0
container_name: kafka41
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9091,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://127.0.0.1:9092,EXTERNAL://host.docker.internal:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1
# Share Coordinator 내부 상태 토픽 생성 조건 완화
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: "1"
상기와 같이 설정하고 docker-compose up 으로 실행
$ ./kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
share.version was upgraded to 1.
실행중인 브로커에 피쳐를 on 하기 위해 상기 명령어를 실행한다.
위 스크립트를 실행하지 않으면, 컨슈머에서 다음과 같은 에러가 발생한다.
Exception in thread "main" org.apache.kafka.common.errors.UnsupportedVersionException: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.
2) consumer code
2-1) ImplicitAck mode
package org.example;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
public class Main {
private final static Logger log = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 공유 그룹 ID
KafkaShareConsumer<String, String> consumer =
new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(List.of("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
records.forEach(record -> {
log.info("Processed: " + record.value());
});
}
}
}
별도로 모드를 지정하지 않고 KafkaShareConsumer를 사용할 경우 암시적인 ack 모드로 사용된다. poll()로 가져온 배치는 자동으로 성공된것으로 간주되어 ACK가 된다.
[main] INFO org.apache.kafka.common.config.AbstractConfig - ShareConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
group.id = test-group
share.acknowledgement.mode = implicit // 기본으로 설정된 mode
... 생략
[main] WARN org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator - Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production.
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 4.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 13f70256db3c994c
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1757651562780
[main] INFO org.apache.kafka.clients.consumer.internals.ShareConsumerImpl - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topics: test
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 0 transitioned from UNSUBSCRIBED to JOINING.
[consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: 5L6g3nShT-eMCtK--X86sw
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Discovered group coordinator Coordinator(key='test-group', nodeId=1, host='127.0.0.1', port=9092, errorCode=0, errorMessage='')
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from JOINING to RECONCILING.
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Reconciling assignment with local epoch 0
Member: Hq3kzUZhQ4mt3l6afzGrVg
Assigned partitions: [test-0]
Current owned partitions: []
Added partitions (assigned - owned): [test-0]
Revoked partitions (owned - assigned): []
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from RECONCILING to ACKNOWLEDGING.
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from ACKNOWLEDGING to STABLE.
2-2) ExplicitAck mode
package org.example;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
public class Main {
private final static Logger log = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 공유 그룹 ID
props.put("share.acknowledgement.mode", "explicit"); // ACK 모드
KafkaShareConsumer<String, String> consumer =
new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(List.of("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
records.forEach(record -> {
try {
doProcessing(record.value());
// 성공적으로 처리 → ACK
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (RetryableException e) {
// 재시도 가능한 오류 → RELEASE
consumer.acknowledge(record, AcknowledgeType.RELEASE);
} catch (Exception e) {
// 복구 불가능 오류 → REJECT
consumer.acknowledge(record, AcknowledgeType.REJECT);
}
});
// 메시지 단위 ACK을 브로커에 커밋
consumer.commitSync();
}
}
private static void doProcessing(String task) throws Exception {
// 예: 이미지 처리, API 호출 등
if (task.contains("fail")) {
throw new RetryableException("Temporary failure");
}
log.info("Processed: " + task);
}
static class RetryableException extends Exception {
public RetryableException(String msg) {
super(msg);
}
}
}
명시적 모드인 경우 commit을 수행해야 한다. AcknowledgeType은 총 3가지가 있으며 각 특징은 다음과 같다.
/**
* The acknowledge type is used with {@link KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType)} to indicate
* whether the record was consumed successfully.
*/
@InterfaceStability.Evolving
public enum AcknowledgeType {
/** The record was consumed successfully. */
ACCEPT((byte) 1),
/** The record was not consumed successfully. Release it for another delivery attempt. */
RELEASE((byte) 2),
/** The record was not consumed successfully. Reject it and do not release it for another delivery attempt. */
REJECT((byte) 3);
AcknowledgeType | 의미 | 사용 상황 | 브로커 동작 |
ACCEPT | 성공적으로 처리됨 | 정상 처리 완료 | Archived(완료)로 이동, 재전달 없음 |
RELEASE | 일시적 실패, 재시도 필요 | 네트워크/DB 오류, 일시적 장애 | 다시 Available 상태로 이동, deliveryCount +1, 동일 컨슈머 또는 다른 컨슈머가 재시도 |
REJECT | 처리 불가능한 오류 | 포이즌 메시지, 스키마 불일치 | 즉시 Archived 상태, 재전달 없음 |
deliveryCount란?
Share Group(KIP-932)에서 각 레코드가 컨슈머에게 획득(acquire) 될 때마다 증가하는 카운터. 즉, 이 메시지가 몇 번 재시도되었는가를 추적하는 값임. 메시지가 무한정 재시도되는 것을 막기 위함. 잘못된 메시지(포이즌 레코드)가 무한 루프에 빠지지 않도록 최대 횟수 제한을 둠. group.share.delivery.count.limit(기본값 5) 설정에 의해 limit 제어됨. 기본값(5)이라면 한 레코드는 최대 5회까지 poll → acquire 될 수 있음. 6번째부터는 더 이상 Available로 돌아가지 않고 Archived(폐기)상태로 전환됨.
┌──────────────────────────────────────────────────────────────────────┐
│ Share Group State Machine │
└──────────────────────────────────────────────────────────────────────┘
[Produced]
│ (레코드가 토픽에 기록됨)
▼
[Available]
│ poll() 시 컨슈머가 획득(acquire)
▼
[Acquired / In-Flight] ── 처리 시도 (process)
│
├─ acknowledge(..., ACCEPT)
│ │
│ ▼
│ [Acknowledged] ──(commitSync/Async)──► __share_group_state 반영
│ (완료 상태. 재전달되지 않음)
│
├─ acknowledge(..., RELEASE)
│ │
│ ├─ deliveryCount < maxDeliveries
│ │ │
│ │ ▼
│ │ [Available] 로 복귀
│ │ (같은 컨슈머 또는 다른 컨슈머가 재시도 가능,
│ │ deliveryCount += 1)
│ │
│ └─ deliveryCount ≥ maxDeliveries
│ │
│ ▼
│ [Archived] (더 이상 배달하지 않음)
│
├─ acknowledge(..., REJECT)
│ │
│ ▼
│ [Archived] (즉시 보관/차단. 재전달 없음)
│ (보통 DLQ로 복사 후 REJECT)
│
└─ (명시적 ACK 없이 루프 종료)
│
├─ share.acknowledgement.mode = explicit
│ → ACK 안 한 레코드들은 같은 acquisition으로 재제공
│ (다음 poll에서 다시 처리, deliveryCount 증가 없음)
│
└─ share.acknowledgement.mode = implicit
→ 다음 poll() 시 이전 배치가 암시적 ACK 처리됨
(실패 레코드도 성공으로 간주될 수 있으므로 주의)
[Archived] / [Acknowledged]
└─ 단말 상태(terminal). 컨슈머에게 더 이상 전달되지 않음.
5개 인스턴스(컨슈머) 실행해보기
- test 토픽, 파티션0개
- intellij에서 multi instance run
쉘스크립트로 상태 확인하기
$ ./kafka-groups.sh --bootstrap-server localhost:9092 --list
GROUP TYPE PROTOCOL
test-group Share share
$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --list
test-group
$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION START-OFFSET
test-group test 0 13
$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
test-group 531rie8GTeGXgr4mT0jViw /192.168.65.1 consumer-test-group-1 1 test:0
test-group GMGiDkymRLqmH2sA_Py4yw /192.168.65.1 consumer-test-group-1 1 test:0
test-group Wq1DHTFSRQWTH5m1-AA_-A /192.168.65.1 consumer-test-group-1 1 test:0
test-group WyCVBVNBQsyeAFHMeYJCKw /192.168.65.1 consumer-test-group-1 1 test:0
test-group aaeOrPSiQC-r69sdvoIsOQ /192.168.65.1 consumer-test-group-1 1 test:0
$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --state
GROUP COORDINATOR (ID) STATE #MEMBERS
test-group 127.0.0.1:9092 (1) Stable 5
토픽의 파티션 개수가 변경되면 어떻게 될까?
$ ./kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic test --partitions 2
$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION START-OFFSET
test-group test 0 13
test-group test 1 -
$ bin ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
test-group 531rie8GTeGXgr4mT0jViw /192.168.65.1 consumer-test-group-1 1 test:0
test-group GMGiDkymRLqmH2sA_Py4yw /192.168.65.1 consumer-test-group-1 1 test:1
test-group Wq1DHTFSRQWTH5m1-AA_-A /192.168.65.1 consumer-test-group-1 1 test:1
test-group WyCVBVNBQsyeAFHMeYJCKw /192.168.65.1 consumer-test-group-1 1 test:0
test-group aaeOrPSiQC-r69sdvoIsOQ /192.168.65.1 consumer-test-group-1 1 test:0
kafka-share-groups.sh에 partitions는 해당 컨슈머가 점유중인 파티션 개수를 뜻함. 그리고 assignment는 해당 컨슈머가 할당된 토픽+파티션의 조합을 나타냄. kafkaShareConsumer인 경우 assignment는 기존 방식과 다름. Share Coordinator가 가용상태에 따라 동적으로 정함.
'빅데이터 > Kafka' 카테고리의 다른 글
Apache Kafka 4.1.0 docker-compose.yaml 실행 (0) | 2025.09.12 |
---|---|
윈도우즈 wsl2 환경에서 로컬 카프카 브로커 연동하기 (0) | 2025.05.25 |
Sent auto-creation request for Set(__consumer_offsets) to the active controller 에러 해결 방법 (0) | 2025.04.18 |
카프카4.0 부터는 eager rebalancing protocol이 삭제됩니다. (0) | 2025.02.28 |
[local macOS 환경] apache kafka(3.5.0기준) + redpanda/console 로 편하게 테스트 하기 (0) | 2025.02.26 |
카프카에서 데이터 삭제는 어떻게 이루어 지는가> (0) | 2025.02.13 |