스프링 카프카는 카프카를 스프링에서 쉽게 사용할 수 있도록 하는 라이브러리입니다. 스프링 카프카를 통해 컨슈머를 만드는 가장 간단한 코드를 공유합니다.
준비물
- 그래들
- 스프링부트
- 인텔리제이
디렉토리구조
├── build.gradle
├── settings.gradle
└── src
└── main
├── java
│ └── com
│ └── test
│ └── Main.java
└── resources
└── application.yaml
build.gradle
plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
compile 'org.springframework.kafka:spring-kafka:2.3.7.RELEASE'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.3.4.RELEASE'
}
Main.java
package com.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
@KafkaListener(topics = "test")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(partition + " -> " + message);
}
}
test 토픽을 메시지 키와 파티션을 같이 출력하는 애플리케이션.
application.yaml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: myGroup
스프링 카프카는 yaml을 통해 카프카 설정을 입력할 수 있다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256 (0) | 2020.10.23 |
---|---|
스프링 카프카 호환표 (1) | 2020.10.16 |
토픽의 메시지 값을 직렬화/역직렬화가 정상적으로 이루어지지 않는 경우 테스트 (0) | 2020.10.15 |
Reactive를 품은 스프링 카프카 시청 정리 자료 (1) | 2020.09.26 |
카프카 커넥트 JMX + 로그스태시로 모니터링 하기 (0) | 2020.09.24 |
kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline) (0) | 2020.09.23 |