본문 바로가기

빅데이터/Kafka

카프카 장애대응 - Consumer offset 지정하기(by partition)

카프카 consumer로 입수를 진행하다보면 예상치 못하게 데이터가 중복입수 또는 유실될 가능성이 있습니다.

 

- Kafka broker 이슈

- Network 이슈

- Consumer application 이슈

 

위와 같은 이슈가 발생했을 경우 이슈가 발생했던 시점보다 더 이전의 데이터부터 입수를 진행해야하는데 이때 offset을 지정해야합니다. offset을 지정하는 방법은 아래와 같습니다.

 

1) Consumer 생성

Consumer 생성은 재입수 하고자 하는 Consumer에 지정하여 신규로 생성.

2) Offset 지정(by console shell script)

./kafka-consumer-groups shell을 통해서 offset을 reset할 수 있습니다.

 

offset reset 옵션:

--shift-by : Long (+/- 모두 가능)
--to-offset : Long
--by-duration  : PnDTnHnMnS
--to-datetime  : YYYY-MM-DDTHH:mm:SS.sss
--to-current
--to-latest
--to-earliest

 

만약 click 이라는 토픽에  click_re_consumer 라는 consumer에 대해 offset을 earliest로 지정하고 싶다면 아래와 같이 호출하면 됩니다.

$ ./kafka-consumer-groups --bootstrap-server {bootstrap 정보} \
  --group click --topic click_re_consumer \
  --reset-offsets --to-earliest --execute

만약 특정 partition(여기서는 1번)의 데이터만 offset을 30으로 지정하고 싶다면 아래와 같이 topic명 뒤에 partition번호를 붙여주면 됩니다.

./kafka-consumer-groups --bootstrap-server {bootstrap 정보} \
--group click --topic click_re_consumer:1 \
--reset-offsets --to-offset 30 --execute

3) 재입수 진행

click_re_consumer 이름으로 click 토픽으로부터 재입수를 진행합니다.

태그