본문 바로가기

빅데이터/Kafka

스프링 카프카 메시지 리스너 2가지 구현 방법

스프링 카프카에서 메시지 리스너를 구현하는 방법은 2가지가 있습니다. 첫 번째는 @KafkaListener를 사용하는 것이고, 두 번째는 listenerContainer를 Bean으로 등록하는 것입니다.

 

@KafkaListener로 구현

가장 간단한 방법입니다. 메서드에 @KafkaListener를 선언하면 메서드의 파라미터 타입에 알맞은 listenerContainer가 자동으로 구성됩니다.

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
/**
 * Spring Boot entry point that also hosts a {@code @KafkaListener} method
 * subscribed to the {@code test} topic.
 */
@SpringBootApplication
public class Application {

	// One shared logger per class instead of one per instance.
	private static final Logger logger = LoggerFactory.getLogger(Application.class);

	/**
	 * Boots the Spring application context.
	 *
	 * <p>The context returned by {@code run} is deliberately not closed here:
	 * closing it right after startup would stop the listener before it has a
	 * realistic chance to receive records.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	/**
	 * Logs the payload of each record delivered from the {@code test} topic.
	 *
	 * <p>NOTE(review): no {@code groupId} is given on the annotation; presumably
	 * the consumer group is configured in application properties — confirm.
	 *
	 * @param in the record value as a string
	 */
	@KafkaListener(topics = "test")
	public void messageListener(String in) {
		// Placeholder form avoids building the message when INFO is disabled.
		logger.info("MyMessageListener : {}", in);
	}

}

 

listenerContainer Bean 등록

자신이 사용하고자 하는 Container를 선언하고 Bean을 등록합니다.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration that registers a hand-built {@link KafkaMessageListenerContainer}
 * for the {@code test} topic instead of relying on {@code @KafkaListener}.
 */
@Configuration
public class CustomKafkaListener {

	// Static so the nested listener can log without an enclosing instance.
	private static final Logger logger = LoggerFactory.getLogger(CustomKafkaListener.class);

	/**
	 * Builds the consumer settings used by the container's consumer factory:
	 * broker address, string key/value deserializers and the consumer group id.
	 *
	 * @return a new mutable map of Kafka consumer properties
	 */
	private Map<String, Object> consumerProperties() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
		return props;
	}

	/**
	 * Creates the listener container for the {@code test} topic with
	 * {@link MyMessageListener} as its message listener.
	 *
	 * <p>Auto-startup is disabled, so whoever obtains this bean has to call
	 * {@code start()} on it explicitly.
	 *
	 * @return the configured, not yet started, listener container
	 */
	@Bean
	public KafkaMessageListenerContainer<String, String> messageListenerContainer() {

		ContainerProperties containerProperties = new ContainerProperties("test");
		containerProperties.setMessageListener(new MyMessageListener());

		ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
		KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
		listenerContainer.setAutoStartup(false);
		// NOTE(review): Spring may overwrite this name with the bean's registered
		// name when it initializes the bean — confirm the name actually sticks.
		listenerContainer.setBeanName("kafka-message-listener");
		return listenerContainer;
	}


	/**
	 * Listener that logs the value of every record it receives.
	 *
	 * <p>Declared {@code static} because it needs nothing from the enclosing
	 * configuration instance.
	 */
	static class MyMessageListener implements MessageListener<String, String> {
		@Override
		public void onMessage(ConsumerRecord<String, String> data) {
			// Placeholder form avoids building the message when INFO is disabled.
			logger.info("MyMessageListener : {}", data.value());
		}
	}

}

사용 시에는 등록한 Bean을 Autowired로 주입받아 사용해야 합니다. 위 설정에서 setAutoStartup(false)로 자동 시작을 꺼 두었기 때문에, 주입받은 컨테이너의 start()를 직접 호출해야 메시지를 수신합니다.

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.listener.*;

/**
 * Spring Boot entry point that receives the manually registered
 * {@link KafkaMessageListenerContainer} bean and starts it.
 */
@SpringBootApplication
public class Application {

	// Populated through the constructor, so no field-level qualifier is needed.
	public final KafkaMessageListenerContainer<String, String> listenerContainer;

	/**
	 * Stores the injected listener container and starts it.
	 *
	 * @param listenerContainer the container bean to start
	 */
	@Autowired
	public Application(KafkaMessageListenerContainer<String, String> listenerContainer) {
		this.listenerContainer = listenerContainer;
		// Explicit start: this constructor does not rely on container auto-startup.
		listenerContainer.start();
	}

	/**
	 * Boots the Spring application context.
	 *
	 * <p>The context returned by {@code run} is deliberately not closed here:
	 * closing it right after startup would stop the container that the
	 * constructor has just started.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}