본문 바로가기

빅데이터/Kafka

Kafka ConsumerRecord의 timestamp는 0.10.0.0 이후부터 사용가능합니다.

카프카의 ConsumerRecord를 살펴보면 메시지 키와 메시지 값 이외에 추가로 timestamp가 있는 것을 확인할 수 있습니다. timestamp는 Kafka 프로듀서가 ProducerRecord를 생성하면서 timestamp 값을 Unix time으로 입력합니다. 만약 다른 timestamp를 넣고 싶다면 직접 설정하여 넣을수도 있습니다.

 

해당 건은 KAFKA-3025에서 논의했던 내용으로 자세한 내용은 해당 jira에서 확인할 수 있습니다.

https://issues.apache.org/jira/browse/KAFKA-3025

 

[KAFKA-3025] KIP-31&KIP-32 (part 1): Add timestamp field to message, configs, and Producer/ConsumerRecord - ASF JIRA

This JIRA is for changes for KIP-32 excluding broker checking and acting on timestamp field in a message. This JIRA includes: 1. Bump up MessageAndOffset version to version 1 to: a. Change absolute offset to relative offset. b. Add time field to the messag

issues.apache.org

해당 버젼은 RESOLVED되었으며 0.10.0.0 에서 적용되었음을 알 수 있습니다. 이전버젼에서는 timestamp가 유효하지 않고 사용되지 않았다는 것을 알 수 있죠. 0.9 이전버젼의 record에는 timestamp가 존재하지 않습니다. 이 내용은 ConsumerRecord(2.5버젼).java 에서도 확인할 수 있습니다.

/**
 * Creates a record to be received from a specified topic and partition (provided for
 * compatibility with Kafka 0.9 before the message format supported timestamps and before
 * serialized metadata were exposed).
 *
 * @param topic The topic this record is received from
 * @param partition The partition of the topic this record is received from
 * @param offset The offset of this record in the corresponding Kafka partition
 * @param key The key of the record, if one exists (null is allowed)
 * @param value The record contents
 */
public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      K key,
                      V value) {
    this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
            NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
}

 

반응형