While operating or testing a Kafka Streams application, you will sometimes need to reset its offsets. In that case, you can use a command like the one below.
$ ./kafka-streams-application-reset.sh --application-id join-application --input-topics streams-test
Reset-offsets for input topics [streams-test]
Following input topics offsets will be reset to (for consumer group join-application)
Topic: streams-test Partition: 2 Offset: 0
Topic: streams-test Partition: 3 Offset: 0
Topic: streams-test Partition: 0 Offset: 0
Topic: streams-test Partition: 1 Offset: 0
Topic: streams-test Partition: 6 Offset: 0
Topic: streams-test Partition: 7 Offset: 0
Topic: streams-test Partition: 4 Offset: 0
Topic: streams-test Partition: 5 Offset: 0
Done.
Deleting all internal/auto-created topics for application join-application
Done.
--application-id is required.
--bootstrap-servers is optional, but if you omit it the tool runs against localhost:9092.
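As a minimal sketch of the two options above, the snippet below builds a reset command that names the brokers explicitly and adds --dry-run, so the planned actions can be reviewed before anything changes. The broker addresses and the preview_reset function name are assumptions made for illustration; the function only prints the command rather than running it.

```shell
# Hypothetical values; replace them with your own.
APP_ID="join-application"
INPUT_TOPICS="streams-test"
BOOTSTRAP_SERVERS="broker1:9092,broker2:9092"  # assumed broker addresses

# Print the reset command for review. Remove the leading "echo" to run it.
preview_reset() {
  echo ./kafka-streams-application-reset.sh \
    --application-id "$APP_ID" \
    --bootstrap-servers "$BOOTSTRAP_SERVERS" \
    --input-topics "$INPUT_TOPICS" \
    --dry-run
}

preview_reset
```

Once the dry-run output looks right, the same command without --dry-run performs the actual reset.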
See below for descriptions of the remaining options.
Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: id>       The Kafka Streams application ID
                                        (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with
                                        format: HOST1:PORT1,HOST2:PORT2
                                        (default: localhost:9092)
--by-duration <String>                Reset offsets to offset by duration from
                                        current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--execute                             Execute the command.
--force                               Force the removal of members of the
                                        consumer group (intended to remove
                                        stopped members if a long session
                                        timeout was used). Make sure to shut
                                        down all stream applications when this
                                        option is specified to avoid unexpected
                                        rebalances.
--from-file <String>                  Reset offsets to values defined in CSV
                                        file.
--help                                Print usage information.
--input-topics <String: list>         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--intermediate-topics <String: list>  Comma-separated list of intermediate user
                                        topics (topics that are input and
                                        output topics, e.g., used in the
                                        deprecated through() method). For these
                                        topics, the tool will skip to the end.
--shift-by <Long: number-of-offsets>  Reset offsets shifting current offset by
                                        'n', where 'n' can be positive or
                                        negative
--to-datetime <String>                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
--version                             Print version information and exit.
--zookeeper                           Zookeeper option is deprecated by
                                        bootstrap.servers, as the reset tool
                                        would no longer access Zookeeper
                                        directly.
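To illustrate the 'PnDTnHnMnS' format accepted by --by-duration, here is a rough sketch that builds a dry-run command for a given duration. It assumes the duration is applied to the topics listed in --input-topics; the reset_by_duration function name is hypothetical, and the function only prints the command instead of running it.

```shell
# Hypothetical values; replace them with your own.
APP_ID="join-application"
INPUT_TOPICS="streams-test"

# Print a dry-run reset command for the given duration (format 'PnDTnHnMnS').
# Remove the leading "echo" to run it.
reset_by_duration() {
  duration="$1"
  echo ./kafka-streams-application-reset.sh \
    --application-id "$APP_ID" \
    --input-topics "$INPUT_TOPICS" \
    --by-duration "$duration" \
    --dry-run
}

reset_by_duration PT1H      # one hour before the current time
reset_by_duration P1DT12H   # one day and twelve hours before the current time
```

The other reset options in the table (for example --to-offset or --shift-by) could be swapped in for --by-duration in the same way.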