스프링 카프카에서 메시지 리스너를 구현하는 2가지 방법이 있습니다. 첫번째는 @KafkaListener를 사용하는것이고 두번째는 listenerContainer를 Bean등록하는 것입니다.
@KafkaListener로 구현
가장 간단한 방법입니다. @KafkaListener를 통해 선언할 경우 파라미터를 오버로딩해서 알맞는 listenerContainer를 자동으로 주입합니다.
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class Application {
	private final Logger logger = LoggerFactory.getLogger(Application.class);
	
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args).close();
	}
	@KafkaListener(topics = "test")
	public void messageListener(String in) {
		logger.info("MyMessageListener : "+in);
	}
}
lisetenerContainer Bean 등록
자신이 사용하고자 하는 Container를 선언하고 Bean을 등록합니다.
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class CustomKafkaListener {
	private final Logger logger = LoggerFactory.getLogger(CustomKafkaListener.class);
	private Map<String, Object> consumerProperties(){
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
		return props;
	}
	@Bean
	public KafkaMessageListenerContainer<String, String> messageListenerContainer() {
		ContainerProperties containerProperties = new ContainerProperties("test");
		containerProperties.setMessageListener(new MyMessageListener());
		ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
		KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
		listenerContainer.setAutoStartup(false);
		listenerContainer.setBeanName("kafka-message-listener");
		return listenerContainer;
	}
	class MyMessageListener implements MessageListener<String, String> {
		@Override
		public void onMessage(ConsumerRecord<String, String> data) {
			logger.info("MyMessageListener : "+data.value());
		}
	}
}
사용시에는 등록한 Bean을 Autowired로 가져와서 사용해야 합니다.
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.listener.*;
@SpringBootApplication
public class Application {
	@Qualifier
	public final KafkaMessageListenerContainer<String, String> listenerContainer;
	@Autowired
	public Application(KafkaMessageListenerContainer<String, String> listenerContainer) {
		this.listenerContainer = listenerContainer;
		listenerContainer.start();
	}
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args).close();
	}
}
반응형
    
    
    
  '빅데이터 > Kafka' 카테고리의 다른 글
| Reactive를 품은 스프링 카프카 시청 정리 자료 (1) | 2020.09.26 | 
|---|---|
| 카프카 커넥트 JMX + 로그스태시로 모니터링 하기 (0) | 2020.09.24 | 
| kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline) (0) | 2020.09.23 | 
| 카프카 컨슈머 파티셔너 종류 및 정리(2.5.0 기준) (1) | 2020.07.23 | 
| 카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준) (1) | 2020.07.23 | 
| 컨슈머 스레드가 많다고 처리량이 높을까? 아닐까? 컨텍스트 스위칭으로 인한 예외 상황 (0) | 2020.07.22 |