KIP-932: Queues for Kafka 조사
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
KIP-932: Queues for Kafka - Apache Kafka - Apache Software Foundation
Status Current state: Accepted Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq JIRA: https://issues.apache.org/jira/browse/KAFKA-16092 Please keep the discussion on the mailing list rather than commenting on the wiki (
cwiki.apache.org
KIP-932는 Kafka에 새로운 Queue Topic을 도입하여 기존 Topic/Partition 모델의 한계를 보완하고, 메시지를 단일 Consumer에 할당하는 Single-Consumer Semantics를 제공하려는 제안임. Queue Topic은 기존 Kafka Topic과 달리 FIFO(First-In-First-Out) 메시지 순서를 보장하며, 메시지 처리의 동적 확장성을 제공함.
프로듀서는 어떻게 다르게 동작하나?
프로듀서의 동작은 기존과 거의 동일하지만, 파티션 키가 불필요하다. Queue Topic이라고 불리는 이 토픽은 전체에서 순서를 유지하도록 설계되어 있다.
Queue 토픽
- QUEUE 모드라는 것을 명시적으로 선언하지 않아도 됨. 이런 동작을 위해서 컨슈머측에서만 따로 그룹 설정시 정의하면 됨.
- FIFO 순서보장 : 전체 Queue에 대해 FIFO 유지. 파티션들을 단일 파티션으로 보이게 하는 것.
- 내부적으로 파티션을 사용하긴하지만 컨슈머입장에서는 단일 큐로 보임.
컨슈머가 동작하게 하는 방법?
kafka-console-consumer.sh --bootstrap-server <broker-address> \
    --topic <topic-name> \
    --group <share-group-name> \
    --consumer-property group.type=shareGroupType에 "share"라는 ENUM이 새로 만들어지게됨. 그리고 kafka-console-share-consumer.sh라는 새로운 쉘 명령어가 만들어질듯함.
| Option | Description | 
| --bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. | 
| --consumer-config <String: config file> | Consumer config properties file. Note that [consumer-property] takes precedence over this config. | 
| --consumer-property <String: consumer_prop> | Consumer property in the form key=value. | 
| --enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) | 
| --formatter <String: class> | The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter) | 
| --formatter-config <String: config file> | Config properties file to initialize the message formatter. Note that [property] takes precedence of this config. | 
| --group <String: share groud id> | The share group id of the consumer. (default: "console-share-consumer" ) | 
| --help | Print usage information. | 
| --key-deserializer <String: deserializer for keys> | The name of the class to use for deserializing keys. | 
| --max-messages <Integer: num_messages> | The maximum number of messages to consume before exiting. If not set, consumption is continual. | 
| --property <String: prop> | The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.offset=true|false print.delivery=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> header.deserializer=<header.deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. | 
| --reject | If specified, messages are rejected as they are consumed. | 
| --reject-message-on-error | If there is an error when processing a message, reject it instead of halting. | 
| --release | If specified, messages are released as they are consumed. | 
| --timeout-ms <Integer: timeout_ms> | If specified, exit if no message is available for consumption for the specific interval. | 
| --topic <String: topic> | REQUIRED: The topic to consume from. | 
| --value-deserializer <String: deserializer for values> | The name of the class to use for deserializing values. | 
| --version | Display Kafka version. | 
실제 코드는 어떻게 될까?
아직 모든게 만들어지지 않아서 앞으로 변화될 여지는 있지만, 아래와 같이 설정될 것으로 추정됨.
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumerConfig;
import java.time.Duration;
import java.util.Collections;
public class SharedGroupConsumer {
    public static void main(String[] args) {
        // Kafka Share Consumer 설정
        ShareConsumerConfig config = new ShareConsumerConfig();
        config.setBootstrapServers("localhost:9092");
        config.setGroupId("my-shared-group");
        config.setTopic("my-queue-topic");
        // KafkaShareConsumer 생성
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(config)) {
            // 메시지 처리 루프
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("Consumed record: key = %s, value = %s, offset = %d%n",
                            record.key(), record.value(), record.offset());
                    // 메시지 처리 후 Ack 전송
                    consumer.ack(record);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
관련 PR
- https://github.com/apache/kafka/pull/16461/files
- https://github.com/apache/kafka/pull/16134/files#diff-d1ccc49e6566a37e558ff2cecc49d1d984465b2b26d0d676f4a2f143fe3b3d86
trunk branch에 merge되고 있으므로 trunk에서 확인 가능
https://github.com/apache/kafka/tree/trunk
GitHub - apache/kafka: Mirror of Apache Kafka
Mirror of Apache Kafka. Contribute to apache/kafka development by creating an account on GitHub.
github.com