스프링 카프카에서 메시지 리스너를 구현하는 2가지 방법이 있습니다. 첫번째는 @KafkaListener를 사용하는것이고 두번째는 listenerContainer를 Bean등록하는 것입니다.
@KafkaListener로 구현
가장 간단한 방법입니다. @KafkaListener를 통해 선언할 경우 파라미터를 오버로딩해서 알맞는 listenerContainer를 자동으로 주입합니다.
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@KafkaListener(topics = "test")
public void messageListener(String in) {
logger.info("MyMessageListener : "+in);
}
}
lisetenerContainer Bean 등록
자신이 사용하고자 하는 Container를 선언하고 Bean을 등록합니다.
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class CustomKafkaListener {
private final Logger logger = LoggerFactory.getLogger(CustomKafkaListener.class);
private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
return props;
}
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties("test");
containerProperties.setMessageListener(new MyMessageListener());
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
listenerContainer.setAutoStartup(false);
listenerContainer.setBeanName("kafka-message-listener");
return listenerContainer;
}
class MyMessageListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
logger.info("MyMessageListener : "+data.value());
}
}
}
사용시에는 등록한 Bean을 Autowired로 가져와서 사용해야 합니다.
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.listener.*;
@SpringBootApplication
public class Application {
@Qualifier
public final KafkaMessageListenerContainer<String, String> listenerContainer;
@Autowired
public Application(KafkaMessageListenerContainer<String, String> listenerContainer) {
this.listenerContainer = listenerContainer;
listenerContainer.start();
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
}
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
Reactive를 품은 스프링 카프카 시청 정리 자료 (1) | 2020.09.26 |
---|---|
카프카 커넥트 JMX + 로그스태시로 모니터링 하기 (0) | 2020.09.24 |
kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline) (0) | 2020.09.23 |
카프카 컨슈머 파티셔너 종류 및 정리(2.5.0 기준) (1) | 2020.07.23 |
카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준) (1) | 2020.07.23 |
컨슈머 스레드가 많다고 처리량이 높을까? 아닐까? 컨텍스트 스위칭으로 인한 예외 상황 (0) | 2020.07.22 |