빅데이터/Kafka

Cannot get state store TOPIC because the stream thread is STARTING, not RUNNING 에러 해결

AndersonChoi 2021. 6. 16. 10:17

KTable을 Materialized View로 사용할 경우 아래와 같은 에러가 발생할 때가 있습니다.

Exception in thread "Timer-0" org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store address because the stream thread is STARTING, not RUNNING
	at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at com.example.stateful.MaterializedTable$ScheduledJob.run(MaterializedTable.java:60)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)

상기 에러는 address 토픽에 대해 KTable로 만들고 keyValueStore로 바로 선언할 경우 에러가 발생하였습니다. 이 경우는 streams가 시작했지만 코디네이터에 의해 토픽의 파티션이 스트림즈 애플리케이션에 할당되는데 시간이 걸리기 때문에 지연이 발생해서 생기는 현상입니다. 즉, KTable로 선언한 address토픽이 모두 할당이 되어 구체화된 뷰가 생성되고 난 뒤에 호출이 되어야 합니다. 그렇기 때문에 STARTING 이라고 노출되는 것입니다. 

 

이런 상태를 확인하는 방법은 다음과 같습니다. streams의 상태를 확인하기 위해 아래와 같이 생애주기를 확인할 수 있습니다.

streams.state().isValidTransition(KafkaStreams.State.RUNNING);

상기와 같이 RUNNING이 확인된 이후에 keyValueStore를 사용하면 됩니다.

 

스트림즈 생애주기

Kafka Streams states are the possible state that a Kafka Streams instance can be in. An instance must only be in one state at a time. The expected state transition with the following defined states is:
                       +--------------+
               +<----- | Created (0)  |
               |       +-----+--------+
               |             |
               |             v
               |       +----+--+------+
               |       | Re-          |
               +<----- | Balancing (1)| -------->+
               |       +-----+-+------+          |
               |             | ^                 |
               |             v |                 |
               |       +--------------+          v
               |       | Running (2)  | -------->+
               |       +------+-------+          |
               |              |                  |
               |              v                  |
               |       +------+-------+     +----+-------+
               +-----> | Pending      |     | Pending    |
                       | Shutdown (3) |     | Error (5)  |
                       +------+-------+     +-----+------+
                              |                   |
                              v                   v
                       +------+-------+     +-----+--------+
                       | Not          |     | Error (6)    |
                       | Running (4)  |     +--------------+
                       +--------------+

스트림즈의 생애주기는 Created, Re-Balancing, Running, Pending Shutdown, Not Running, Pending Error, Error로 이루어져 있습니다. 컨슈머를 운영한것과 동일하게 Running과 Re-Balancing이 서로 이동 가능함을 확인할 수 있습니다.

public enum State {
        CREATED(1, 3),          // 0
        REBALANCING(2, 3, 5),   // 1
        RUNNING(1, 2, 3, 5),    // 2
        PENDING_SHUTDOWN(4),    // 3
        NOT_RUNNING,            // 4
        PENDING_ERROR(6),       // 5
        ERROR;    
        ...
}

 

 

반응형