Kafka는 분산 메시징 플랫폼으로 폭넓은 확장성과 우수한 성능을 가진다. Kafka의 간단한 사용을 위해 Spring boot를 사용하여 consumer, producer개념을 익힐 수 있다. Spring boot의 scheduler기능을 통해서 producer가 kafka에 topic을 내려 주면, subscribe하고 있는 consumer가 해당 메시지를 받는 형태로 만들 것이다.
Architecture
Spring boot scheduler와 kafka의 연동 구성도
Kafka 설치
Kafka의 설치과정은 아래 posting에서 확인할 수 있다.
Macbook에 Kafka 1분만에 설치하기(바로가기)
Project directory
프로젝트 directory는 intellij의 spring boot default 설정을 따라간다.
build.gradle
이번 포스팅에서는 gradle을 사용하여 spring boot를 구성하고자 한다. gradle에 대한 자세한 설명은 Gradle build tool 4.0 가이드(바로가기) 포스팅에서 확인 가능하다.
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.5.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
bootJar {
baseName = 'gs-scheduling-tasks'
version = '0.1.0'
}
repositories {
mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
compile("org.springframework.boot:spring-boot-starter")
compile "org.springframework.kafka:spring-kafka:2.1.10.RELEASE"
testCompile("org.springframework.boot:spring-boot-starter-test")
}
application.properties
spring-kafka 사용시 설정할 몇가지 부분에 대해서 셋팅한다. 여기서는 반드시 필요한 2가지 요소인 consumer.group-id와 server ip를 선언하였다.
application.properties 혹은 application.yaml을 통해서 spring에서 사용할 kafka설정을 자유자재로 설정이 가능한데 각각의 설정에 대한 설명은 Spring boot common-properties 공식사이트(바로가기) 에서 확인 가능하다.
spring.kafka.consumer.group-id=kafka-intro
spring.kafka.bootstrap-servers=localhost:9092
Application.java
scheduler처리를 위해 @EnableScheduling 을 추가.
package hello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class);
}
}
ScheduledTasks.java
kafka연동의 핵심적인 부분이다. 크게 두가지 역할을 하는 것으로 볼 수 있다.
1) 1000ms마다 producer는 "helloworld"+now Date() format의 데이터를 send.
2) consumer는 "test" topic에 들어오는 모든 메시지를 가져와서 log를 남김.
package hello;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTasks {
private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Autowired
private KafkaTemplate<string, string=""> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
log.info("Message: " + payload + " sent to topic: " + topic);
}
@Scheduled(fixedRate = 1000)
public void reportCurrentTime() {
send("test", "helloworld " + dateFormat.format(new Date()));
}
@KafkaListener(topics = "test")
public void receiveTopic1(ConsumerRecord consumerRecord) {
log.info("Receiver on topic1: "+consumerRecord.toString());
}
}
</string,>
결과물
Producer는 정상적으로 topic을 send하고, consumer도 정상적으로 topic을 receive하는 모습을 볼 수 있다.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.5.RELEASE)
2018-11-06 20:37:52.755 INFO 58130 --- [ main] hello.Application : Starting Application on 1003855ui-MacBook-Pro.local with PID 58130
...
...
...
2018-11-06 20:37:53.925 INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks : Message: helloworld 20:37:53 sent to topic: test
2018-11-06 20:37:54.038 INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=kafka-intro] Successfully joined group with generation 1
2018-11-06 20:37:54.039 INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=kafka-intro] Setting newly assigned partitions [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.061 INFO 58130 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.744 INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks : Message: helloworld 20:37:54 sent to topic: test
2018-11-06 20:37:54.769 INFO 58130 --- [ntainer#0-0-C-1] hello.ScheduledTasks : Receiver on topic1: ConsumerRecord(topic = test, partition = 5, offset = 4, CreateTime = 1541504274743, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld 20:37:54)
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
아파치 Kafka Consumer의 데이터 처리 내부 architecture 설명 및 튜닝포인트 (378) | 2018.12.24 |
---|---|
아파치 Kafka Producer의 데이터 처리 내부 architecture 설명 및 튜닝포인트 (380) | 2018.12.24 |
빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명 (265) | 2018.12.24 |
Spring boot에서 kafka 사용시 application.yaml 설정 (590) | 2018.11.06 |
Macbook에 Kafka 1분만에 설치하기 (703) | 2018.11.06 |
Kafka opensource 분석을 통한 replication assignment 로직 확인 (1096) | 2018.09.28 |