본문 바로가기

빅데이터/Kafka

카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala)

Kafka-client library를 사용하여 JVM위에 올라가는 consumer/producer를 작성할 수 있습니다. 이번 포스팅에서는 scala로 Kafka consumer를 멀티쓰레드로 실행하는 애플리케이션 예제 코드를 공유, 설명 드리겠습니다.

 

제약조건

- Kafka consumer 

- Multi thread(2개 이상) 지원

- Scala

코드

Scala를 실행하는 멀티쓰레드 카프카 컨슈머 애플리케이션의 파일은 크게 4개로 나뉘어져 있습니다. 먼저, Scala application을 실행하는 Main.scala와 실제로 Consumer역할을 하게 되는 Runnable Thread인 ConsumerWorker.scala, Consumer의 상태를 기록할 ConsumerStatus.scala 마지막으로 디펜던시를 관리하는 build.sbt 입니다.

 

build.sbt

name := "kafka-multi-thread-consumer"
version := "0.1"
scalaVersion := "2.13.1"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.3.1"
)

build.sbt를 통해 library dependency를 정의합니다. 

 

ConsumerStatus.scala

object ConsumerStatus {
  var isRunning = true
  var consumerInfo = scala.collection.mutable.Set[String]()
}

object class인 ConsumerStatus는 Singleton class처럼 1개의 상태만 존재할 수 있습니다. isRunning 변수를 통해 consumer polling을 종료하는데 사용됩니다.

 

ConsumerWorker.scala

class ConsumerWorker(val props: Properties, val topic: String, val id: Int) extends Runnable {

  @Override
  override def run(): Unit = {
    val threadName = "Thread" + id
    ConsumerStatus.consumerInfo += threadName
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    while (ConsumerStatus.isRunning) {
      val records = consumer.poll(Duration.ofSeconds(1))
      records.forEach(item => {
        println(threadName + " " + new String(item.value()))
      })
    }
    consumer.close()
    println("Shutdown Thread" + id)
    ConsumerStatus.consumerInfo -= threadName
  }
}

실제로 polling하고 데이터를 처리하는데 사용되는 구문입니다. Runnable class로 정의된 ConsumerWorker는 지속적으로 polling하여 데이터를 처리(print)하며, Status상태에 따라 consumer를 종료합니다.

 

Main.scala

object Main extends App {

  // Define configurations for consume
  val threadCount = 2
  val topic: String = "test-topic"
  val consumerId: String = "test-consumer"
  val properties = new Properties()
  properties.put("group.id", consumerId)
  properties.put("bootstrap.servers", "localhost:9092")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val executor = Executors.newCachedThreadPool()
  // Submit n thread kafka consumers
  for (threadNo <- 0 until threadCount) {
    executor.submit(new ConsumerWorker(properties, topic, threadNo))
  }

  // Shutdown hook makes graceful shutdown
  sys.ShutdownHookThread {
    ConsumerStatus.isRunning = false
    while (ConsumerStatus.consumerInfo.nonEmpty){
      Thread.sleep(1000)
    }
    println("Shutdown application")
  }
}

실제로 application이 실행되는 Main object는 consumer의 각종 configuration을 정의, Thread 생성 그리고 shutdown hook을 통해 consumer의 정상적인 종료를 수행합니다.

 

특이사항

1) newCachedThreadPool 사용

newCachedThreadPool은 처리할 스레드를 추가하면 스레드를 증가하여 생성하는 thread pool입니다. 최대 Integer.MAX까지 생성할 수 있으며 Thread의 작업이 종료되면 Thread는 종료됩니다.

 

newCachedThreadPool을 통해 시작되는 ConsumerWorker는 내부에서 무한루프로 consumer.poll()을 통해 데이터를 지속적으로 들고옵니다. 만약 ConsumerStatus.isRunning이 false가 발생하게 되면 모든 작업(println)을 처리한 뒤에 안전하게 close()까지 수행하여 종료하게 됩니다. 

 

wakeup Exception 등을 통해 처리할 수 도 있지만, 위와 같은 방식도 훌륭히 종료 처리 가능합니다.

 

2) shutdownHook을 통한 consumer close 처리

application종료시 안전하게 자원을 종료할 의무가 있습니다. 이러한 상황에서 shutdown hook으로 신호를 받아 각 consumer thread를 안전하게 종료할 수 있습니다.

 

전체 소스코드

github repository에서 전체 소스코드를 다운받을 수 있습니다.

https://github.com/AndersonChoi/scala-kafka-multi-thread-consumer

 

AndersonChoi/scala-kafka-multi-thread-consumer

scala multi thread consumer application example. Contribute to AndersonChoi/scala-kafka-multi-thread-consumer development by creating an account on GitHub.

github.com

 

태그