카프카의 토픽에 저장된 레코드는 0.11.0.0 이후부터 timestamp가 저장됩니다. 그러므로 kafka-consumer-group.sh 스크립트로 오프셋을 직접 지정할 수 있는데 --to-datetime을 사용하면 특정 시점(시간) 데이터부터 가져가도록 설정할 수 있습니다.
Example)
토픽의 레코드가 다음과 같이 있다고 가정합니다.
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1614748497034, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1614748497722, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1614748497923, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1614748498554, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1614748498826, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1614748519187, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1614748519411, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1614748519595, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1614748519779, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1614748520019, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = e)
총 10개의 레코드이고 모두 test 토픽, 0번 파티션에 존재하고 있습니다.
CreateTime을 보면 1614748497034 부터 1614748520019 까지 기간에 생성된 것이라는 것을 확인할 수 있습니다. 이 시간은 unix타임으로 GMT기준으로 변환하면
Unixtime 1614748497034 → Wed Mar 03 2021 05:14:57 GMT+0000
Unixtime 1614748520019 → Wed Mar 03 2021 05:15:20 GMT+0000
입니다.
만약 15분 00초(GMT 기준)부터 데이터를 가져가고 싶다면 아래와 같은 명령어로 컨슈머 그룹의 오프셋 위치를 지정할 수 있습니다.
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic test --group test-group --to-datetime 2021-03-03T05:15:00.000 --execute
GROUP TOPIC PARTITION NEW-OFFSET
test-group test 0 10
설정 이후에 컨슈머 애플리케이션을 실행하면 15분 00초 이후의 데이터를 컨슈밍 하는 것을 확인할 수 있습니다.
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1614748519187, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1614748519411, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1614748519595, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
...생략
(Unixtime 1614748519187 → Wed Mar 03 2021 05:15:19 GMT+0000)
참고사이트
유닉스타임 계산기 : www.unixtimestamp.com/index.php
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
Kafka streams 에러 : Size of data received by LongDeserializer is not 8 (0) | 2021.04.06 |
---|---|
카프카 스트림즈 애플리케이션 초기화 명령 (0) | 2021.03.31 |
local kafka single broker 띄우기 with 도커 (0) | 2021.03.05 |
아파치 카프카를 데이터 레이크로 사용할 수 있을까? (1) | 2021.02.25 |
macOS에서 카프카 버로우 빌드 및 실행하기. (0) | 2020.11.29 |
카프카 토픽의 오프셋 최대 크기는 얼마일까? (0) | 2020.11.17 |