카프카 스트림즈의 KTable은 토픽의 데이터를 key-value형태로 사용할 수 있도록 구체화된 뷰(Materialized View)를 제공합니다. 구현방법은 다음과 같습니다.
0. 카프카 스트림즈 디펜던시 추가
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
compile 'org.slf4j:slf4j-simple:1.7.30'
implementation "org.apache.kafka:kafka-streams:2.8.0"
}
이 예제는 2.8.0을 기준으로 작성하였습니다.
1. 카프카 스트림즈 설정
Properties configs = new Properties();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-materialized-view");
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
configs.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
기본 스트림즈 설정
2. 스트림즈 토폴로지 설정
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table("address",Materialized.as("address"));
구체화된 뷰를 만들 토픽을 KTable로 선언합니다. 파라미터로 Materalized.as()도 추가합니다. 결과적으로 Materialized 인스턴스로 생성된 KeyValueStore가 로컬에 생성됩니다. 추가적으로 changelog도 생성됩니다.
3. 토폴로지 실행
KafkaStreams streams = new KafkaStreams(builder.build(), configs);
streams.start();
토폴로지와 설정을 입력합니다
4. 구체화된 뷰 가져오기
view = streams.store(StoreQueryParameters.fromNameAndType("address", QueryableStoreTypes.keyValueStore()));
여기서 주의해야할 점은 streams가 시작되고 난 이후에 일정시간이 지나야 Materialized View가 생성된다는 점입니다. 생성되고 난 이후에 streams.store()메서드를 호출해야 정상적으로 동작합니다. 그렇지 않을 경우에는 다음과 같은 에러가 발생합니다.
Exception in thread "Timer-0" org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store address because the stream thread is STARTING, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
at com.example.stateful.MaterializedTable$ScheduledJob.run(MaterializedTable.java:60)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
상기 에러는 KTable로 선언한 Materialized View가 준비되지 않은 상태라는 뜻입니다.
5. keyValue스토어 가져오기
KeyValueIterator<String, String> address = view.all();
address.forEachRemaining(keyValue -> log.info(keyValue.toString()));
결과는 다음과 같이 key-value 형태로 출력됩니다.
[Timer-0] INFO com.example.stateful.MaterializedTable - KeyValue(somin, pusan)
[Timer-0] INFO com.example.stateful.MaterializedTable - KeyValue(cory, newyork)
필요시에는 get()메서드를 사용하여 hashMap의 value를 얻어오는 형태로도 사용할 수 있습니다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈에서 SlidingWindow에 대한 고찰 (0) | 2021.06.25 |
---|---|
카프카 스트림즈 join 사용시 메시지 키 접근하기 (0) | 2021.06.21 |
Cannot get state store TOPIC because the stream thread is STARTING, not RUNNING 에러 해결 (0) | 2021.06.16 |
kafka spark structured stream 예제코드 및 실행 (0) | 2021.06.04 |
카프카 스트림즈 Exactly-once 설정하는 방법과 내부 동작 (0) | 2021.06.03 |
카프카 스트림즈 All stream threads have died. 오류 해결 방안 (0) | 2021.05.27 |