본문 바로가기

빅데이터/Kafka

특정 시점(날짜+시간)의 레코드부터 가져오도록 설정하기.

카프카의 토픽에 저장된 레코드는 0.11.0.0 이후부터 timestamp가 저장됩니다. 그러므로 kafka-consumer-group.sh 스크립트로 오프셋을 직접 지정할 수 있는데 --to-datetime을 사용하면 특정 시점(시간) 데이터부터 가져가도록 설정할 수 있습니다.

 

Example) 

토픽의 레코드가 다음과 같이 있다고 가정합니다.

ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1614748497034, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1614748497722, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1614748497923, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1614748498554, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1614748498826, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1614748519187, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1614748519411, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1614748519595, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1614748519779, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1614748520019, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = e)

총 10개의 레코드이고 모두 test 토픽, 0번 파티션에 존재하고 있습니다.

CreateTime을 보면 1614748497034 부터 1614748520019 까지 기간에 생성된 것이라는 것을 확인할 수 있습니다. 이 시간은 unix타임으로 GMT기준으로 변환하면 

Unixtime 1614748497034 → Wed Mar 03 2021 05:14:57 GMT+0000
Unixtime 1614748520019 Wed Mar 03 2021 05:15:20 GMT+0000

입니다.

 

만약 15분 00초(GMT 기준)부터 데이터를 가져가고 싶다면 아래와 같은 명령어로 컨슈머 그룹의 오프셋 위치를 지정할 수 있습니다.

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic test --group test-group --to-datetime 2021-03-03T05:15:00.000 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
test-group                     test                           0          10

설정 이후에 컨슈머 애플리케이션을 실행하면 15분 00초 이후의 데이터를 컨슈밍 하는 것을 확인할 수 있습니다.

ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1614748519187, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1614748519411, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1614748519595, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
...생략 

(Unixtime 1614748519187  → Wed Mar 03 2021 05:15:19 GMT+0000)

 

참고사이트

유닉스타임 계산기 : www.unixtimestamp.com/index.php

반응형