반응형
카프카 스트림즈에는 suppress()메서드가 있다. 이 메서드를 통해 일정 윈도우 간격 안에 데이터를 모아서 처리할 수 있는데, 이슈가 있다. 값을 모아서 state store(rocksDB)에 저장할 때 제대로 deserialize를 하지못하는것이다.
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
이 에러를 해결하기 위해 아래와 같은 방법으로 임시로 해결할 수 있다.
KTable<Windowed<String>, Long> KT01 = myStream
.groupByKey()
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(0)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
as("streams-cell-tp-winagg").withKeySerde(Serdes.String())
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
중요 요지는 count(), reduce(), aggregation() 메서드에 Materialized를 지정하고 반드시 WindowStore로 지정하는 것이다.
이 내용은 현재 issues.apache.org/jira/browse/KAFKA-9259 에서 해결하려고 하고 있는데, 2.1.0 버전에서 나온 버그를 아직 안고치고 있다. 아마도 우선순위가 낮아서 밀리는것 같다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈 suppress() 사용할 때 ClassCastException 문제 (0) | 2021.04.06 |
---|---|
Kafka streams 에러 : Size of data received by LongDeserializer is not 8 (0) | 2021.04.06 |
카프카 스트림즈 애플리케이션 초기화 명령 (0) | 2021.03.31 |
local kafka single broker 띄우기 with 도커 (0) | 2021.03.05 |
특정 시점(날짜+시간)의 레코드부터 가져오도록 설정하기. (0) | 2021.03.03 |
아파치 카프카를 데이터 레이크로 사용할 수 있을까? (0) | 2021.02.25 |
macOS에서 카프카 버로우 빌드 및 실행하기. (0) | 2020.11.29 |