빅데이터/Kafka 125

카프카를 활용한레이싱카 센서 실시간 수집 데이터 파이프라인 구축

오늘은 카레이싱에서 자동차에서 출력되는 여러 지표들을 수집하는 데이터 파이프라인을 만들어 보겠습니다. F1, WRC를 비롯한 다양한 카레이싱 팀에서는 데이터의 수집과 활용이 우승으로 가는 지름길인 것은 이미 널리 알려져 있습니다. 2021 F1 의 월드 컨스트럭터 챔피언인 메르세데스-AMG 페트로나스 포뮬러 원팀은 데이터를 잘 활용하는 팀 중 하나입니다. 페트로나스 포뮬러팀에서 수집하는 F1 자동차의 센서는 5만개에 달하며 한경기당 300기가바이트, 모든 경기 동안에는 테라바이트에 달하는 데이터가 쌓이게 된다고 합니다. F1 경기는 총 22번 경기를 뛰게 되는데 자체 프랙티스 세션까지 합치면 몇백 테라바이트 이상의 데이터가 모이게 됩니다. 이렇게 모아진 데이터는 다음 경기를 위해 분석하고 경주용 F1 머..

빅데이터/Kafka 2022.05.02
Kafka Producer JMX exporter 사용하기

이 글은 오픈소스 아파치 카프카 공식 자바 라이브러리를 사용하여 개발할 때 JMX exporter를 사용하여 producer의 지표를 수집하기 위한 글입니다. 1) KafkaProducer 애플리케이션 개발 build.gradle 코드 plugins { id 'java' } group 'com.example' version '1.0' repositories { mavenCentral() } dependencies { compile 'org.apache.kafka:kafka-clients:2.5.0' compile 'org.slf4j:slf4j-simple:1.7.30' } task uberJar(type: Jar) { from sourceSets.main.output dependsOn configurat..

빅데이터/Kafka 2022.04.19
대규모 데이터의 카프카 프로듀서 성능 향상 방법

카프카 프로듀서는 acks, linger.ms, batch.size 조절을 통해 성능 향상을 달성 할 수 있습니다. 각 옵션별로 한계점과 성능 향상 방법을 알아보겠습니다. 여기서는 대규모 데이터가 들어오는 것을 가정하였습니다. 가정사항 - Record의 메시지 크기 10Kbytes - 레코드 유입량 : 1000tps acks acks는 카프카 프로듀서로 전송한 레코드가 정상적으로 리더 또는 팔로워 파티션에 적재되었는지 확인하는 역할을 합니다. 0으로 설정할 경우 모든 전송 케이스에 대해 성공으로 처리하고 1의 경우 리더 파티션에 적재되었을 경우 성공으로 처리합니다. all(-1)로 설정할 경우에는 리더와 팔로워 파티션(min.insync.replicas)에 적재가 완료되었을 경우 성공으로 처리하지만 al..

빅데이터/Kafka 2022.04.14
카프카 커넥트의 태스크 밸런싱 로직, DistributedHerder(양치기) 그리고 IncrementalCooperativeAssignor 내부 동작 소개

Herder; 명사 1. 양치기, 목부 카프카 커넥트는 워커, 커넥터, 태스크로 이루어져 있습니다. 워커는 카프카 커넥트 프로세스를 뜻하며 커넥터와 태스크를 실행시키기 위한 프로세스입니다. 커넥터는 태스크를 실행하는 관리도구로서 여러 태스크를 하나의 파이프라인으로 라이프 사이클을 관리합니다. 태스크는 데이터를 실질적으로 처리하는 부분이라고 볼 수 있습니다. 커넥터에는 1개 이상의 태스크가 포함되며 각 태스크는 프로듀서 또는 컨슈머 역할을 수행합니다. 일반적으로 분산모드 커넥트를 운영할 때 커넥터를 실행할 경우 태스크는 여러 워커에서 분산해서 실행됩니다. 예를 들어 5개의 워커로 이루어진 분산 모드 커넥트를 실행하고 7개의 태스크를 가진 커넥터를 실행하면 다음과 같이 할당됩니다. [worker-0] - [..

빅데이터/Kafka 2022.03.23
kafka consumer와 seekToBeginning를 활용하여 offset reset하기

카프카 컨슈머 클라이언트는 seekToBeginning 함수가 있습니다. 이 함수를 사용하면 특정 파티션의 오프셋을 최소 레코드로 지정할 수 있습니다. /** * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called. * If no partitions are provided, seek to the first offset for all of the currently as..

빅데이터/Kafka 2022.01.17
스키마 레지스트리 자바 클라이언트(프로듀서,컨슈머) 테스트

1. 스키마 레지스트리 설정, 실행 confluent-7.0.0/etc/schema-registry/schema-registry.properties 설정파일 listeners=http://0.0.0.0:8081 kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092 kafkastore.topic=_schemas debug=false 스키마 레지스트리 실행 $ bin/schema-registry-start etc/schema-registry/schema-registry.properties [2021-12-16 21:36:27,121] INFO SchemaRegistryConfig values: access.control.allow.headers = access...

빅데이터/Kafka 2021.12.16
confluent developer certification 예시 문제 해설

confluent developer certification sample : driver토픽의 1번 파티션에서 모든 input을 다루는 데이터. 즉, 프로듀서가 레코드를 보내면 리더 파티션이 존재하는 브로커와 통신을 한다. 그러므로 정답은 1번 파티션의 리더가 위치한 102번 브로커가 정답. 카프카 컨슈머가 range partition assignment 전략일 경우 어떻게 파티션과 컨슈머가 연결될 것인지에 대한 정답을 찾는 것이다. range partition assignment의 경우 consumer의 member id를 사전식(알파벳순)으로 정렬하여 각 토픽의 파티션을 숫자를 기준으로 컨슈머에 할당한다. 그러므로 이 경우 다음과 같이 연동된다. 그러므로 정답은 c. 토픽a의 파티션0과 토픽b의 파티..

빅데이터/Kafka 2021.12.15
confluent HdfsSinkConnector 파티셔너 설명

confluent에서는 hdfs2를 위한 sink connector를 컨플루언트 커뮤니티 라이센스로 제공합니다. 해당 기능 중 파티셔너 기능에 대해 정리합니다. https://docs.confluent.io/kafka-connect-hdfs/current/overview.html#partitioners-and-storage HDFS 2 Sink Connector for Confluent Platform | Confluent Documentation Home Kafka Connectors HDFS 2 Sink Connector for Confluent Platform The Kafka Connect HDFS 2 Sink connector allows you to export data from Kafka t..

빅데이터/Kafka 2021.11.25
macos에서 podman으로 rest-proxy 실행하기

rest proxy는 카프카와 연동시 프로듀서, 컨슈머의 역할을 http로 수행할 수 있는 기능을 가진 애플리케이션입니다. https://blog.voidmainvoid.net/345 카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용 confluent에서는 rest proxy라고 불리는 카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 기존에 Kafka connect, Kafka client로 데이터를 전달하는 것과는 사뭇 다르게.. blog.voidmainvoid.net macos에서 rest proxy를 사용하여 카프카와 연동 테스트 수행하는 방법을 정리합니다. $ podman machine start ..

빅데이터/Kafka 2021.11.16
couchbase 카프카 싱크 커넥트 사용 방법

카프카 커넥트는 카프카 클러스터와 기타 데이터베이스간 파이프라인을 반복적으로 만드는데 특화되어 있습니다. 카프카 커넥터는 파이프라인의 구현체 인데요. 싱크 커넥터와 소스 커넥터로 이루어져 있습니다. 여기서는 카우치 베이스싱크 커넥터를 살펴봅니다. 카우치베이스 싱크 커넥터는 카프카의 토픽을 카우치베이스로 저장 로직이 담긴 커넥터입니다. 카우치베이스 싱크 커넥터는 at least once 전달을 지원하며 중복이 발생했을 경우에는 재입수가 필요할 수도 있습니다. 카프카 싱크 커넥터를 사용하려면 깃허브 레포지토리에 있는 배포판을 다운받아서 커넥터에 포함시켜 사용할 수 있습니다. - 카우치베이스 커넥터 깃헙 : https://github.com/couchbase/kafka-connect-couchbase - 카우..

빅데이터/Kafka 2021.10.29
카프카 스트림즈에서 stateful window 처리를 다루는 방법 그리고 커밋타이밍

카프카 스트림즈는 비상태기반(stateless), 상태기반(stateful) 처리를 지원하는 다양한 메서드를 제공합니다. 스트림즈DSL을 통해 구현할 수도 있고 또는 프로세서API를 사용해서 직접 구현하는 방식도 있습니다. 이 포스팅에서는 스트림즈DSL을 활용하여 상태기반 데이터 처리할 때 어떻게 input, output이 동작하는지 설명합니다. 카프카 스트림즈의 스트림즈DSL에서 window function은 4가지가 있습니다. - 텀블링 윈도우 - 세션 윈도우 - 슬라이딩 윈도우 - 호핑 윈도우 이 중 가장 텀블링 윈도우에 대해 살펴볼건데요. 텀블링 윈도우는 특정 사이즈의 윈도우가 서로 다른 윈도우와 겹치지 않게 지속되는 시간 단위 윈도우를 뜻합니다. 다음은 텀블링 윈도우 예시 사진입니다. 레코드의 ..

빅데이터/Kafka 2021.09.08
카프카 스트림즈로 schedule operation 수행하기(번역)

원문 : https://kafka-tutorials.confluent.io/kafka-streams-schedule-operations/kstreams.html 카프카 스트림즈는 토픽의 데이터를 읽어 상태기반, 비상태기반 처리를 하는 스트리밍 라이브러리입니다. 오늘은 컨플루언트에서 카프카 스트림즈가 스케쥴링 동작을 어떻게 수행하는지에 대한 코드 예시를 번역하였습니다. 1. 프로젝트 초기화 신규 디렉토리를 생성합니다. $ mkdir kafka-streams-schedule-operations && cd kafka-streams-schedule-operations 2. 컨플루언트 플랫폼 가져오기 Dockerfile을 통해 데이터 생성기를 먼저 가져옵니다. 하기 Dockerfile은 Dockerfile-con..

빅데이터/Kafka 2021.07.23
카프카 스트림즈에서 SlidingWindow에 대한 고찰

카프카 스트림즈는 상태기반/비상태기반 데이터 처리에 효과적인 라이브러리입니다. 특히 상태 기반 처리에 큰 도움이 되는데 여러 상태 기반 처리 중 SlidingWindow에 대해 살펴보고자 합니다. SlidingWindow는 window종류 중 하나로서 일정 시간동안의 데이터들의 집합에 대해 연산을 하는 것을 뜻합니다. 마치 베란다의 슬라이딩 윈도우가 옆으로 지나가는 듯한 모습과 비슷합니다. 카프카 스트림즈에서는 aggregation연산 또는 join연산을 할 때 window를 적용할 수 있는데 총 4개의 윈도우를 지원합니다. 각 윈도우 이름과 특징은 다음과 같습니다. - 호핑 윈도우 : 고정적인 사이즈의 윈도우, 윈도우끼리 겹치는 부분이 있음 - 텀블링 윈도우 : 고정적인 사이즈의 윈도우, 윈도우끼리 겹..

빅데이터/Kafka 2021.06.25
카프카 스트림즈 join 사용시 메시지 키 접근하기

카프카 스트림즈에서 KStream 또는 KTable을 사용하여 join을 사용할 때가 있습니다. KStream completedEventsStream = leftStream. join( rightStream, (leftValue, rightValue) -> leftValue + rightValue, JoinWindows.of(windowDuration) ); 상기와 같이 leftStream과 rightStream 2개의 스트림데이터를 조인하는 것은 아주 일반적인 조인 사용 예시인데요. 여기서 lambda 식을 보면 알 수 있다 싶이 leftValue와 rightValue에만 접근이 가능합니다. 즉, 2개의 토픽에서 조인이 되는 조인 key에 대해서는 접근이 불가능하다는 것을 알 수 있습니다. 2개의 토픽..

빅데이터/Kafka 2021.06.21
Cannot get state store TOPIC because the stream thread is STARTING, not RUNNING 에러 해결

KTable을 Materialized View로 사용할 경우 아래와 같은 에러가 발생할 때가 있습니다. Exception in thread "Timer-0" org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store address because the stream thread is STARTING, not RUNNING at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81) at org.apache.kafka.streams.state.internals.Wr..

빅데이터/Kafka 2021.06.16
카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기

카프카 스트림즈의 KTable은 토픽의 데이터를 key-value형태로 사용할 수 있도록 구체화된 뷰(Materialized View)를 제공합니다. 구현방법은 다음과 같습니다. 0. 카프카 스트림즈 디펜던시 추가 dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0' compile 'org.slf4j:slf4j-simple:1.7.30' implementation "org.apache.kafka:kafka-streams:2.8.0" } 이 예제는 2.8.0을 기준으로 작성하였습니다. 1. 카프카 스트림즈 설정..

빅데이터/Kafka 2021.06.16
kafka spark structured stream 예제코드 및 실행

카프카와 연동하는 스파크 스트림은 2가지 방식으로 구현할 수 있습니다. 1. 구조적 스트림(DataFrame readStream) 2. 스파크 스트리밍(DStream) 그 중 구조적 스트림 방식을 gradle + scala로 구현하기 위한 방법을 설명합니다. build.gradle plugins { id 'idea' id 'java' id 'scala' } group 'com.example' version '1.0-SNAPSHOT' sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { mavenCentral() jcenter() } ext { scalaVersion = '2.12.14' sparkVersion = '3.1.2' } depe..

빅데이터/Kafka 2021.06.04
카프카 스트림즈 Exactly-once 설정하는 방법과 내부 동작

카프카 스트림즈는 기본적으로 at-least-once 프로세싱을 지원합니다. at-least-once 프로세싱이란 적어도 한번 프로세싱을 지원하는 것으로써 중복은 발생할 수 있고 유실은 절대 발생하지 않는 것입니다. 스트림즈 프로세싱에서라면 특정 레코드에 대해 처리를 하고 유실은 될 수 없고 중복 프로듀스는 될 수 있다는 것입니다. - at least once : 적어도 한번 처리 - at most once : 많아도 한번만 처리 - exactly once : 딱 한번만 처리 카프카는 0.11.0 이후 버전에서 transaction처리를 지원합니다. 이 옵션을 사용하면 스트림즈 API를 사용할 때 Exactly once 프로세싱을 만족하도록 적용할 수 있습니다. 적용하는 방법은 아래와 같습니다. 코드를 ..

빅데이터/Kafka 2021.06.03
카프카 스트림즈 All stream threads have died. 오류 해결 방안

카프카 스트림즈 사용시 토폴로지 내부 에러로 인해 아래와 같은 오류가 발생할 수 있다. All stream threads have died. The instance will be in error state and should be closed. 이에 대응하기 위해 2.8.0에서 추가된 setUncaughtExceptionHandler메서드를 사용하면 스레드를 재시작하여 데이터를 지속 처리할 수 있다. streams.setUncaughtExceptionHandler((exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD); 2.8.0 이전에는 setUncaughtExceptionHandler(fin..

빅데이터/Kafka 2021.05.27
ProducerRecord에 파티션 번호를 지정하면 어떻게 동작할까?

ProducerRecord는 다양한 파라미터를 받고 그 중 파티션번호를 직접 넣어서 사용하는 경우도 있습니다. 이 때 파티션 번호를 넣으면 어떻게 동작할까요? 정답은 KafkaProducer private partition()메서드를 보면 확인할 수 있습니다. /** * computes partition for given record. * if the record has partition returns the value otherwise * calls configured partitioner class to compute the partition. */ private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedV..

빅데이터/Kafka 2021.05.05
카프카를 이벤트 소싱, CQRS로 사용할 수 있을까?

우선 이벤트 소싱과 CQRS에 대한 개념부터 짚고 넘어가겠습니다. 카프카에 대한 개념은 이 동영상을 참고해주세요. 이벤트 소싱 패턴 이벤트 소싱은 배치성 데이터 저장소에 현재 상태만 저장하는 것이 아니라 이벤트 브로커(전용 저장소)에 발생된 모든 이벤트 기록(레코드)들을 기록하는 것을 뜻합니다. 이를 통해 데이터 모델과 비즈니스 도메인을 동기화할 필요가 없어지고 성능, 확장성, 응답성이 향상되어 도메인 태스크를 간소화 할수 있습니다. 기존에는 배치성 데이터 저장소(오라클, mysql 등)에 서비스에 들어오는 명령(또는 이벤트)이 발생할 때 마다 항상 동기화 했었습니다. 간단한 구조와 적은 데이터 접근만 일어날 때는 큰 이슈가 없지만, 데이터과 이벤트가 많아질 수록 데이터에 대한 CRUD와 같은 접근으로 ..

빅데이터/Kafka 2021.05.03
confluent-kafka-go 컨슈머를 구현하는 5가지 방법

confluent-kafka-go는 컨플루언트에서 개발하고 유지보수하는 golang기반 카프카 클라이언트입니다. github.com/confluentinc/confluent-kafka-go confluentinc/confluent-kafka-go Confluent's Apache Kafka Golang client. Contribute to confluentinc/confluent-kafka-go development by creating an account on GitHub. github.com 위 라이브러리를 사용해서 컨슈머 5가지 패턴을 구현해보겠습니다. 컨슈머 구현 사전 작업 코드(컨슈머 초기화) 이 작업은 컨슈머를 이용하기 위한 가장 기본적인 작업입니다. 기본 옵션은 enable.auto.com..

빅데이터/Kafka 2021.04.30
레디슈 큐(queue), 레디스 스트림(streams), 레디스 펍섭(pub/sub) 그리고 카프카와 비교

레디스란? 레디스는 오픈소스 인 메모리 데이터 구조 저장소로서 데이터베이스, 캐시, 메시지 브로커로 역할을 수행한다. 레디스는 String, hash, lists, sets, sorted sets, bitmaps, streams등을 지원한다. 레디스 큐란? 레디스 큐는 레디스의 자료구조 중 List를 이용하여 Queue를 구현한 것이다. 큐는 FIFO(First In First Out)구조로 먼저 들어온 데이터가 먼저 처리되는 것이다. LIST자료구조의 LPUSH, RPOP(또는 RPUSH, LPOP)을 사용하여 구현할 수 있다. 이외에도 BLPOP을 사용하여 블락킹(blocking) pop을 수행할 수 있다. 지정한 시간만큼 기다리고 값이 들어오면 LPOP을 수행하는 것이다. $ lpush my-que..

빅데이터/Kafka 2021.04.25
카프카 스트림즈의 commit.interval.ms옵션

카프카 스트림즈에서 commit.interval.ms 옵션은 프로세서의 frequency 즉, 반복을 얼마만큼 인터벌 간격으로 할 것인가에 대한 것이다. The frequency in milliseconds with which to save the position of the processor. (Note, if processing.guarantee is set to exactly_once, the default value is 100, otherwise the default value is 30000. 예를 들어 KTable로 선언된 변수를 toStream으로 변경하는 구문이 있다고 가정하자. 이 때 30000이면 30초 인터벌을 가지고 변경을 수행한다. 만약 짧게 가져가고 싶다면 0이나 아주 작은 숫..

빅데이터/Kafka 2021.04.15
초~중급자를 위한 [아파치 카프카 애플리케이션 개발] 서적을 출간하였습니다.

2020년 6월 부터 집필을 시작하여 6개월간의 원고 집필과 3개월간의 편집 끝에! 「아파치 카프카 애플리케이션 프로그래밍 with 자바」를 출간하게 되었습니다. 출간일은 4월 14일로 화요일부터 구매 순서대로 배송이 시작됩니다. - 예스24 : www.yes24.com/Product/Goods/99122... - 교보문고 : www.kyobobook.co.kr/product/detai... - 알라딘 : www.aladin.co.kr/shop/wproduct.aspx?... 책을 집필하게 된 계기 사실 개발 블로그와 개발 유튜브를 진행하면서 딱히 책을 쓸 생각은 없었습니다. 책을 쓰는데 들이는 시간과 노력이 생각보다 크다는 것을 알고 있었기 때문입니다. 하지만 제 컨텐츠들을 봐주시고 '카프카를 주제로'..

빅데이터/Kafka 2021.04.11
카프카 스트림즈 suppress() 사용할 때 ClassCastException 문제

카프카 스트림즈에는 suppress()메서드가 있다. 이 메서드를 통해 일정 윈도우 간격 안에 데이터를 모아서 처리할 수 있는데, 이슈가 있다. 값을 모아서 state store(rocksDB)에 저장할 때 제대로 deserialize를 하지못하는것이다. java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) at org.apache.kafka.streams.kstream.internals.suppress..

빅데이터/Kafka 2021.04.06
카프카 스트림즈 애플리케이션 초기화 명령

카프카 스트림즈 애플리케이션을 운영하다가 또는 테스트 중에 오프셋을 초기화 해야할 때가 있습니다. 이때는 아래와 같은 명령어를 사용하면 됩니다. $ ./kafka-streams-application-reset.sh --application-id join-application --input-topics streams-test Reset-offsets for input topics [A.s2olap-kakaotalk] Following input topics offsets will be reset to (for consumer group join-application) Topic: streams-test Partition: 2 Offset: 0 Topic: streams-test Partition: 3 Of..

빅데이터/Kafka 2021.03.31
local kafka single broker 띄우기 with 도커

wurstmeister/kafka-docker는 사실상 표준(디팩토)으로 카프카 도커 이미지로 띄우는 용도로 사용하고 있습니다. 사용 방법은 아래와 같습니다. $ git clone https://github.com/wurstmeister/kafka-docker.git $ cd kafka-docker wurstmeister/kafka-docker에 있는 docker-compose-single-broker.yml 의 내부 설정 중 KAFKA_ADVERTISED_HOST_NAME을 변경합니다. $ vi docker-compose-single-broker.yml version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181"..

빅데이터/Kafka 2021.03.05