본문 바로가기

빅데이터/Kafka

스파크 스트리밍-Kafka Data source 소개

KafkaSource는 스파크의 Structured Streaming에서 Apache kafka를 data source로 사용하기 위한 목적이다. 이 library의 source는 아래에서 확인할 수 있다.

 

Spark Kafkasource : https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

 

2개의 핵심 function을 아래와 같이 정리할 수 있다.

 

- getOffset() : KafkaOffsetrReader를 사용하여 가장 최근의 offset을 가져온다.

- getBatch() : offset의 처음부터 끝까지에 존재하는 DataFrame을 가져온다.

 

만약 spark-sql-kafka-0-10 패키지를 사용한다면 아래와 같이 명시적으로 package사용함을 명시해야한다.

 

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.3

위 숫자들은 각각의 버젼을 뜻한다.

1) 0-10 : kafka version

2) 2.12 : scala version

3) 2.4.3 : Spark version

 

Kafka Source Provider 코드

KafkaSrouceProvider는 아래와 같은 옵션이 필수로 들어간다.

 

- subscribe, subscribepattern, or assign

- kafka.bootstrap.server

 

위 옵션을 사용하여 만든 기본코드는 아래와 같다.

kafka_df = spark.readStream.\
  format("kafka").\ # set data ingestion format as Kafka
  option("subscribe", "<topic_name>").\ #This is required although it says option.
  option("kafka.bootstrap.servers", "localhost:9092").\ #You will also need the url and port of the bootstrap server
  load()

 

만약 data의 schema를 확인하고 싶다면 아래와 같은 코드를 사용하면 된다.

 

import logging
from pyspark.sql import SparkSession

def run_spark_job(spark):

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:<your port>") \
        .option("subscribe", "<your topic name>") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10) \
        .option("stopGracefullyOnShutdown", "true") \
        .load()

    # Show schema for the incoming resources for checks
    df.printSchema()
    
if __name__ == "__main__":
    logger = logging.getLogger(__name__)

    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("StructuredStreamingSetup") \
        .getOrCreate()

    logger.info("Spark started")

    run_spark_job(spark)

    spark.stop()

- startingOffsets : 읽어올 offset의 시작위치 설정

- maxOffsetsPerTrigger : trigger interval per offset

- stopGracefullyOnShutdown : true로 할경우 StreamingContext를 바로 shutdown하지 않고 JVM절차에 따라 shutdown한다(default false)

 

Kafka Offset Reader

KafkaOffsetReader class는 Spark에서 Kafka의 offset을 읽어오는 역할을 한다. 이 class는 ConsumerStrategy class를 사용하는데, 어느 topic, partition에서 읽어올지 정의한다. 

만약 WAL(Write Ahead Log)를 통해 고가용성을 유지하고 싶다면 아래와 같은 방식을 사용하면된다.

 

spark.streaming.receiver.writeAheadLog.enable

 

반응형