

Trying out KIP-932 Queues for Kafka (KafkaShareConsumer)

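Apache Kafka, as it is usually used, processes an event stream partition by partition. Within a consumer group each partition is read by at most one consumer, so the usual practice is to run as many consumers as there are partitions; any extra consumers just sit idle. Queues for Kafka is a feature that lets you run more consumers than there are partitions, because several consumers in a share group can read from the same partition.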
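To make the contrast concrete, the toy model below (plain Java, no Kafka dependency) applies the classic-group rule that a partition is owned by at most one member and counts how many consumers end up with nothing to read. The class and method names are made up for illustration, and the round-robin hand-out is an assumption; Kafka's real assignors are more elaborate, but under the at-most-one-owner rule at least (consumers − partitions) members are always idle.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a classic consumer group: each partition is owned by at most
// one member. This is an illustration, not Kafka's real assignor code.
final class ClassicGroupIdleDemo {

    // Hands partitions out round-robin and counts members that got none.
    static int idleConsumers(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        int idle = 0;
        for (List<Integer> partitionsOfMember : owned) {
            if (partitionsOfMember.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 1 partition and 5 consumers: only one consumer ever receives records.
        System.out.println("idle consumers: " + idleConsumers(1, 5));
    }
}
```

With 1 partition and 5 consumers it prints `idle consumers: 4`; a share group lets all five read from that single partition, as the five-instance test later in this post shows.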

 

KIP-932: Queues for Kafka - Apache Kafka - Apache Software Foundation (status: Accepted, JIRA: KAFKA-16092)
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

Queues for Kafka (KIP-932) - Preview Release Notes - Apache Kafka - Apache Software Foundation
https://cwiki.apache.org/confluence/display/KAFKA/Queues+for+Kafka+%28KIP-932%29+-+Preview+Release+Notes
"Apache Kafka 4.1.0 is shipped with a preview release of KIP-932: Queues for Kafka. This feature is not yet recommended for use on production clusters, but it is ready for evaluation and testing."

 

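With the 4.1.0 release, Queues for Kafka moved from early access to preview. It has matured enough that you could consider it for production if you accept the risk, but the release notes above, and the WARN log the consumer prints at startup, still state that it is not recommended for production clusters.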

 

Using KafkaShareConsumer

1) Starting and configuring the cluster

version: "3.8"

services:
  kafka:
    image: apache/kafka:4.1.0
    container_name: kafka41
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9091,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://127.0.0.1:9092,EXTERNAL://host.docker.internal:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1

      # Relax the creation requirements for the Share Coordinator's internal state topic (single broker)
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: "1"

Save the configuration above as docker-compose.yml and start the broker with docker-compose up.

$ ./kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
share.version was upgraded to 1.

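This command turns the share group feature on for the running broker (kafka-features.sh lives in the bin directory of the Kafka distribution).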

 

If you skip this step, the consumer fails with the following error:

Exception in thread "main" org.apache.kafka.common.errors.UnsupportedVersionException: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.

2) Consumer code

2-1) Implicit acknowledgement mode

package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class Main {
    private final static Logger log = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // share group ID

        KafkaShareConsumer<String, String> consumer =
                new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(List.of("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record -> {
                log.info("Processed: {}", record.value());
            });
        }
    }
}

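If you use KafkaShareConsumer without setting share.acknowledgement.mode, it runs in implicit acknowledgement mode: every record in the batch returned by poll() is treated as successfully processed (ACCEPT) and is acknowledged automatically when poll() is called again.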

[main] INFO org.apache.kafka.common.config.AbstractConfig - ShareConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	group.id = test-group
	share.acknowledgement.mode = implicit  // the default mode
	... (omitted)
[main] WARN org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator - Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production.
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 4.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 13f70256db3c994c
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1757651562780
[main] INFO org.apache.kafka.clients.consumer.internals.ShareConsumerImpl - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topics: test
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 0 transitioned from UNSUBSCRIBED to JOINING.
[consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: 5L6g3nShT-eMCtK--X86sw
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Discovered group coordinator Coordinator(key='test-group', nodeId=1, host='127.0.0.1', port=9092, errorCode=0, errorMessage='')
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from JOINING to RECONCILING.
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Reconciling assignment with local epoch 0
	Member:                                    Hq3kzUZhQ4mt3l6afzGrVg
	Assigned partitions:                       [test-0]
	Current owned partitions:                  []
	Added partitions (assigned - owned):       [test-0]
	Revoked partitions (owned - assigned):     []

[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from RECONCILING to ACKNOWLEDGING.
[consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager - [ShareConsumer clientId=consumer-test-group-1, groupId=test-group] Member Hq3kzUZhQ4mt3l6afzGrVg with epoch 10 transitioned from ACKNOWLEDGING to STABLE.
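The implicit mode described above can be modeled in a few lines. The sketch below is a conceptual model written for this post, not the kafka-clients implementation; the class name and the in-memory "partition log" are assumptions. Records returned by poll() stay ACQUIRED until the next poll(), which accepts the whole previous batch, including any record whose processing failed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of implicit acknowledgement (NOT the kafka-clients code):
// records returned by poll() stay ACQUIRED until the next poll(), which then
// marks all of them ACKNOWLEDGED, whether or not processing succeeded.
final class ImplicitAckModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    private final Map<Long, State> partitionLog = new LinkedHashMap<>();
    private final List<Long> inFlight = new ArrayList<>();

    ImplicitAckModel(int recordCount) {
        for (long offset = 0; offset < recordCount; offset++) {
            partitionLog.put(offset, State.AVAILABLE);
        }
    }

    // Implicitly accepts the previous batch, then acquires up to maxRecords new ones.
    List<Long> poll(int maxRecords) {
        for (long offset : inFlight) {
            partitionLog.put(offset, State.ACKNOWLEDGED);
        }
        inFlight.clear();
        for (Map.Entry<Long, State> entry : partitionLog.entrySet()) {
            if (inFlight.size() == maxRecords) {
                break;
            }
            if (entry.getValue() == State.AVAILABLE) {
                entry.setValue(State.ACQUIRED);
                inFlight.add(entry.getKey());
            }
        }
        return List.copyOf(inFlight);
    }

    State state(long offset) {
        return partitionLog.get(offset);
    }

    public static void main(String[] args) {
        ImplicitAckModel model = new ImplicitAckModel(4);
        List<Long> first = model.poll(2);  // offsets 0 and 1 are now ACQUIRED
        // Pretend processing of offset 1 failed: nothing reports it back.
        List<Long> second = model.poll(2); // 0 and 1 become ACKNOWLEDGED anyway
        System.out.println(first + " then " + second + ", offset 1 is " + model.state(1));
    }
}
```

Running main prints `[0, 1] then [2, 3], offset 1 is ACKNOWLEDGED`: the failed record is silently counted as a success, which is why explicit mode is the safer choice when processing can fail.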

 

2-2) Explicit acknowledgement mode

package org.example;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class Main {
    private final static Logger log = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // share group ID
        props.put("share.acknowledgement.mode", "explicit"); // acknowledgement mode (default: implicit)

        KafkaShareConsumer<String, String> consumer =
                new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(List.of("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record -> {
                try {
                    doProcessing(record.value());

                    // Processed successfully → ACCEPT
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);

                } catch (RetryableException e) {
                    // Retryable error → RELEASE (make it available for redelivery)
                    consumer.acknowledge(record, AcknowledgeType.RELEASE);

                } catch (Exception e) {
                    // Unrecoverable error → REJECT (never redelivered)
                    consumer.acknowledge(record, AcknowledgeType.REJECT);
                }
            });

            // Send the per-record acknowledgements to the broker
            consumer.commitSync();
        }
    }

    private static void doProcessing(String task) throws Exception {
        // e.g. image processing, external API calls
        if (task.contains("fail")) {
            throw new RetryableException("Temporary failure");
        }
        log.info("Processed: {}", task);
    }

    static class RetryableException extends Exception {
        public RetryableException(String msg) {
            super(msg);
        }
    }
}

 

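In explicit mode, acknowledge() only records each record's outcome on the client side; the acknowledgements reach the broker when they are committed, which the example does with commitSync() at the end of every batch. AcknowledgeType has three values: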

/**
 * The acknowledge type is used with {@link KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType)} to indicate
 * whether the record was consumed successfully.
 */
@InterfaceStability.Evolving
public enum AcknowledgeType {
    /** The record was consumed successfully. */
    ACCEPT((byte) 1),

    /** The record was not consumed successfully. Release it for another delivery attempt. */
    RELEASE((byte) 2),

    /** The record was not consumed successfully. Reject it and do not release it for another delivery attempt. */
    REJECT((byte) 3);

    // ... (remaining members omitted)
}

 

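| AcknowledgeType | Meaning | Typical case | Broker behavior |
|---|---|---|---|
| ACCEPT | Processed successfully | Normal completion | Moves to Acknowledged (done); not redelivered |
| RELEASE | Temporary failure, retry needed | Network/DB errors, transient outages | Returns to Available; the same or another consumer retries it, and deliveryCount increases on the next acquisition (archived once the limit is reached) |
| REJECT | Cannot be processed | Poison message, schema mismatch | Moves to Archived immediately; not redelivered |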

 

What is deliveryCount?

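In a share group (KIP-932), deliveryCount is a per-record counter that is incremented every time the record is acquired by a consumer; in other words, it tracks how many times delivery of the record has been attempted. Its purpose is to stop a record from being retried forever: a malformed record (a poison record) must not end up in an infinite redelivery loop, so the number of attempts is capped. The cap is set by group.share.delivery.count.limit (default 5). With the default, a record can be acquired by poll() at most 5 times; if the fifth delivery also fails, the record is not returned to Available again but moves to the Archived state instead of being offered a sixth time.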
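The delivery limit can be simulated for a single record. This is a toy model of the rules just described, not broker code: DeliveryLimitDemo and its methods are hypothetical, and the limit passed to the constructor plays the role of group.share.delivery.count.limit. A poison record that is RELEASEd on every attempt should be delivered exactly `limit` times and then archived.

```java
// Toy simulation of deliveryCount for a single record, following the rules above.
// It is not the broker's implementation; 'limit' stands in for
// group.share.delivery.count.limit (default 5).
final class DeliveryLimitDemo {
    enum State { AVAILABLE, ACQUIRED, ARCHIVED }

    State state = State.AVAILABLE;
    int deliveryCount = 0;
    private final int limit;

    DeliveryLimitDemo(int limit) {
        this.limit = limit;
    }

    // poll() acquires the record; every acquisition is one delivery attempt.
    boolean acquire() {
        if (state != State.AVAILABLE) {
            return false;
        }
        deliveryCount++;
        state = State.ACQUIRED;
        return true;
    }

    // RELEASE: back to AVAILABLE, or ARCHIVED once the limit has been used up.
    void release() {
        state = deliveryCount >= limit ? State.ARCHIVED : State.AVAILABLE;
    }

    // A poison record: every attempt fails and the consumer RELEASEs it.
    static DeliveryLimitDemo poisonRecord(int limit) {
        DeliveryLimitDemo record = new DeliveryLimitDemo(limit);
        while (record.acquire()) {
            record.release();
        }
        return record;
    }

    public static void main(String[] args) {
        DeliveryLimitDemo record = poisonRecord(5);
        System.out.println("deliveries=" + record.deliveryCount + ", state=" + record.state);
    }
}
```

With the default limit of 5, main prints `deliveries=5, state=ARCHIVED`.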

┌──────────────────────────────────────────────────────────────────────┐
│                          Share Group State Machine                   │
└──────────────────────────────────────────────────────────────────────┘

[Produced]
    │  (record is written to the topic)
    ▼
[Available]
    │  acquired by a consumer on poll()  (deliveryCount += 1)
    ▼
[Acquired / In-Flight]  ── processing attempt
    │
    ├─ acknowledge(..., ACCEPT)
    │        │
    │        ▼
    │   [Acknowledged]  ──(commitSync/Async)──► persisted in __share_group_state
    │                      (done; never redelivered)
    │
    ├─ acknowledge(..., RELEASE)
    │        │
    │        ├─ deliveryCount < limit  (limit = group.share.delivery.count.limit)
    │        │        │
    │        │        ▼
    │        │   back to [Available]
    │        │   (the same or another consumer may retry;
    │        │    deliveryCount grows on the next acquisition)
    │        │
    │        └─ deliveryCount ≥ limit
    │                 │
    │                 ▼
    │            [Archived]  (no further delivery)
    │
    ├─ acknowledge(..., REJECT)
    │        │
    │        ▼
    │   [Archived]  (archived immediately; no redelivery)
    │   (commonly the application copies it to a DLQ first, then REJECTs it)
    │
    └─ (loop ends without an explicit acknowledgement)
             │
             ├─ share.acknowledgement.mode = explicit
             │     → unacknowledged records are re-presented in the same acquisition
             │       (processed again on the next poll, deliveryCount unchanged)
             │
             └─ share.acknowledgement.mode = implicit
                   → the previous batch is implicitly acknowledged on the next poll()
                     (failed records may also count as successes, so be careful)

[Archived] / [Acknowledged]
    └─ terminal states. Never delivered to a consumer again.

 

Running five instances (consumers)

- test topic with 1 partition (partition 0)

- five consumer instances started from IntelliJ using a multi-instance run

 

Checking the state with the shell scripts

$ ./kafka-groups.sh --bootstrap-server localhost:9092 --list
GROUP                TYPE                 PROTOCOL
test-group           Share                share

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --list
test-group

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  START-OFFSET
test-group      test            0          13

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members
GROUP           CONSUMER-ID            HOST            CLIENT-ID             #PARTITIONS  ASSIGNMENT
test-group      531rie8GTeGXgr4mT0jViw /192.168.65.1   consumer-test-group-1 1            test:0
test-group      GMGiDkymRLqmH2sA_Py4yw /192.168.65.1   consumer-test-group-1 1            test:0
test-group      Wq1DHTFSRQWTH5m1-AA_-A /192.168.65.1   consumer-test-group-1 1            test:0
test-group      WyCVBVNBQsyeAFHMeYJCKw /192.168.65.1   consumer-test-group-1 1            test:0
test-group      aaeOrPSiQC-r69sdvoIsOQ /192.168.65.1   consumer-test-group-1 1            test:0

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --state

GROUP           COORDINATOR (ID)          STATE           #MEMBERS
test-group      127.0.0.1:9092  (1)       Stable          5

 

What happens when the topic's partition count changes?

$ ./kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic test --partitions 2

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  START-OFFSET
test-group      test            0          13
test-group      test            1          -

$ ./kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members
GROUP           CONSUMER-ID            HOST            CLIENT-ID             #PARTITIONS  ASSIGNMENT
test-group      531rie8GTeGXgr4mT0jViw /192.168.65.1   consumer-test-group-1 1            test:0
test-group      GMGiDkymRLqmH2sA_Py4yw /192.168.65.1   consumer-test-group-1 1            test:1
test-group      Wq1DHTFSRQWTH5m1-AA_-A /192.168.65.1   consumer-test-group-1 1            test:1
test-group      WyCVBVNBQsyeAFHMeYJCKw /192.168.65.1   consumer-test-group-1 1            test:0
test-group      aaeOrPSiQC-r69sdvoIsOQ /192.168.65.1   consumer-test-group-1 1            test:0

 

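In the kafka-share-groups.sh output, #PARTITIONS is the number of partitions currently assigned to the member, and ASSIGNMENT lists the assigned topic:partition pairs. For KafkaShareConsumer, assignment works differently from a classic consumer group: the same partition can be assigned to several members at once (above, test:0 is shared by three members and test:1 by two), and the broker side (the group coordinator, using the share-group assignor) decides the assignment dynamically as members and partitions change.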
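As a hypothetical illustration of how five members can spread over two partitions, the sketch below simply gives member i partition i % partitions. This rule is an assumption for demonstration only; the broker's share-group assignor chooses the real mapping. Its 3/2 split matches the counts in the output above, but not necessarily which member gets which partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: the real mapping is chosen by the share-group assignor
// on the broker. This only shows one way 5 members can share 2 partitions.
final class ShareSpreadDemo {

    // Member i gets partition (i % partitions), so partitions are reused
    // once members outnumber them (assumes members.size() >= partitions).
    static Map<Integer, List<String>> spread(List<String> members, int partitions) {
        Map<Integer, List<String>> membersByPartition = new TreeMap<>();
        for (int i = 0; i < members.size(); i++) {
            membersByPartition
                    .computeIfAbsent(i % partitions, p -> new ArrayList<>())
                    .add(members.get(i));
        }
        return membersByPartition;
    }

    public static void main(String[] args) {
        List<String> members = List.of("m1", "m2", "m3", "m4", "m5");
        spread(members, 2).forEach((partition, owners) ->
                System.out.println("test:" + partition + " -> " + owners));
    }
}
```

Running main prints `test:0 -> [m1, m3, m5]` followed by `test:1 -> [m2, m4]`; with a single partition, all five members would share test:0, as in the first describe output.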

 

 
