본문 바로가기

빅데이터/Kafka

자바 멀티스레드 카프카 컨슈머 애플리케이션 구현 코드

이번 포스트에서는 자바 멀티스레드 카프카 컨슈머 애플리케이션을 구현해보도록 하겠습니다.

 

준비물

- Intellij 또는 eclipse
- gradle project
- jdk 1.8
- 약 10분

구현방법

이전 포스트(카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala) 바로가기)에서 scala로 구현한 바가 있습니다. 이번에는 자바로 구현하고, consumer.wakeup()을 사용해서 consumer를 안전하게 해제시키는 것을 아래 코드에서 구현해보겠습니다. wakeup() 메서드는 java shutdown hook을 통해 호출하여 안전하게 종료되도록 설정해보겠습니다.

기존 구현방식 처럼 newCachedThreadPool 메서드를 사용해서 multiple thread를 생성해보겠습니다.

코드

필요한 코드는 3가지 입니다. 1) build.gradle 2) ConsumerWithMultiThread.java 3) ConsumerWorker.java 입니다.

- build.gradle : kafka-client dependency를 추가
- ConsumerWithMultiThread.java : 메인 스레드, thread 생성
- ConsumerWorker.java : Consumer thread 구현체

 

1) build.gradle

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

2) ConsumerWithMultiThread.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiThread {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
    private static int CONSUMER_COUNT = 3;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}

3) ConsumerWorker.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            System.out.println(threadName + " gracefully shutdown");
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

실행

Application run을 해보면 test 토픽을 3개의 consumer가 가져가는 것을 확인하실 수 있습니다.