본문 바로가기

빅데이터/Kafka

카프카 스트림즈 suppress() 사용할 때 ClassCastException 문제

카프카 스트림즈에는 suppress()메서드가 있다. 이 메서드를 통해 일정 윈도우 간격 안에 데이터를 모아서 처리할 수 있는데, 이슈가 있다. 값을 모아서 state store(rocksDB)에 저장할 때 제대로 deserialize를 하지못하는것이다.

java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)

이 에러를 해결하기 위해 아래와 같은 방법으로 임시로 해결할 수 있다.

KTable<Windowed<String>, Long> KT01 = myStream
        .groupByKey()
        .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(0)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
                as("streams-cell-tp-winagg").withKeySerde(Serdes.String())
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

중요 요지는 count(), reduce(), aggregation() 메서드에 Materialized를 지정하고 반드시 WindowStore로 지정하는 것이다.

 

이 내용은 현재 issues.apache.org/jira/browse/KAFKA-9259 에서 해결하려고 하고 있는데, 2.1.0 버전에서 나온 버그를 아직 안고치고 있다. 아마도 우선순위가 낮아서 밀리는것 같다.

반응형