본문 바로가기

빅데이터/Kafka

카프카 스트림즈에서 stateful window 처리를 다루는 방법 그리고 커밋타이밍

반응형

카프카 스트림즈는 비상태기반(stateless), 상태기반(stateful) 처리를 지원하는 다양한 메서드를 제공합니다. 스트림즈DSL을 통해 구현할 수도 있고 또는 프로세서API를 사용해서 직접 구현하는 방식도 있습니다. 이 포스팅에서는 스트림즈DSL을 활용하여 상태기반 데이터 처리할 때 어떻게 input, output이 동작하는지 설명합니다.

 

카프카 스트림즈의 스트림즈DSL에서 window function은 4가지가 있습니다.

- 텀블링 윈도우

- 세션 윈도우

- 슬라이딩 윈도우

- 호핑 윈도우

 

이 중 가장 텀블링 윈도우에 대해 살펴볼건데요. 텀블링 윈도우는 특정 사이즈의 윈도우가 서로 다른 윈도우와 겹치지 않게 지속되는 시간 단위 윈도우를 뜻합니다. 다음은 텀블링 윈도우 예시 사진입니다.

https://docs.confluent.io/platform/current/streams/concepts.html#streams-concepts-windowing

레코드의 메시지 키에 따라서 5초동안 하나의 윈도우로 묶어 데이터를 처리하는 모습을 볼 수 있습니다. 

 

텀블링 윈도우를 구현하는 스트림즈DSL코드는 다음과 같습니다.

KStream<String, String> userClickStream = ..;

KTable<Windowed<String>, Long> userClickWindowTable = userClickStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
    .count();

사용자의 클릭을 스트림으로 받은 뒤 5초마다 텀블링 윈도우를 통해 데이터를 추출하는 코드입니다. 당연하게도 유저의 정보를 취합하기 위해서는 레코드의 메시지 키를 유저 정보(유저 키)로 사용해야 합니다. 우리가 생각대로 코드가 동작한다고 가정한다면 아래와 같은 input과 output이 나와야 합니다.

input
1초 : A
2초 : A
4초 : A,B
6초 : A
7초 : A
output
5초 : (A,3),(B,1)
10초 : (A,2)

5초마다 텀블링 윈도우를 돌렸고 유저 키 단위로 count를 추출했기 때문에 당연한 결과인데요. 하지만 실제로 코드를 실행해보면 영~ 다른 결과가 나옵니다. 다른 결과가 나오는 이유는 commit interval에 따라 다른데요. commit interval이 3초 인 경우에는 다음과 같은 output이 추출됩니다.

output
3초 : (A,2)
6초 : (A,3),(A,1),(B,1)
9초 : (A,2)

텀블링윈도우는 분명 서로 다른 윈도우를 침범하지 않는데 서로 다른 윈도우가 동시에 나타나는 현상이 나타나고 있습니다. 게다가 모든 값을 더해보면 A가 8번 호출되었다고 볼 수 있는데 사실 input을 보면 A는 5번 호출되었습니다. 어떻게 된 일일까요?

 

그 이유는 커밋이라는 독특한 스트림즈 만의 데이터 처리 방식 때문에 생기는 현상입니다. 사실 output의 Windowed의 내부를 살펴보면 다음과 같이 Window 처리 시간이 포함되어 있는 것을 확인할 수 있습니다.

output
3초 : (A,2 - 0초~5초)
6초 : (A,3 - 0초~5초), (A,1 - 6초~10초), (B,1 - 0초~5초)
9초 : (A,2 - 6초~10초)

눈치채신분도 계시겠지만 각 텀블링 윈도우는 겹치지 않았습니다. 다만, 동일한 윈도우의 결과가 커밋 타이밍때 한번 더 추출된 것을 볼 수 있습니다. 0~5초 사이에 A가 호출된 것을 집계하는 윈도우가 3초에 한번 6초에 한번 더 추출된 것을 볼 수 있습니다. 이것은 각 커밋타이밍 때 마다 메모리에 가지고 있던 윈도우의 계산 값을 나타내기 때문에 생기는 현상입니다. 그렇기 때문에 사용자가 원하는 값은 각 윈도우의 마지막 값을 따로 추출해야합니다. 진짜로 원하는 가장 마지막 윈도우 처리는 bold로 표시한 아래 데이터 일것입니다.

output
3초 : (A,2 - 0초~5초)
6초 : (A,3 - 0초~5초), (A,1 - 6초~10초), (B,1 - 0초~5초)
9초 : (A,2 - 6초~10초)

우리가 진짜로 원하는 데이터를 얻기 위해 어떻게 해야할까요? 방법은 두가지입니다.

 

방법1) 윈도우 타임을 포함한 데이터를 사용하여 idemoptence 동작하도록 설정

방법2) suppress() 사용

 

방법1의 경우 가장 쉬운 방법입니다. 동일 window시간이 포함된 데이터가 처음에 한번 그리고 나중에 한번 들어왔을 때 나중의 데이터를 넣는 방식입니다. 이 방식을 사용하면 몇번이고 데이터가 유입되더라도 가장 마지막의 window 연산 데이터가 추출되는 것이기 때문에 멱등성 동작을 만족하면서도 사용자가 원하는 window 데이터를 저장/사용 할 수 있습니다.

 

방법2는 suppress() 메서드를 사용하는 방식입니다. 윈도우 연산을 처리할 때 이러한 이슈는 카프카 내부에서도 논의된 모양입니다. 결과적으로 suppress() 메서드를 만들었고 2.1에서 release하였습니다. 커밋타이밍과 무관하게 윈도우가 끝날 때 또는 메모리에 데이터가 가득 찼을 때 마지막의 윈도우 연산을 추출하는 것인데요. 제가 2.8을 기준으로 테스트를 진행했을 때 생각대로 잘 동작하지는 않았습니다. 게다가 연산을 수행할 때 serde 에러가 지속적으로 발생해서 너무 힘들었는데요. 점차 개선되고나면 아마 쓸만해질 수 있지 않을까 생각되는데, 아직까지는 쓰지 못할것 같습니다.

 

여기까지 읽어주셔서 감사합니다.

반응형
  • 김태선 2021.09.23 15:27 댓글주소 수정/삭제 댓글쓰기

    안녕하세요 데브원영님 ~!
    카프카 스트림즈로 프로젝트를 설계하고있는 서버 개발자 입니다.
    카프카에 대해서 항상 좋은 내용 공유해주셔서 진심으로 감사합니다.

    카프카의 토픽 레코드들을 필터링하거나, 토픽 레코드 데이터를 분할하여 여러 토픽 레코드로 생성하는 스트림앱을 다수 개발하려고 하는데요. 아래 3가지 이슈가 있어서 질문드립니다.

    Q1. 카프카 스트림이 단순 라이브러리 이다보니 기본 자바 어플리케이션으로 개발하는 샘플을 많이 봤는데, 엔터프라이즈 레벨에서는 스프링 자바 어플리케이션으로 개발하는 경우도 있는지, 그냥 자바 어플리케이션으로 개발해도 충분한지 궁금합니다.

    Q2. 또한 카프카 커넥트는 Landoop UI를 통해 관리할 수 있는것으로 알고있는데
    카프카 스트림은 이러한 콘솔이나 관리툴 또는 모니터링 툴이 따로 있는지 문의드립니다.

    Q3. 카프카 스트림은 파티션 개수만큼 테스크가 생성되는것으로 알고있는데 최대 테스크 갯수는 몇개인지, 카프카 프로듀서나 컨슈머처럼 성능측정은 어떻게 할 수 있는지 궁금합니다.

    감사합니다!

    • 안녕하세요. 제 컨텐츠들 좋아해주셔서 감사합니다! 문의주신 내용 답변드립니다.
      Q1: 카프카 스트림즈는 라이브러리이기 때문에 기본 자바 애플리케이션으로 개발할 대도 있고 스프링을 활용할 때도 있습니다. 선택의 차이입니다.
      Q2: 스트림즈는 자체 모니터링 웹 툴은 없습니다. jmx로 수집하여 시각화 하는 편입니다.
      Q3: 최대 태스크 개수는 파티션 개수와 동일합니다. 그리고 성능측정은 jmx를 통해 input 데이터 측정으로 확인하실 수 있습니다.