KIP-932: Queues for Kafka 조사
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
KIP-932는 Kafka에 새로운 Queue Topic을 도입하여 기존 Topic/Partition 모델의 한계를 보완하고, 메시지를 단일 Consumer에 할당하는 Single-Consumer Semantics를 제공하려는 제안임. Queue Topic은 기존 Kafka Topic과 달리 FIFO(First-In-First-Out) 메시지 순서를 보장하며, 메시지 처리의 동적 확장성을 제공함.
프로듀서는 어떻게 다르게 동작하나?
프로듀서의 동작은 기존과 거의 동일하지만, 파티션 키가 불필요하다. Queue Topic이라고 불리는 이 토픽은 전체에서 순서를 유지하도록 설계되어 있다.
Queue 토픽
- QUEUE 모드라는 것을 명시적으로 선언하지 않아도 됨. 이런 동작을 위해서 컨슈머측에서만 따로 그룹 설정시 정의하면 됨.
- FIFO 순서보장 : 전체 Queue에 대해 FIFO 유지. 파티션들을 단일 파티션으로 보이게 하는 것.
- 내부적으로 파티션을 사용하긴하지만 컨슈머입장에서는 단일 큐로 보임.
컨슈머가 동작하게 하는 방법?
kafka-console-consumer.sh --bootstrap-server <broker-address> \
--topic <topic-name> \
--group <share-group-name> \
--consumer-property group.type=share
GroupType에 "share"라는 ENUM이 새로 만들어지게됨. 그리고 kafka-console-share-consumer.sh라는 새로운 쉘 명령어가 만들어질듯함.
Option | Description |
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--consumer-config <String: config file> | Consumer config properties file. Note that [consumer-property] takes precedence over this config. |
--consumer-property <String: consumer_prop> | Consumer property in the form key=value. |
--enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) |
--formatter <String: class> | The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter) |
--formatter-config <String: config file> | Config properties file to initialize the message formatter. Note that [property] takes precedence of this config. |
--group <String: share groud id> | The share group id of the consumer. (default: "console-share-consumer" ) |
--help | Print usage information. |
--key-deserializer <String: deserializer for keys> | The name of the class to use for deserializing keys. |
--max-messages <Integer: num_messages> | The maximum number of messages to consume before exiting. If not set, consumption is continual. |
--property <String: prop> | The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.offset=true|false print.delivery=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> header.deserializer=<header.deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. |
--reject | If specified, messages are rejected as they are consumed. |
--reject-message-on-error | If there is an error when processing a message, reject it instead of halting. |
--release | If specified, messages are released as they are consumed. |
--timeout-ms <Integer: timeout_ms> | If specified, exit if no message is available for consumption for the specific interval. |
--topic <String: topic> | REQUIRED: The topic to consume from. |
--value-deserializer <String: deserializer for values> | The name of the class to use for deserializing values. |
--version | Display Kafka version. |
실제 코드는 어떻게 될까?
아직 모든게 만들어지지 않아서 앞으로 변화될 여지는 있지만, 아래와 같이 설정될 것으로 추정됨.
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumerConfig;
import java.time.Duration;
import java.util.Collections;
public class SharedGroupConsumer {
public static void main(String[] args) {
// Kafka Share Consumer 설정
ShareConsumerConfig config = new ShareConsumerConfig();
config.setBootstrapServers("localhost:9092");
config.setGroupId("my-shared-group");
config.setTopic("my-queue-topic");
// KafkaShareConsumer 생성
try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(config)) {
// 메시지 처리 루프
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("Consumed record: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());
// 메시지 처리 후 Ack 전송
consumer.ack(record);
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
관련 PR
- https://github.com/apache/kafka/pull/16461/files
- https://github.com/apache/kafka/pull/16134/files#diff-d1ccc49e6566a37e558ff2cecc49d1d984465b2b26d0d676f4a2f143fe3b3d86
trunk branch에 merge되고 있으므로 trunk에서 확인 가능
https://github.com/apache/kafka/tree/trunk