Kafka-client library를 사용하여 JVM위에 올라가는 consumer/producer를 작성할 수 있습니다. 이번 포스팅에서는 scala로 Kafka consumer를 멀티쓰레드로 실행하는 애플리케이션 예제 코드를 공유, 설명 드리겠습니다.
제약조건
- Kafka consumer
- Multi thread(2개 이상) 지원
- Scala
코드
Scala를 실행하는 멀티쓰레드 카프카 컨슈머 애플리케이션의 파일은 크게 4개로 나뉘어져 있습니다. 먼저, Scala application을 실행하는 Main.scala와 실제로 Consumer역할을 하게 되는 Runnable Thread인 ConsumerWorker.scala, Consumer의 상태를 기록할 ConsumerStatus.scala 마지막으로 디펜던시를 관리하는 build.sbt 입니다.
build.sbt
name := "kafka-multi-thread-consumer"
version := "0.1"
scalaVersion := "2.13.1"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "2.3.1"
)
build.sbt를 통해 library dependency를 정의합니다.
ConsumerStatus.scala
object ConsumerStatus {
var isRunning = true
var consumerInfo = scala.collection.mutable.Set[String]()
}
object class인 ConsumerStatus는 Singleton class처럼 1개의 상태만 존재할 수 있습니다. isRunning 변수를 통해 consumer polling을 종료하는데 사용됩니다.
ConsumerWorker.scala
class ConsumerWorker(val props: Properties, val topic: String, val id: Int) extends Runnable {
@Override
override def run(): Unit = {
val threadName = "Thread" + id
ConsumerStatus.consumerInfo += threadName
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(util.Collections.singletonList(topic))
while (ConsumerStatus.isRunning) {
val records = consumer.poll(Duration.ofSeconds(1))
records.forEach(item => {
println(threadName + " " + new String(item.value()))
})
}
consumer.close()
println("Shutdown Thread" + id)
ConsumerStatus.consumerInfo -= threadName
}
}
실제로 polling하고 데이터를 처리하는데 사용되는 구문입니다. Runnable class로 정의된 ConsumerWorker는 지속적으로 polling하여 데이터를 처리(print)하며, Status상태에 따라 consumer를 종료합니다.
Main.scala
object Main extends App {
// Define configurations for consume
val threadCount = 2
val topic: String = "test-topic"
val consumerId: String = "test-consumer"
val properties = new Properties()
properties.put("group.id", consumerId)
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val executor = Executors.newCachedThreadPool()
// Submit n thread kafka consumers
for (threadNo <- 0 until threadCount) {
executor.submit(new ConsumerWorker(properties, topic, threadNo))
}
// Shutdown hook makes graceful shutdown
sys.ShutdownHookThread {
ConsumerStatus.isRunning = false
while (ConsumerStatus.consumerInfo.nonEmpty){
Thread.sleep(1000)
}
println("Shutdown application")
}
}
실제로 application이 실행되는 Main object는 consumer의 각종 configuration을 정의, Thread 생성 그리고 shutdown hook을 통해 consumer의 정상적인 종료를 수행합니다.
특이사항
1) newCachedThreadPool 사용
newCachedThreadPool은 처리할 스레드를 추가하면 스레드를 증가하여 생성하는 thread pool입니다. 최대 Integer.MAX까지 생성할 수 있으며 Thread의 작업이 종료되면 Thread는 종료됩니다.
newCachedThreadPool을 통해 시작되는 ConsumerWorker는 내부에서 무한루프로 consumer.poll()을 통해 데이터를 지속적으로 들고옵니다. 만약 ConsumerStatus.isRunning이 false가 발생하게 되면 모든 작업(println)을 처리한 뒤에 안전하게 close()까지 수행하여 종료하게 됩니다.
wakeup Exception 등을 통해 처리할 수 도 있지만, 위와 같은 방식도 훌륭히 종료 처리 가능합니다.
2) shutdownHook을 통한 consumer close 처리
application종료시 안전하게 자원을 종료할 의무가 있습니다. 이러한 상황에서 shutdown hook으로 신호를 받아 각 consumer thread를 안전하게 종료할 수 있습니다.
전체 소스코드
github repository에서 전체 소스코드를 다운받을 수 있습니다.
https://github.com/AndersonChoi/scala-kafka-multi-thread-consumer
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 클러스터 클러스터ip DNS 연동방법. use_all_dns_ips 사용(in AWS, route53) (0) | 2020.03.20 |
---|---|
AWS에 카프카 클러스터 설치하기(ec2, 3 brokers) (1) | 2020.03.18 |
카프카 버로우 = consumer lag 모니터링 오픈소스 애플리케이션 (0) | 2020.03.07 |
카프카 auto.offset.reset 종류 및 사용방법 (0) | 2020.02.06 |
카프카 장애대응 - Consumer offset 지정하기(by partition) (2) | 2020.01.31 |
Kafka burrow 모니터링 하지 않는 consumer group 수동제거방법 (0) | 2020.01.15 |