카프카 스트림즈에는 suppress()메서드가 있다. 이 메서드를 통해 일정 윈도우 간격 안에 데이터를 모아서 처리할 수 있는데, 이슈가 있다. 값을 모아서 state store(rocksDB)에 저장할 때 제대로 deserialize를 하지못하는것이다.
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)이 에러를 해결하기 위해 아래와 같은 방법으로 임시로 해결할 수 있다.
KTable<Windowed<String>, Long> KT01 = myStream
        .groupByKey()
        .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(0)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
                as("streams-cell-tp-winagg").withKeySerde(Serdes.String())
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));중요 요지는 count(), reduce(), aggregation() 메서드에 Materialized를 지정하고 반드시 WindowStore로 지정하는 것이다.
이 내용은 현재 issues.apache.org/jira/browse/KAFKA-9259 에서 해결하려고 하고 있는데, 2.1.0 버전에서 나온 버그를 아직 안고치고 있다. 아마도 우선순위가 낮아서 밀리는것 같다.
반응형
    
    
    
  '빅데이터 > Kafka' 카테고리의 다른 글
| 레디슈 큐(queue), 레디스 스트림(streams), 레디스 펍섭(pub/sub) 그리고 카프카와 비교 (0) | 2021.04.25 | 
|---|---|
| 카프카 스트림즈의 commit.interval.ms옵션 (2) | 2021.04.15 | 
| 초~중급자를 위한 [아파치 카프카 애플리케이션 개발] 서적을 출간하였습니다. (12) | 2021.04.11 | 
| Kafka streams 에러 : Size of data received by LongDeserializer is not 8 (0) | 2021.04.06 | 
| 카프카 스트림즈 애플리케이션 초기화 명령 (0) | 2021.03.31 | 
| local kafka single broker 띄우기 with 도커 (0) | 2021.03.05 | 
 
										
									 
										
									 
										
									