KTable을 Materialized View로 사용할 경우 아래와 같은 에러가 발생할 때가 있습니다.
Exception in thread "Timer-0" org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store address because the stream thread is STARTING, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
at com.example.stateful.MaterializedTable$ScheduledJob.run(MaterializedTable.java:60)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
상기 에러는 address 토픽에 대해 KTable로 만들고 keyValueStore로 바로 선언할 경우 에러가 발생하였습니다. 이 경우는 streams가 시작했지만 코디네이터에 의해 토픽의 파티션이 스트림즈 애플리케이션에 할당되는데 시간이 걸리기 때문에 지연이 발생해서 생기는 현상입니다. 즉, KTable로 선언한 address토픽이 모두 할당이 되어 구체화된 뷰가 생성되고 난 뒤에 호출이 되어야 합니다. 그렇기 때문에 STARTING 이라고 노출되는 것입니다.
이런 상태를 확인하는 방법은 다음과 같습니다. streams의 상태를 확인하기 위해 아래와 같이 생애주기를 확인할 수 있습니다.
streams.state().isValidTransition(KafkaStreams.State.RUNNING);
상기와 같이 RUNNING이 확인된 이후에 keyValueStore를 사용하면 됩니다.
스트림즈 생애주기
Kafka Streams states are the possible state that a Kafka Streams instance can be in. An instance must only be in one state at a time. The expected state transition with the following defined states is:
+--------------+
+<----- | Created (0) |
| +-----+--------+
| |
| v
| +----+--+------+
| | Re- |
+<----- | Balancing (1)| -------->+
| +-----+-+------+ |
| | ^ |
| v | |
| +--------------+ v
| | Running (2) | -------->+
| +------+-------+ |
| | |
| v |
| +------+-------+ +----+-------+
+-----> | Pending | | Pending |
| Shutdown (3) | | Error (5) |
+------+-------+ +-----+------+
| |
v v
+------+-------+ +-----+--------+
| Not | | Error (6) |
| Running (4) | +--------------+
+--------------+
스트림즈의 생애주기는 Created, Re-Balancing, Running, Pending Shutdown, Not Running, Pending Error, Error로 이루어져 있습니다. 컨슈머를 운영한것과 동일하게 Running과 Re-Balancing이 서로 이동 가능함을 확인할 수 있습니다.
public enum State {
CREATED(1, 3), // 0
REBALANCING(2, 3, 5), // 1
RUNNING(1, 2, 3, 5), // 2
PENDING_SHUTDOWN(4), // 3
NOT_RUNNING, // 4
PENDING_ERROR(6), // 5
ERROR;
...
}
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈로 schedule operation 수행하기(번역) (3) | 2021.07.23 |
---|---|
카프카 스트림즈에서 SlidingWindow에 대한 고찰 (0) | 2021.06.25 |
카프카 스트림즈 join 사용시 메시지 키 접근하기 (0) | 2021.06.21 |
카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 (0) | 2021.06.16 |
kafka spark structured stream 예제코드 및 실행 (0) | 2021.06.04 |
카프카 스트림즈 Exactly-once 설정하는 방법과 내부 동작 (0) | 2021.06.03 |