본문 바로가기

빅데이터/Kafka

Spring boot scheduler를 활용한 kafka producer/consumer 예제

Kafka는 분산 메시징 플랫폼으로 폭넓은 확장성과 우수한 성능을 가진다. Kafka의 간단한 사용을 위해 Spring boot를 사용하여 consumer, producer개념을 익힐 수 있다. Spring boot의 scheduler기능을 통해서 producer가 kafka에 topic을 내려 주면, subscribe하고 있는 consumer가 해당 메시지를 받는 형태로 만들 것이다.

 

Architecture

Spring boot scheduler와 kafka의 연동 구성도

 

Kafka 설치

Kafka의 설치과정은 아래 posting에서 확인할 수 있다.

Macbook에 Kafka 1분만에 설치하기(바로가기)

 

 

Project directory

프로젝트 directory는 intellij의 spring boot default 설정을 따라간다. 

 

 

build.gradle

이번 포스팅에서는 gradle을 사용하여 spring boot를 구성하고자 한다. gradle에 대한 자세한 설명은 Gradle build tool 4.0 가이드(바로가기) 포스팅에서 확인 가능하다.

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.5.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

bootJar {
    baseName = 'gs-scheduling-tasks'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter")
    compile "org.springframework.kafka:spring-kafka:2.1.10.RELEASE"
    testCompile("org.springframework.boot:spring-boot-starter-test")
}

 

application.properties

spring-kafka 사용시 설정할 몇가지 부분에 대해서 셋팅한다. 여기서는 반드시 필요한 2가지 요소인 consumer.group-id와 server ip를 선언하였다.
 
application.properties 혹은 application.yaml을 통해서 spring에서 사용할 kafka설정을 자유자재로 설정이 가능한데 각각의 설정에 대한 설명은 Spring boot common-properties 공식사이트(바로가기) 에서 확인 가능하다.
 
spring.kafka.consumer.group-id=kafka-intro
spring.kafka.bootstrap-servers=localhost:9092

 

Application.java

scheduler처리를 위해 @EnableScheduling 을 추가.
package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class);
    }
}

 

 

ScheduledTasks.java

kafka연동의 핵심적인 부분이다. 크게 두가지 역할을 하는 것으로 볼 수 있다.

1) 1000ms마다 producer는 "helloworld"+now Date() format의 데이터를 send.

2) consumer는 "test" topic에 들어오는 모든 메시지를 가져와서 log를 남김.

package hello;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledTasks {

    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    @Autowired
    private KafkaTemplate<string, string=""> kafkaTemplate;

    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
        log.info("Message: " + payload + " sent to topic: " + topic);
    }


    @Scheduled(fixedRate = 1000)
    public void reportCurrentTime() {
        send("test", "helloworld " + dateFormat.format(new Date()));
    }

    @KafkaListener(topics = "test")
    public void receiveTopic1(ConsumerRecord consumerRecord) {
        log.info("Receiver on topic1: "+consumerRecord.toString());
    }
}
</string,>

 

 

결과물

Producer는 정상적으로 topic을 send하고, consumer도 정상적으로 topic을 receive하는 모습을 볼 수 있다.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.5.RELEASE)

2018-11-06 20:37:52.755  INFO 58130 --- [           main] hello.Application                        : Starting Application on 1003855ui-MacBook-Pro.local with PID 58130
...
...
...
2018-11-06 20:37:53.925  INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks                     : Message: helloworld 20:37:53 sent to topic: test
2018-11-06 20:37:54.038  INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=kafka-intro] Successfully joined group with generation 1
2018-11-06 20:37:54.039  INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=kafka-intro] Setting newly assigned partitions [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.061  INFO 58130 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.744  INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks                     : Message: helloworld 20:37:54 sent to topic: test
2018-11-06 20:37:54.769  INFO 58130 --- [ntainer#0-0-C-1] hello.ScheduledTasks                     : Receiver on topic1: ConsumerRecord(topic = test, partition = 5, offset = 4, CreateTime = 1541504274743, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld 20:37:54)

 

반응형