Kafka Streams: how to resolve the "All stream threads have died." error

When using Kafka Streams, an exception thrown inside the topology can kill the stream threads, after which the following error is reported.

All stream threads have died. The instance will be in error state and should be closed.

 

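To handle this, use the setUncaughtExceptionHandler method added in 2.8.0. Returning REPLACE_THREAD from the handler replaces the failed thread with a new one, so the application keeps processing data. Register the handler before calling streams.start().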

streams.setUncaughtExceptionHandler((exception) 
         -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

Before 2.8.0, the overload setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) was used, but it is now deprecated.

 

setUncaughtExceptionHandler

Set the handler invoked when an internal stream thread throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic. The handler will execute on the thread that produced the exception. In order to get the thread that threw the exception, use Thread.currentThread().
Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any thread that encounters such an exception.

A StreamsUncaughtExceptionHandler can return one of the following responses.

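- REPLACE_THREAD: replace the failed stream thread with a new one
- SHUTDOWN_CLIENT: shut down this KafkaStreams instance
- SHUTDOWN_APPLICATION: shut down every instance of the application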

 

public interface StreamsUncaughtExceptionHandler {
    /**
     * Inspect the exception received in a stream thread and respond with an action.
     * @param exception the actual exception
     */
    StreamThreadExceptionResponse handle(final Throwable exception);

    /**
     * Enumeration that describes the response from the exception handler.
     */
    enum StreamThreadExceptionResponse {
        REPLACE_THREAD(0, "REPLACE_THREAD"),
        SHUTDOWN_CLIENT(1, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
        SHUTDOWN_APPLICATION(2, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");

        /** an english description of the api--this is for debugging and can change */
        public final String name;

        /** the permanent and immutable id of an API--this can't change ever */
        public final int id;

        StreamThreadExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}
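
Always returning REPLACE_THREAD can loop forever if the same exception keeps recurring, so it may help to limit how often a thread is replaced. Below is a minimal sketch of such a policy, not an official pattern. The class name ResponsePolicySketch, the maxReplacements budget, and the local Response enum are all hypothetical; the enum is only a stand-in for StreamThreadExceptionResponse so the example runs without kafka-streams on the classpath. Because the handler is shared among all stream threads, the counter is an AtomicInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ResponsePolicySketch {

    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
    // (assumption: declared locally so this sketch compiles without Kafka).
    public enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Hypothetical budget: how many thread replacements are allowed in total.
    private final int maxReplacements;

    // Thread-safe counter, since any stream thread may invoke the policy.
    private final AtomicInteger replacements = new AtomicInteger();

    public ResponsePolicySketch(int maxReplacements) {
        this.maxReplacements = maxReplacements;
    }

    // Returns true if the exception or any of its causes is a java.lang.Error.
    private static boolean causedByError(Throwable exception) {
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof Error) {
                return true;
            }
        }
        return false;
    }

    public Response decide(Throwable exception) {
        // JVM-level errors (for example OutOfMemoryError) are unlikely to be
        // fixed by a new thread, so stop this client without using the budget.
        if (causedByError(exception)) {
            return Response.SHUTDOWN_CLIENT;
        }
        // Replace the thread while the budget lasts, then give up.
        if (replacements.incrementAndGet() <= maxReplacements) {
            return Response.REPLACE_THREAD;
        }
        return Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        ResponsePolicySketch policy = new ResponsePolicySketch(2);
        for (int i = 0; i < 3; i++) {
            System.out.println(policy.decide(new IllegalStateException("boom")));
        }
    }
}
```

In a real application the same decision would be returned from the lambda passed to streams.setUncaughtExceptionHandler, using the actual StreamThreadExceptionResponse constants instead of the stand-in enum.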

 

 
