이번 포스트에서는 자바 멀티스레드 카프카 컨슈머 애플리케이션을 구현해보도록 하겠습니다.
준비물
- Intellij 또는 eclipse
- gradle project
- jdk 1.8
- 약 10분
구현방법
이전 포스트(카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala) 바로가기)에서 scala로 구현한 바가 있습니다. 이번에는 자바로 구현하고, consumer.wakeup()을 사용해서 consumer를 안전하게 해제시키는 것을 아래 코드에서 구현해보겠습니다. wakeup() 메서드는 java shutdown hook을 통해 호출하여 안전하게 종료되도록 설정해보겠습니다.
기존 구현방식 처럼 newCachedThreadPool 메서드를 사용해서 multiple thread를 생성해보겠습니다.
코드
필요한 코드는 3가지 입니다. 1) build.gradle 2) ConsumerWithMultiThread.java 3) ConsumerWorker.java 입니다.
- build.gradle : kafka-client dependency를 추가
- ConsumerWithMultiThread.java : 메인 스레드, thread 생성
- ConsumerWorker.java : Consumer thread 구현체
1) build.gradle
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}
2) ConsumerWithMultiThread.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerWithMultiThread {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
private static int CONSUMER_COUNT = 3;
private static List<ConsumerWorker> workerThreads = new ArrayList<>();
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < CONSUMER_COUNT; i++) {
ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
workerThreads.add(worker);
executorService.execute(worker);
}
}
static class ShutdownThread extends Thread {
public void run() {
workerThreads.forEach(ConsumerWorker::shutdown);
System.out.println("Bye");
}
}
}
3) ConsumerWorker.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerWorker implements Runnable {
private Properties prop;
private String topic;
private String threadName;
private KafkaConsumer<String, String> consumer;
ConsumerWorker(Properties prop, String topic, int number) {
this.prop = prop;
this.topic = topic;
this.threadName = "consumer-thread-" + number;
}
@Override
public void run() {
consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(threadName + " >> " + record.value());
}
consumer.commitSync();
}
} catch (WakeupException e) {
System.out.println(threadName + " trigger WakeupException");
} finally {
System.out.println(threadName + " gracefully shutdown");
consumer.commitSync();
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
실행
Application run을 해보면 test 토픽을 3개의 consumer가 가져가는 것을 확인하실 수 있습니다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용 (0) | 2020.06.25 |
---|---|
AWS MSK 사용시 인스턴스 유형별 최대 토픽 개수 (0) | 2020.06.24 |
아파치 카프카 입문과 활용 강의자료 슬라이드 (2) | 2020.06.23 |
Kafka-client client.dns.lookup 옵션 정리 (0) | 2020.04.13 |
카프카 클러스터 클러스터ip DNS 연동방법. use_all_dns_ips 사용(in AWS, route53) (0) | 2020.03.20 |
AWS에 카프카 클러스터 설치하기(ec2, 3 brokers) (1) | 2020.03.18 |