카프카 스트림즈 사용시 토폴로지 내부 에러로 인해 아래와 같은 오류가 발생할 수 있다.
All stream threads have died. The instance will be in error state and should be closed.
이에 대응하기 위해 2.8.0에서 추가된 setUncaughtExceptionHandler메서드를 사용하면 스레드를 재시작하여 데이터를 지속 처리할 수 있다.
streams.setUncaughtExceptionHandler((exception)
-> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
2.8.0 이전에는 setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) 을 사용했으나, Depreacted 되었다.
setUncaughtExceptionHandler
Set the handler invoked when an internal stream thread throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic. The handler will execute on the thread that produced the exception. In order to get the thread that threw the exception, use Thread.currentThread().
Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any thread that encounters such an exception.
StreamsUncaughtExceptionHandler에 등록할 수 있는 것은 아래와 같다.
- REPLACE_THREAD
- SHUTDOWN_CLIENT
- SHUTDOWN_APPLICATION
public interface StreamsUncaughtExceptionHandler {
/**
* Inspect the exception received in a stream thread and respond with an action.
* @param exception the actual exception
*/
StreamThreadExceptionResponse handle(final Throwable exception);
/**
* Enumeration that describes the response from the exception handler.
*/
enum StreamThreadExceptionResponse {
REPLACE_THREAD(0, "REPLACE_THREAD"),
SHUTDOWN_CLIENT(1, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
SHUTDOWN_APPLICATION(2, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
/** an english description of the api--this is for debugging and can change */
public final String name;
/** the permanent and immutable id of an API--this can't change ever */
public final int id;
StreamThreadExceptionResponse(final int id, final String name) {
this.id = id;
this.name = name;
}
}
}
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 (0) | 2021.06.16 |
---|---|
kafka spark structured stream 예제코드 및 실행 (0) | 2021.06.04 |
카프카 스트림즈 Exactly-once 설정하는 방법과 내부 동작 (0) | 2021.06.03 |
ProducerRecord에 파티션 번호를 지정하면 어떻게 동작할까? (0) | 2021.05.05 |
카프카를 이벤트 소싱, CQRS로 사용할 수 있을까? (2) | 2021.05.03 |
confluent-kafka-go 컨슈머를 구현하는 5가지 방법 (0) | 2021.04.30 |