본문 바로가기

빅데이터/Kafka

카프카 스트림즈로 schedule operation 수행하기(번역)

반응형

원문 : https://kafka-tutorials.confluent.io/kafka-streams-schedule-operations/kstreams.html

 

카프카 스트림즈는 토픽의 데이터를 읽어 상태기반, 비상태기반 처리를 하는 스트리밍 라이브러리입니다. 오늘은 컨플루언트에서 카프카 스트림즈가 스케쥴링 동작을 어떻게 수행하는지에 대한 코드 예시를 번역하였습니다.


1. 프로젝트 초기화

신규 디렉토리를 생성합니다.

$ mkdir kafka-streams-schedule-operations && cd kafka-streams-schedule-operations

2. 컨플루언트 플랫폼 가져오기

Dockerfile을 통해 데이터 생성기를 먼저 가져옵니다. 하기 Dockerfile은 Dockerfile-connect 이름으로 신규 생성합니다.

FROM confluentinc/cp-kafka-connect-base:6.1.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0

다음으로 컨플루언트 플랫폼(카프카 엔터프라이즈 배포 버전)을 가져오기 위해 docker-compose.yaml을 생성합니다. 내부 내용은 다음과 같습니다.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile-connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    volumes:
      - ./datagen-logintime.avsc:/tmp/datagen-logintime.avsc
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

다음으로 도커파일들을 빌드하고 시작합니다. 

$ docker-compose up -d --build

3. 프로젝트 설정

자바 애플리케이션을 만들기 위해 build.gradle을 생성하고 아래와 같이 설정합니다.

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.22.0"
        classpath "com.github.jengelman.gradle.plugins:shadow:6.0.0"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "2.8.0"
    id "idea"
    id "eclipse"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.10.2"
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.7.0"
    implementation('io.confluent:kafka-streams-avro-serde:6.1.1') {
        // this transitive dependency exclusion
        // is needed as kafka-clients:6.1.0-ccs has a version
        // of FixedOrderMap which throws an exception from
        // the clear method so we need AK kafka-clients
        exclude group:'org.apache.kafka', module: 'kafka-clients'
    }
    implementation "org.apache.kafka:kafka-clients:2.7.0"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.7.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsPunctuation"
    )
  }
}

shadowJar {
    archiveBaseName = "kafka-streams-schedule-operations-standalone"
    archiveClassifier = ''
}

gradle wrapper을 실행합니다.

$ gradle wrapper

다음으로 설정파일을 등록합니다. 설정파일은 configuration 디렉토리에 위치합니다.

$ mkdir configuration

내부에는 dev.properties 파일을 생성합니다. 내부 코드는 다음과 같습니다.

application.id=kafka-streams-schedule-operations
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081

input.topic.name=login-events
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

4. 모델의 스키마 설정

avro 스키마적용을 위해 디렉토리를 생성합니다.

$ mkdir -p src/main/avro

디렉토리 내부에는 logintime.avsc 파일을 생성하고 LoginTime객체에 대한 스키마를 다음과 같이 등록합니다.

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "LoginTime",
  "fields": [
    {"name": "logintime", "type": "long" },
    {"name": "userid", "type": "string" },
    {"name": "appid", "type": "string" }
  ]
}

그레이들을 사용하여 avro plugin을 등록합니다.

$ ./gradlew build

5. 카프카 스트림즈 토폴로지 설정

다음은 가장 중요한 스트림 토폴로지 설정입니다. 자바 파일을 만들기 위해 아래와 같이 디렉토리를 생성합니다.

$ mkdir -p src/main/java/io/confluent/developer

다음으로 스트림즈 애플리케이션에서 동작할 코드를 넣습니다. 이번 예제에서는 key-value 짝으로 이루어진 레코드 데이터를 상태 저장소에 저장하고 일정 시간이 지난 이후에 비즈니스 로직을 태우는 로직을 적용합니다. 카프카 스트림즈에서는 일정한 시간 간격으로 코드를 수행할 수 있는 ProcessorContext.schedule 메서드를 제공합니다.

 

ProcessorContext를 사용하기 위해서는 프로세서API를 사용해야 합니다. 카프카 스트림즈는 스트림즈DSL과 프로세서API를 제공하는데, 프로세서API를 사용하면 스트림즈DSL에서 제공하지 않는 기능들을 구현할 수 있습니다. 그 중 하나가 이런 스케쥴링 동작이라 볼 수 있습니다. 스트림즈DSL과 프로세서API를 결합하여 동작하기 위해서 KStreams.transform 메서드를 사용합니다.

KStream.transform은 메시지 키를 활용할 수 있습니다. 즉, KStream에서 리파티션 과정이 필요할 수 있다는 뜻입니다. 그러나 리파티션은 오직 join 또는 aggregation을 수행할 때만 필요합니다. 이번 예제에서는 ProcessorContext.forward 메서드를 사용할 예정입니다. 추가로, join 또는 aggregation이 하지 않는다면 라피타니셔닝은 반드시 필요한 것은 아닙니다. 

이제 애플리케이션의 일부 중요사항을 살펴보겠습니다.

 

사용자가 얼마나 오랫동안 로그인해서 머물렀는지 알고 싶은 것이 첫번째입니다. 그래서 매 5초마다 사용자들의 데이터를 확인하여 가장 긴 로그인 시간을 추출하는 것입니다. 이 작업을 위해서는 유저의 데이터와 로그인 시간을 상태 저장소에 저장해야 합니다. 추가로 20초가 지나고 나면 누적 시간을 초기화하는 코드를 넣었습니다.

 

Transformer을 사용하는 카프카 스트림즈 애플리케이션

final KStream<String, LoginTime> loginTimeStream = builder.stream(loginTimeInputTopic, Consumed.with(Serdes.String(), loginTimeSerde));
loginTimeStream.transform(getTransformerSupplier(loginTimeStore), Named.as("max-login-time-transformer"),loginTimeStore) 
               .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));


private TransformerSupplier<String, LoginTime, KeyValue<String, Long>> getTransformerSupplier(final String storeName) {
	    return () -> new Transformer<String, LoginTime, KeyValue<String, Long>>() { 
	        private KeyValueStore<String, Long> store;
	        private ProcessorContext context;
            @Override
            public void init(ProcessorContext context) { 
                   this.context = context;
                   store = (KeyValueStore<String, Long>) this.context.getStateStore(storeName);
                   this.context.schedule(Duration.ofSeconds(5), PunctuationType.STREAM_TIME, this::streamTimePunctuator); 
                   this.context.schedule(Duration.ofSeconds(20), PunctuationType.WALL_CLOCK_TIME, this::wallClockTimePunctuator); 
            }


@Override
public KeyValue<String, Long> transform(String key, LoginTime value) { 
       Long currentVT = store.putIfAbsent(key, value.getLogintime());
       if (currentVT != null) {
           store.put(key, currentVT + value.getLogintime());
       }
       return null;
}

Transformer 인터페이스로 구현한 클래스를 람다로 추가하여 생성하였습니다. 총 2개의 큰 메서드로 이루어져 있습니다. 첫번째는 init, 두번쨰는 transform 입니다. init 메서드에서는 키밸류스토어를 지정하고 스케쥴링 시간을 정합니다. 매 20초 마다 wallClockTimePunctuator 메서드를 실행하고 매 5초마다 스트림시간을 기준으로 streamTimePunctuator 메서드를 호출합니다.

 

transform 동작은 스트림으로부터 토픽의 데이터를 읽는 동작을 합니다. 여기서 가장 중요한 부분은 스케쥴링을 수행하는 schedule메서드를 호출하는 부분인데요. 2가지 스케쥴 타입이 있는 것을 확인할 수 있습니다. STREAM_TIME과 WALL_CLOCK_TIME입니다. 이 2개는 PunctuationType에 속한 2가지 타입입니다.

 

STREAM_TIME 타입은 레코드의 timestamp 시간을 기준으로 스케쥴링 됩니다. WALL_CLOCK_TIME 타입은 현재 스트림즈 애플리케이션의 시스템 시간에 따릅니다. 카프카 스트림즈에서 시간을 어떻게 처리하는지에 대한 사항은 이 링크를 통해 확인할 수 있습니다.

 

스케쥴링 처리에 대해 추가로 더 알아보겠습니다. 스케쥴링을 수행할 때 코드에서 필요한 것은 세가지 파라미터입니다.

 

1. 스케쥴링 인터벌 시간을 얼마나할 것인지에 대한 Duration 

2. 스트림 시간을 기준으로 할 것인지 월클락 시간을 기준으로 할 것인지에 대한 PunctuationType

3. Punctuator interface 인스턴스. 이 인스턴스는 람다식 또는 메서드 기반으로 생성할 수 있습니다.

 

그럼 구현한 코드를 살펴보겠습니다.

void wallClockTimePunctuator(Long timestamp){ 
    try (KeyValueIterator<String, Long> iterator = store.all()) {
        while (iterator.hasNext()) {
            KeyValue<String, Long> keyValue = iterator.next();
            store.put(keyValue.key, 0L);
        }
    }
    System.out.println("@" + new Date(timestamp) +" Reset all view-times to zero");
}

void streamTimePunctuator(Long timestamp) { 
    Long maxValue = Long.MIN_VALUE;
    String maxValueKey = "";
    try (KeyValueIterator<String, Long> iterator = store.all()) {
        while (iterator.hasNext()) {
            KeyValue<String, Long> keyValue = iterator.next();
            if (keyValue.value > maxValue) {
                maxValue = keyValue.value;
                maxValueKey = keyValue.key;
            }
        }
    }
    context.forward(maxValueKey +" @" + new Date(timestamp), maxValue); 
}

WallClockTimePunctuator는 매 20초마다 수행되고 streamTimePunctuator는 매 5초마다 수행됩니다. streamTimePunctuator에서 조건에 맞는 데이터는 forward 메서드를 통해 다음 토폴로지로 데이터가 넘어가게 됩니다.

 

전체 코드는 다음과 같습니다.

package io.confluent.developer;


import io.confluent.developer.avro.LoginTime;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsPunctuation {


	public Properties buildStreamsProperties(Properties envProps) {
        Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));

        return props;
    }

    public Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String loginTimeInputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");
        final String loginTimeStore = "logintime-store";
        final Serde<LoginTime> loginTimeSerde = getSpecificAvroSerde(envProps);
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(loginTimeStore),Serdes.String(), Serdes.Long());
        builder.addStateStore(storeBuilder);
        final KStream<String, LoginTime> loginTimeStream = builder.stream(loginTimeInputTopic, Consumed.with(Serdes.String(), loginTimeSerde));

        loginTimeStream.transform(getTransformerSupplier(loginTimeStore), Named.as("max-login-time-transformer"),loginTimeStore)
                      .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }


    private TransformerSupplier<String, LoginTime, KeyValue<String, Long>> getTransformerSupplier(final String storeName) {
	    return () -> new Transformer<String, LoginTime, KeyValue<String, Long>>() {
	        private KeyValueStore<String, Long> store;
	        private ProcessorContext context;
            @Override
            public void init(ProcessorContext context) {
                   this.context = context;
                   store = (KeyValueStore<String, Long>) this.context.getStateStore(storeName);
                   this.context.schedule(Duration.ofSeconds(5), PunctuationType.STREAM_TIME, this::streamTimePunctuator);
                   this.context.schedule(Duration.ofSeconds(20), PunctuationType.WALL_CLOCK_TIME, this::wallClockTimePunctuator);
            }

            void wallClockTimePunctuator(Long timestamp){
                try (KeyValueIterator<String, Long> iterator = store.all()) {
                    while (iterator.hasNext()) {
                        KeyValue<String, Long> keyValue = iterator.next();
                        store.put(keyValue.key, 0L);
                    }
                }
                System.out.println("@" + new Date(timestamp) +" Reset all view-times to zero");
            }

            void streamTimePunctuator(Long timestamp) {
                Long maxValue = Long.MIN_VALUE;
                String maxValueKey = "";
                try (KeyValueIterator<String, Long> iterator = store.all()) {
                    while (iterator.hasNext()) {
                        KeyValue<String, Long> keyValue = iterator.next();
                        if (keyValue.value > maxValue) {
                            maxValue = keyValue.value;
                            maxValueKey = keyValue.key;
                        }
                    }
                }
                context.forward(maxValueKey +" @" + new Date(timestamp), maxValue);
            }

            @Override
            public KeyValue<String, Long> transform(String key, LoginTime value) {
                   Long currentVT = store.putIfAbsent(key, value.getLogintime());
                   if (currentVT != null) {
                       store.put(key, currentVT + value.getLogintime());
                   }
                   return null;
            }

            @Override
            public void close() {

            }
        };
    }



    static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(final Properties envProps) {
        final SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();

        final HashMap<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                envProps.getProperty("schema.registry.url"));

        specificAvroSerde.configure(serdeConfig, false);
        return specificAvroSerde;
    }

    public void createTopics(final Properties envProps) {
        final Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
        try (final AdminClient client = AdminClient.create(config)) {

        final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    envProps.getProperty("output.topic.name"),
                    Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                    Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        final Properties envProps = new Properties();
        final FileInputStream input = new FileInputStream(fileName);
        envProps.load(input);
        input.close();

        return envProps;
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        final KafkaStreamsPunctuation instance = new KafkaStreamsPunctuation();
        final Properties envProps = instance.loadEnvProperties(args[0]);
        final Properties streamProps = instance.buildStreamsProperties(envProps);
        final Topology topology = instance.buildTopology(envProps);

        instance.createTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

}

6. 카프카 스트림즈 애플리케이션을 위한 데이터 생성

카프카 스트림즈 애플리케이션을 실행하기 전에 데이터를 우선 넣어야합니다. DatagenConnector를 사용하면 HTTP put 메서드를 통해 데이터를 쉽게 넣을 수 있스비다.

 

우선 스키마 정의를 위해 현재 실행중인 디렉토리에 datagen-logintime.avsc 파일을 생성합니다.

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "LoginTime",
  "fields": [
    {"name": "logintime", "type": {
      "type": "long",
      "format_as_time" : "unix_long",
      "arg.properties": {
        "iteration": { "start": 1, "step": 100}
      }
    }},
    {"name": "userid", "type": {
      "type": "string",
      "arg.properties": {
        "regex": "User_[1-9]{0,1}"
      }
    }},
    {"name": "appid", "type": {
      "type": "string",
      "arg.properties": {
        "regex": "App[1-9][0-9]?"
      }
    }}
  ]
}

기존에 만들었던 스키마와 유사하지만 데이터 생성을 위한 구문이 추가되어 있습니다. kafka-connect-datagen 커넥터avro random generator를 사용하여 데이터를 생성하기 때문입니다.

 

이제, 터미널을 열고 데이터 제네레이터 커넥터를 동작시킵니다.

$ curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "login-events",
            "schema.filename": "/tmp/datagen-logintime.avsc",
            "schema.keyfield": "userid",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'

동작이 잘 수행되었다면 다음과 같은 출력이 나타나게 됩니다.

HTTP/1.1 200 OK
Date: Thu, 20 Aug 2020 20:15:22 GMT
Content-Type: application/json
Content-Length: 441
Server: Jetty(9.4.24.v20191120)

{"name":"datagen_local_01","config":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","kafka.topic":"login-events","schema.filename":"/schemas/datagen-logintime.avsc","schema.keyfield":"userid","max.interval":"1000","iterations":"10000000","tasks.max":"1","name":"datagen_local_01"},"tasks":[{"connector":"datagen_local_01","task":0}],"type":"source"}

7. 컴파일 그리고 실행

이제 남은 작업은 애플리케이션을 실행하는 것입니다. shadowJar을 통해 디펜던시가 추가된 jar를 생성합니다.

$ ./gradlew shadowJar

카프카 스트림즈는 다음과 같이 로컬에서 실행할 수 있습니다. 

$ java -jar build/libs/kafka-streams-schedule-operations-standalone-0.0.1.jar configuration/dev.properties

8. 생성된 토픽 읽어보기

이제 스트림즈 애플리케이션을 실행시키고 console-consumer로 데이터를 읽어봅니다.

$ docker-compose exec broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 10

결과는 다음과 같습니다.

User_6 @Thu Aug 20 16:30:33 EDT 2020 : 1
User_9 @Thu Aug 20 16:30:35 EDT 2020 : 601
User_9 @Thu Aug 20 16:30:40 EDT 2020 : 2903
User_4 @Thu Aug 20 16:30:45 EDT 2020 : 5904
User_3 @Thu Aug 20 16:30:50 EDT 2020 : 13305
User_8 @Thu Aug 20 16:30:55 EDT 2020 : 28909
User_9 @Thu Aug 20 16:31:00 EDT 2020 : 18303
User_9 @Thu Aug 20 16:31:05 EDT 2020 : 24804
User_9 @Thu Aug 20 16:31:10 EDT 2020 : 32205
User_9 @Thu Aug 20 16:31:15 EDT 2020 : 58108

여기까지 카프카 스트림즈의 상태기반 스케쥴링 처리를 수행하는 예제 코드를 확인해 보았습니다. 이처럼 카프카 스트림즈는 스트림 데이터를 잘 활용할 수 있도록 많은 기능이 추가되어 있습니다. 스트림 데이터를 활용하는 조직이라면 스트림즈를 사용하여 스트림 데이터의 활용도를 높이는 방법을 연구해 보는 것도 좋을것 같습니다.

반응형
  • 익명 2021.10.05 16:56 댓글주소 수정/삭제 댓글쓰기

    비밀댓글입니다

    • 글 잘 봐주셔서 감사합니다^^
      1) 커넥터를 특정 서버에 설치해서 카프카의 로그를 fileSourceConnector로 가져갈수도 있습니다. 이 경우 단일모드 커넥트를 사용하는 것도 좋은 방법이겠네요.
      2) 데이터를 종류별로 일자별로 파일로 write하는 커넥터를 java로 만드신다면 가능합니다.
      3) 파일 write방법도 커넥터 개발시 적용하는 방법에 따라 다르게 가져갈 수 있을것 같습니다.