카프카와 연동하는 스파크 스트림은 2가지 방식으로 구현할 수 있습니다.
1. 구조적 스트림(DataFrame readStream)
2. 스파크 스트리밍(DStream)
그 중 구조적 스트림 방식을 gradle + scala로 구현하기 위한 방법을 설명합니다.
build.gradle
plugins {
id 'idea'
id 'java'
id 'scala'
}
group 'com.example'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenCentral()
jcenter()
}
ext {
scalaVersion = '2.12.14'
sparkVersion = '3.1.2'
}
dependencies {
compile "org.apache.spark:spark-core_2.12:$sparkVersion"
compile "org.apache.spark:spark-sql_2.12:$sparkVersion"
compile "org.apache.spark:spark-sql-kafka-0-10_2.12:$sparkVersion"
compile "org.scala-lang:scala-library:$scalaVersion"
}
여기서 중요한점은 스칼라와 스파크 호환성을 따져보고 작성해야 한다는 것입니다. 스파크 3.1.2는 스칼라 2.12와 호환됩니다.
Main.java
package com.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
object Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark Structured Streaming Example")
.master("local[4]")
.getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("kafka.group.id", "test-group")
.load()
import spark.implicits._
val values = df.select($"value".cast("STRING").as("json"))
values.writeStream
.trigger(ProcessingTime("5 seconds"))
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
localhost의 카프카로부터 test토픽의 레코드를 가져와서 메시지 값을 json형태로 읽고 지속적으로 프린팅하도록 동작합니다. 컨슈머 그룹은 test-group 입니다.
테스트를 위해 스파크를 RUN하고, kafka-console-producer.sh 로 데이터를 추가합니다.
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> {"hello":"world"}
> {"hello":"world"}
애플리케이션 로그를 확인하면 마이크로 배치형태로 데이터를 가져와 처리하는 모습을 볼 수 있습니다.
21/06/04 00:18:56 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "53309be6-eacb-4c6f-9053-4e5e9d68e975",
"runId" : "de5e7c32-f6d0-44ae-9dbd-7efb17b093df",
"name" : null,
"timestamp" : "2021-06-04T00:18:55.003Z",
"batchId" : 1,
"numInputRows" : 5,
"inputRowsPerSecond" : 0.9994003597841296,
"processedRowsPerSecond" : 4.317789291882557,
"durationMs" : {
"addBatch" : 1072,
"getBatch" : 0,
"latestOffset" : 19,
"queryPlanning" : 11,
"triggerExecution" : 1158,
"walCommit" : 32
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[test]]",
"startOffset" : {
"test" : {
"2" : 3501800,
"1" : 3511881,
"3" : 3618022,
"0" : 4136345
}
},
"endOffset" : {
"test" : {
"2" : 3501800,
"1" : 3511881,
"3" : 3618022,
"0" : 4136350
}
},
"numInputRows" : 2,
"inputRowsPerSecond" : 0.9994003597841296,
"processedRowsPerSecond" : 4.317789291882557
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@60ca6fef",
"numOutputRows" : 2
}
}
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
| json|
+--------------------+
|{"hello":"world"} |
|{"hello":"world"} |
+--------------------+
흥미롭게도 스파크 구조적 스트림을 사용하면 마이크로 배치로 가져오는 토픽의 각 파티션별 시작, 마지막 오프셋이 로그로 남습니다. 그리고 몇개의 row를 가져와서 처리했는지? 시간은 얼마나 걸렸는지 확인할 수 있습니다.
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈 join 사용시 메시지 키 접근하기 (0) | 2021.06.21 |
---|---|
Cannot get state store TOPIC because the stream thread is STARTING, not RUNNING 에러 해결 (0) | 2021.06.16 |
카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 (0) | 2021.06.16 |
카프카 스트림즈 Exactly-once 설정하는 방법과 내부 동작 (0) | 2021.06.03 |
카프카 스트림즈 All stream threads have died. 오류 해결 방안 (0) | 2021.05.27 |
ProducerRecord에 파티션 번호를 지정하면 어떻게 동작할까? (0) | 2021.05.05 |