본문 바로가기

빅데이터/Kafka

KIP-932: Queues for Kafka 조사

https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

 

KIP-932: Queues for Kafka - Apache Kafka - Apache Software Foundation

Status Current state: Accepted Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq JIRA: https://issues.apache.org/jira/browse/KAFKA-16092 Please keep the discussion on the mailing list rather than commenting on the wiki (

cwiki.apache.org

KIP-932는 Kafka에 새로운 Queue Topic을 도입하여 기존 Topic/Partition 모델의 한계를 보완하고, 메시지를 단일 Consumer에 할당하는 Single-Consumer Semantics를 제공하려는 제안임. Queue Topic은 기존 Kafka Topic과 달리 FIFO(First-In-First-Out) 메시지 순서를 보장하며, 메시지 처리의 동적 확장성을 제공함.

 

프로듀서는 어떻게 다르게 동작하나?

프로듀서의 동작은 기존과 거의 동일하지만, 파티션 키가 불필요하다. Queue Topic이라고 불리는 이 토픽은 전체에서 순서를 유지하도록 설계되어 있다.

Queue 토픽

- QUEUE 모드라는 것을 명시적으로 선언하지 않아도 됨. 이런 동작을 위해서 컨슈머측에서만 따로 그룹 설정시 정의하면 됨.

- FIFO 순서보장 : 전체 Queue에 대해 FIFO 유지. 파티션들을 단일 파티션으로 보이게 하는 것.

- 내부적으로 파티션을 사용하긴하지만 컨슈머입장에서는 단일 큐로 보임.

 

컨슈머가 동작하게 하는 방법?

kafka-console-consumer.sh --bootstrap-server <broker-address> \
    --topic <topic-name> \
    --group <share-group-name> \
    --consumer-property group.type=share

GroupType에 "share"라는 ENUM이 새로 만들어지게됨. 그리고 kafka-console-share-consumer.sh라는 새로운 쉘 명령어가 만들어질듯함.

Option Description
--bootstrap-server <String: server to connect to> REQUIRED: The server(s) to connect to.
--consumer-config <String: config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--consumer-property <String: consumer_prop> Consumer property in the form key=value.
--enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class> The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--formatter-config <String: config file> Config properties file to initialize the message formatter. Note that [property] takes precedence of this config.
--group <String: share groud id> The share group id of the consumer. (default: "console-share-consumer" )
--help Print usage information.
--key-deserializer <String: deserializer for keys> The name of the class to use for deserializing keys.
--max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual.
--property <String: prop> The properties to initialize the message formatter. Default properties include:
 print.timestamp=true|false
 print.key=true|false
 print.offset=true|false
 print.delivery=true|false
 print.partition=true|false
 print.headers=true|false
 print.value=true|false
 key.separator=<key.separator>
 line.separator=<line.separator>
 headers.separator=<line.separator>
 null.literal=<null.literal>
 key.deserializer=<key.deserializer>
 value.deserializer=<value.deserializer>
 header.deserializer=<header.deserializer>
Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.
--reject If specified, messages are rejected as they are consumed.
--reject-message-on-error If there is an error when processing a message, reject it instead of halting.
--release If specified, messages are released as they are consumed.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specific interval.
--topic <String: topic> REQUIRED: The topic to consume from.
--value-deserializer <String: deserializer for values> The name of the class to use for deserializing values.
--version Display Kafka version.

실제 코드는 어떻게 될까?

아직 모든게 만들어지지 않아서 앞으로 변화될 여지는 있지만, 아래와 같이 설정될 것으로 추정됨.

import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumerConfig;

import java.time.Duration;
import java.util.Collections;

public class SharedGroupConsumer {
    public static void main(String[] args) {
        // Kafka Share Consumer 설정
        ShareConsumerConfig config = new ShareConsumerConfig();
        config.setBootstrapServers("localhost:9092");
        config.setGroupId("my-shared-group");
        config.setTopic("my-queue-topic");

        // KafkaShareConsumer 생성
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(config)) {
            // 메시지 처리 루프
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("Consumed record: key = %s, value = %s, offset = %d%n",
                            record.key(), record.value(), record.offset());
                    // 메시지 처리 후 Ack 전송
                    consumer.ack(record);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

관련 PR

- https://github.com/apache/kafka/pull/16461/files
- https://github.com/apache/kafka/pull/16134/files#diff-d1ccc49e6566a37e558ff2cecc49d1d984465b2b26d0d676f4a2f143fe3b3d86

 

trunk branch에 merge되고 있으므로 trunk에서 확인 가능

https://github.com/apache/kafka/tree/trunk

 

GitHub - apache/kafka: Mirror of Apache Kafka

Mirror of Apache Kafka. Contribute to apache/kafka development by creating an account on GitHub.

github.com

 

반응형