카프카 스트림즈에는 suppress()메서드가 있다. 이 메서드를 통해 일정 윈도우 간격 안에 데이터를 모아서 처리할 수 있는데, 이슈가 있다. 값을 모아서 state store(rocksDB)에 저장할 때 제대로 deserialize를 하지못하는것이다.
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
이 에러를 해결하기 위해 아래와 같은 방법으로 임시로 해결할 수 있다.
KTable<Windowed<String>, Long> KT01 = myStream
.groupByKey()
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(0)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
as("streams-cell-tp-winagg").withKeySerde(Serdes.String())
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
중요 요지는 count(), reduce(), aggregation() 메서드에 Materialized를 지정하고 반드시 WindowStore로 지정하는 것이다.
이 내용은 현재 issues.apache.org/jira/browse/KAFKA-9259 에서 해결하려고 하고 있는데, 2.1.0 버전에서 나온 버그를 아직 안고치고 있다. 아마도 우선순위가 낮아서 밀리는것 같다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
레디슈 큐(queue), 레디스 스트림(streams), 레디스 펍섭(pub/sub) 그리고 카프카와 비교 (0) | 2021.04.25 |
---|---|
카프카 스트림즈의 commit.interval.ms옵션 (2) | 2021.04.15 |
초~중급자를 위한 [아파치 카프카 애플리케이션 개발] 서적을 출간하였습니다. (12) | 2021.04.11 |
Kafka streams 에러 : Size of data received by LongDeserializer is not 8 (0) | 2021.04.06 |
카프카 스트림즈 애플리케이션 초기화 명령 (0) | 2021.03.31 |
local kafka single broker 띄우기 with 도커 (0) | 2021.03.05 |