Kafka Streams application reset command

While operating or testing a Kafka Streams application, you sometimes need to reset its offsets. In that case, use the following command.

$ ./kafka-streams-application-reset.sh --application-id join-application --input-topics streams-test

Reset-offsets for input topics [streams-test]
Following input topics offsets will be reset to (for consumer group join-application)
Topic: streams-test Partition: 2 Offset: 0
Topic: streams-test Partition: 3 Offset: 0
Topic: streams-test Partition: 0 Offset: 0
Topic: streams-test Partition: 1 Offset: 0
Topic: streams-test Partition: 6 Offset: 0
Topic: streams-test Partition: 7 Offset: 0
Topic: streams-test Partition: 4 Offset: 0
Topic: streams-test Partition: 5 Offset: 0
Done.
Deleting all internal/auto-created topics for application join-application
Done.

--application-id is required.
--bootstrap-servers is optional, but if you omit it the tool runs against localhost:9092.
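
When the brokers are not on localhost, these two options can be combined with --dry-run from the option list to preview a reset before performing it. The following is a minimal sketch, not a definitive script: the broker list broker1:9092,broker2:9092 and the helper function reset are assumptions made for illustration and are not part of Kafka. The helper only prints the commands unless RUN=1 is set. The reset tool is meant to be run while the application instances are stopped.

```shell
#!/bin/sh
# Hypothetical values: point these at your own tool path and brokers.
RESET_TOOL="./kafka-streams-application-reset.sh"
BROKERS="broker1:9092,broker2:9092"

# reset APP_ID TOPICS [EXTRA_FLAGS...]
# Prints the reset command by default; runs it only when RUN=1 is set.
reset() (
  app_id="$1"
  topics="$2"
  shift 2
  set -- "$RESET_TOOL" --application-id "$app_id" \
    --bootstrap-servers "$BROKERS" \
    --input-topics "$topics" "$@"
  if [ "${RUN:-0}" = "1" ]; then
    "$@"
  else
    echo "$*"
  fi
)

reset join-application streams-test --dry-run   # preview the changes first
reset join-application streams-test             # then perform the reset
```

Running the script with RUN=1 in the environment executes both commands instead of printing them.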

 

For the remaining options, see the descriptions below.

Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: id>       The Kafka Streams application ID
                                        (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with
                                        format: HOST1:PORT1,HOST2:PORT2
                                        (default: localhost:9092)
--by-duration <String>                Reset offsets to offset by duration from
                                        current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--execute                             Execute the command.
--force                               Force the removal of members of the
                                        consumer group (intended to remove
                                        stopped members if a long session
                                        timeout was used). Make sure to shut
                                        down all stream applications when this
                                        option is specified to avoid unexpected
                                        rebalances.
--from-file <String>                  Reset offsets to values defined in CSV
                                        file.
--help                                Print usage information.
--input-topics <String: list>         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--intermediate-topics <String: list>  Comma-separated list of intermediate user
                                        topics (topics that are input and
                                        output topics, e.g., used in the
                                        deprecated through() method). For these
                                        topics, the tool will skip to the end.
--shift-by <Long: number-of-offsets>  Reset offsets shifting current offset by
                                        'n', where 'n' can be positive or
                                        negative
--to-datetime <String>                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
--version                             Print version information and exit.
--zookeeper                           Zookeeper option is deprecated by
                                        bootstrap.servers, as the reset tool
                                        would no longer access Zookeeper
                                        directly.
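
As a rough illustration of the offset options in the table, the sketch below prints a few command variants that reset to something other than the earliest offset. The helper reset_to, the broker address, and the example date and duration values are assumptions chosen for illustration; the values follow the formats given in the option descriptions. Each command includes --dry-run, so it would only display the planned changes.

```shell
#!/bin/sh
# Hypothetical broker address; application and topic names follow the earlier example.
RESET_TOOL="./kafka-streams-application-reset.sh"
BROKERS="localhost:9092"

# reset_to OFFSET_OPTION [VALUE]
# Prints a dry-run reset command using one offset option from the table.
reset_to() (
  echo "$RESET_TOOL" --application-id join-application \
    --bootstrap-servers "$BROKERS" \
    --input-topics streams-test "$@" --dry-run
)

reset_to --to-datetime 2024-01-01T00:00:00.000   # offsets at a given point in time
reset_to --by-duration P0DT1H0M0S                # one hour back from the current time
reset_to --to-latest                             # latest offset of each partition
```

Once the preview looks right, drop --dry-run from the printed command to apply it.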