빅데이터 210

카프카 커넥터의 태스크에 Priority를 부여할 수 없을까?

관련 지라 : KAFKA-5741 https://issues.apache.org/jira/browse/KAFKA-5741 [KAFKA-5741] Prioritize threads in Connect distributed worker process - ASF JIRA Connect's distributed worker process uses the DistributedHerder to perform all administrative operations, including: starting, stopping, pausing, resuming, reconfiguring connectors; rebalancing; etc. The DistributedHerder uses a single threaded exec..

빅데이터/Kafka 2022.10.04
토픽을 GlobalKTable 구체화된 뷰(view) 키-값 저장소로 사용시 특이점 및 주의사항

https://blog.voidmainvoid.net/442 카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 카프카 스트림즈의 KTable은 토픽의 데이터를 key-value형태로 사용할 수 있도록 구체화된 뷰(Materialized View)를 제공합니다. 구현방법은 다음과 같습니다. 0. 카프카 스트림즈 디펜던시 추가 dependencies blog.voidmainvoid.net 토픽의 데이터를 키-값 저장소로 사용하여 데이터를 조회할 수 있는 방법을 앞서 다루어 보았습니다. 아래는 응용하여 만든 GlobalKTable의 뷰 입니다. StreamsBuilder builder = new StreamsBuilder(); GlobalKTable addressTable = b..

빅데이터/Kafka 2022.09.15
카프카 커넥트/커넥터 내부 살펴보기 - 2.8기준, sinkTask 위주로

카프카 커넥트는 data sink/source를 위한 파이프라인을 운영하기 위해 만들어진 모듈입니다. 크게 두가지 Connect 타입을 지원하고 있습니다. standalone부터 distributed까지 코드를 보면서 내부 구조를 살펴보겠습니다. 우선 살펴봐야할 것은 cli입니다. ConnectDistributed.java 또는 ConnectStandalone.java에서 시작합니다. 커넥트를 실행할 때는 다음과 같은 명령어로 실행하기 때문에 위 자바 파일이 시작점이라 볼 수 있습니다. // standalone일 경우 $ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.proper..

빅데이터/Kafka 2022.08.30
ConnectException: Exiting WorkerSinkTask due to unrecoverable exception 이슈란?

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtim..

빅데이터/Kafka 2022.08.03
WINDOW STORE CHANGE LOG ADDITIONAL RETENTION MS CONFIG 옵션 분석

/** {@code windowstore.changelog.additional.retention.ms} */ @SuppressWarnings("WeakerAccess") public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. A..

빅데이터/Kafka 2022.06.15
초급자를 위한 [아파치 카프카 애플리케이션 개발]온라인 강의를 출시하였습니다.

2022년 5월! 온라인 교육 사이트인 인프런에서 [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! 강의를 출시하였습니다. 강의 링크 : https://bit.ly/3PsEAt8 [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! - 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고 실습하는 시간 www.inflearn.com - 강의 이름 : [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! - 강의 설명 : 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그..

빅데이터/Kafka 2022.06.02
카프카를 활용한레이싱카 센서 실시간 수집 데이터 파이프라인 구축

오늘은 카레이싱에서 자동차에서 출력되는 여러 지표들을 수집하는 데이터 파이프라인을 만들어 보겠습니다. F1, WRC를 비롯한 다양한 카레이싱 팀에서는 데이터의 수집과 활용이 우승으로 가는 지름길인 것은 이미 널리 알려져 있습니다. 2021 F1 의 월드 컨스트럭터 챔피언인 메르세데스-AMG 페트로나스 포뮬러 원팀은 데이터를 잘 활용하는 팀 중 하나입니다. 페트로나스 포뮬러팀에서 수집하는 F1 자동차의 센서는 5만개에 달하며 한경기당 300기가바이트, 모든 경기 동안에는 테라바이트에 달하는 데이터가 쌓이게 된다고 합니다. F1 경기는 총 22번 경기를 뛰게 되는데 자체 프랙티스 세션까지 합치면 몇백 테라바이트 이상의 데이터가 모이게 됩니다. 이렇게 모아진 데이터는 다음 경기를 위해 분석하고 경주용 F1 머..

빅데이터/Kafka 2022.05.02
Kafka Producer JMX exporter 사용하기

이 글은 오픈소스 아파치 카프카 공식 자바 라이브러리를 사용하여 개발할 때 JMX exporter를 사용하여 producer의 지표를 수집하기 위한 글입니다. 1) KafkaProducer 애플리케이션 개발 build.gradle 코드 plugins { id 'java' } group 'com.example' version '1.0' repositories { mavenCentral() } dependencies { compile 'org.apache.kafka:kafka-clients:2.5.0' compile 'org.slf4j:slf4j-simple:1.7.30' } task uberJar(type: Jar) { from sourceSets.main.output dependsOn configurat..

빅데이터/Kafka 2022.04.19
대규모 데이터의 카프카 프로듀서 성능 향상 방법

카프카 프로듀서는 acks, linger.ms, batch.size 조절을 통해 성능 향상을 달성 할 수 있습니다. 각 옵션별로 한계점과 성능 향상 방법을 알아보겠습니다. 여기서는 대규모 데이터가 들어오는 것을 가정하였습니다. 가정사항 - Record의 메시지 크기 10Kbytes - 레코드 유입량 : 1000tps acks acks는 카프카 프로듀서로 전송한 레코드가 정상적으로 리더 또는 팔로워 파티션에 적재되었는지 확인하는 역할을 합니다. 0으로 설정할 경우 모든 전송 케이스에 대해 성공으로 처리하고 1의 경우 리더 파티션에 적재되었을 경우 성공으로 처리합니다. all(-1)로 설정할 경우에는 리더와 팔로워 파티션(min.insync.replicas)에 적재가 완료되었을 경우 성공으로 처리하지만 al..

빅데이터/Kafka 2022.04.14
카프카 커넥트의 태스크 밸런싱 로직, DistributedHerder(양치기) 그리고 IncrementalCooperativeAssignor 내부 동작 소개

Herder; 명사 1. 양치기, 목부 카프카 커넥트는 워커, 커넥터, 태스크로 이루어져 있습니다. 워커는 카프카 커넥트 프로세스를 뜻하며 커넥터와 태스크를 실행시키기 위한 프로세스입니다. 커넥터는 태스크를 실행하는 관리도구로서 여러 태스크를 하나의 파이프라인으로 라이프 사이클을 관리합니다. 태스크는 데이터를 실질적으로 처리하는 부분이라고 볼 수 있습니다. 커넥터에는 1개 이상의 태스크가 포함되며 각 태스크는 프로듀서 또는 컨슈머 역할을 수행합니다. 일반적으로 분산모드 커넥트를 운영할 때 커넥터를 실행할 경우 태스크는 여러 워커에서 분산해서 실행됩니다. 예를 들어 5개의 워커로 이루어진 분산 모드 커넥트를 실행하고 7개의 태스크를 가진 커넥터를 실행하면 다음과 같이 할당됩니다. [worker-0] - [..

빅데이터/Kafka 2022.03.23
MLOps란 무엇인가? 영상 해설

https://www.youtube.com/watch?v=xZKtofBe18I MLOps란 무엇인가 2021년 AI업계에서 많이 나온 단어, 머신러닝을 실제로 다룰때 서비스에 적용하는 과정에서 필요한 분야. 초창기라 공식적이 정의 없음. 다만, 머신러닝 -> 서비스 전달 과정에 필요한 operation 데이터 브릭스에 따르면 - ModelOps : 모델 개발에 필요한 운영 - DataOps : 데이터 관련 운영 - DevOps : 소프트웨어 관련 운영 머신러닝 모델을 서비스에 올리려는 시도가 많이 진행되고 있음. 그러나 이런 시도에 따라 몇가지 이슈가 발생됨 1) 로컬에서 개발한 모델(주피터에서 개발한 어떤 모델)을 어떻게 서비스에 사용할 수 있을까? 2) 모델의 버전관리는 어떻게하지? 3) 데이터셋 버..

빅데이터 2022.03.17
카산드라 특정 테이블에 TTL 데이터를 넣을 때 적합한 컴팩션 전략은?

질문 카산드라에서 특정 테이블에 일부 데이터는 TTL 2일로 설정되어 있고, 일부 데이터는 60일 TTL로 되어 있다면 가장 적합한 컴팩션 전략은 무엇인가요? 1. LeveledCompactionStrategy (LCS) 2. SizeTieredCompactionStrategy (STCS) 3. TimeWindowCompactionStrategy (TWCS) 답변 가장 적합한 전략은 TimeWindowCompactionStrategy입니다. 만에하나 동일 테이블에 여러 종류의 ttl 시간이 설정되어 있다고 하더라도 TWCS가 적합합니다. unchecked tombstone compaction을 설정함으로서 2일 이상 지난 TTL 데이터는 삭제되도록 하는 것이 좋습니다. ALTER TABLE my_tabl..

카산드라 TimeWindowCompactionStrategy 설명

TimeWindowCompactionStrategy TimeWindowCompactionStrategy(TWCS)는 데이터가 디스크에서 timestamp 단위로 묶일 때 유용합니다. 주로 TTL로 쓰여진 데이터를 다룰 경우 사용됩니다. 데이터가 만료되거나 TTL이 도달한 SSTable의 데이터는 대략적으로 비슷한 시간대에 삭제됩니다. 이 전략을 사용하면 이런 데이터를 완전히 삭제시킵니다. SizeTieredCompactionStrategy(STCS) 또는 LeveledCompactionStrategy(LSC)를 사용할 때 보다 디스크 사용량이 줄어들게 됩니다. TWCS의 기본 컨샙은 윈도우(window) 기준으로 sstable 파일을 생성하는 것입니다. 윈도우는 아래 2가지 주요 옵션에 의해 설정됩니다...

카산드라와 TTL, 툼스톤 그리고 관련 동작(컴팩션)

카산드라에서는 데이터의 TTL 또는 삭제에 의해 데이터가 바로 삭제되는 것이 아니라 툼스톤을 사용하여 데이터를 삭제합니다. 카산드라와 같은 분산 데이터베이스에서 데이터를 바로 삭제하는 것은 매우 어려운 일입니다. 그렇기 때문에 데이터를 삭제하기 전에 툼스톤이라고 불리는 플래그를 사용하여 해당 데이터가 삭제 대기로 옮긴 이후 추후에 데이터를 삭제시킵니다. 관련 동작을 알기 위해서는 우선 컴팩션이 무엇인지 알아보아야 합니다. 컴팩션(Compaction) 카산드라에서 컴팩션은 최고의 성능을 내기 위해 적재된 데이터(SSTable)를 결합하는 행위를 뜻합니다. 반면, 압축은 컴프레션이므로 컴팩션과 햇갈려서는 안됩니다. 카산드라에서 컴팩션은 다음과 같은 전략을 제공합니다. 사용방법에 따라 적합한 전략의 컴팩션을 ..

아파치 카산드라 설정 파일 및 상용 환경 셋팅

카산드라는 설치 방법에 따라 다른 디렉토리에 설정 파일이 있습니다. - 도커 : /etc/cassandra 디렉토리 - tarball : conf 디렉토리 - package : /etc/cassandra 디렉토리 카산드라의 기본 설정파일은 cassandra.yaml입니다. 1개의 노드로 구성할 수 있는 클러스터에 적합하게 설정되어 있습니다. 하지만 2개 이상 노드로 운영할 경우에는 다양한 추가 셋팅을 해야만 합니다. 클러스터로 구성하기 위해서는 일부 설정값을 입력해야만 합니다. - cassandra.yaml : 카산드라의 기본 셋팅 파일 - cassandra-env.sh : 카산드라에서 사용하는 환경 변수 셋팅 - cassandra-rackdc.properties or cassandra-topology...

아파치 카산드라 다이나모, 일관된 해싱, 복제 개념 살펴보기

다이나모 아파치 카산드라는 아마존의 다이나모 분산 키값 저장소의 기술들을 기반으로 만들어졌습니다. 다이나모 시스템은 3개의 주요 기술로 만들어집니다. 1) 파티셔닝된 데이터셋에 coordination 요청 2) Ring기반 맴버쉽 및 failure 감지 3) 로컬 퍼시스턴스 저장소 엔진 카산드라는 처음 2개의 클러스터링 구조를 기반으로 설계되었습니다. 그리고 LSM(Log Structured Merge Tree)를 기반으로 저장소 엔진을 사용합니다. - 일관된 해싱을 통한 데이터셋 파티셔닝 수행 - 버저닝된 데이터와 조정 가능한 consistency를 통한 멀티 마스터 데이터 복제 - 가십 프로토콜(gossip protocol)을 사용한 분산 클러스터 멤버쉽 추가 및 장애 감지 - 범용 하드웨어를 사용한..

아파치 카산드라 살펴보기, 설명, 기본 개념

아파치 카산드라는 오픈소스이며 분산 NoSQL 데이터베이스입니다. 파티션 기반의 wide column 저장소 모델을 활용하며 consistent semantics를 지원합니다. 아파치 카산드라는 페이스북에서 SEDA(Staged event-driven architecture)를 기반으로 설계되었고 아마존의 다이나모 분산 저장소와 구글의 빅테이블 저장소 엔진 모델을 기반으로 디자인되었습니다. 다이나모와 빅테이블 두개 다 스케일러블하고 안전하며 고 가용성의 데이터 저장소 특징을 가지도록 개발되었지만 일부는 완전히 지원되지 않습니다. - 다이나모 : https://www.allthingsdistributed.com/2007/10/amazons_dynamo.html - 빅테이블 : https://static.g..

카산드라 TTL에 따른 데이터 삭제 정리

https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useExpire.html Expiring data with time-to-live Use time-to-live (TTL) to expire data in a column or table. Columns and tables support an optional expiration period called TTL (time-to-live); TTL is not supported on counter columns. Define the TTL value in seconds. Data expires once it exceeds the TTL p docs.datastax.com 카산드라에서는 컬럼단위 또는 테이블 단..

kafka consumer와 seekToBeginning를 활용하여 offset reset하기

카프카 컨슈머 클라이언트는 seekToBeginning 함수가 있습니다. 이 함수를 사용하면 특정 파티션의 오프셋을 최소 레코드로 지정할 수 있습니다. /** * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called. * If no partitions are provided, seek to the first offset for all of the currently as..

빅데이터/Kafka 2022.01.17
스키마 레지스트리 자바 클라이언트(프로듀서,컨슈머) 테스트

1. 스키마 레지스트리 설정, 실행 confluent-7.0.0/etc/schema-registry/schema-registry.properties 설정파일 listeners=http://0.0.0.0:8081 kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092 kafkastore.topic=_schemas debug=false 스키마 레지스트리 실행 $ bin/schema-registry-start etc/schema-registry/schema-registry.properties [2021-12-16 21:36:27,121] INFO SchemaRegistryConfig values: access.control.allow.headers = access...

빅데이터/Kafka 2021.12.16
confluent developer certification 예시 문제 해설

confluent developer certification sample : driver토픽의 1번 파티션에서 모든 input을 다루는 데이터. 즉, 프로듀서가 레코드를 보내면 리더 파티션이 존재하는 브로커와 통신을 한다. 그러므로 정답은 1번 파티션의 리더가 위치한 102번 브로커가 정답. 카프카 컨슈머가 range partition assignment 전략일 경우 어떻게 파티션과 컨슈머가 연결될 것인지에 대한 정답을 찾는 것이다. range partition assignment의 경우 consumer의 member id를 사전식(알파벳순)으로 정렬하여 각 토픽의 파티션을 숫자를 기준으로 컨슈머에 할당한다. 그러므로 이 경우 다음과 같이 연동된다. 그러므로 정답은 c. 토픽a의 파티션0과 토픽b의 파티..

빅데이터/Kafka 2021.12.15
confluent HdfsSinkConnector 파티셔너 설명

confluent에서는 hdfs2를 위한 sink connector를 컨플루언트 커뮤니티 라이센스로 제공합니다. 해당 기능 중 파티셔너 기능에 대해 정리합니다. https://docs.confluent.io/kafka-connect-hdfs/current/overview.html#partitioners-and-storage HDFS 2 Sink Connector for Confluent Platform | Confluent Documentation Home Kafka Connectors HDFS 2 Sink Connector for Confluent Platform The Kafka Connect HDFS 2 Sink connector allows you to export data from Kafka t..

빅데이터/Kafka 2021.11.25
macos에서 podman으로 rest-proxy 실행하기

rest proxy는 카프카와 연동시 프로듀서, 컨슈머의 역할을 http로 수행할 수 있는 기능을 가진 애플리케이션입니다. https://blog.voidmainvoid.net/345 카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용 confluent에서는 rest proxy라고 불리는 카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 기존에 Kafka connect, Kafka client로 데이터를 전달하는 것과는 사뭇 다르게.. blog.voidmainvoid.net macos에서 rest proxy를 사용하여 카프카와 연동 테스트 수행하는 방법을 정리합니다. $ podman machine start ..

빅데이터/Kafka 2021.11.16
카산드라 모델링 분석하기 좋은 테이블 구성하기

기본적으로 생각해야할 부분은 컬럼 기반 데이터베이스라는 점입니다. 이를 기억하고 수행해야 합니다. 1개 필드를 프라이머리 키(파티션 키)로 생성한 테이블 cqlsh:cory> create table log1(uid int, machine varchar, log_time timestamp, log varchar, primary key(uid)); - uid : int - machine : varchar - log_time : timestmp - log : varchar primary key이자 partition key는 uid로 설정 1) 데이터 insert cqlsh:cory> insert into log1(uid, log, log_time, machine) values(1,'gg',toTimestamp..

couchbase 카프카 싱크 커넥트 사용 방법

카프카 커넥트는 카프카 클러스터와 기타 데이터베이스간 파이프라인을 반복적으로 만드는데 특화되어 있습니다. 카프카 커넥터는 파이프라인의 구현체 인데요. 싱크 커넥터와 소스 커넥터로 이루어져 있습니다. 여기서는 카우치 베이스싱크 커넥터를 살펴봅니다. 카우치베이스 싱크 커넥터는 카프카의 토픽을 카우치베이스로 저장 로직이 담긴 커넥터입니다. 카우치베이스 싱크 커넥터는 at least once 전달을 지원하며 중복이 발생했을 경우에는 재입수가 필요할 수도 있습니다. 카프카 싱크 커넥터를 사용하려면 깃허브 레포지토리에 있는 배포판을 다운받아서 커넥터에 포함시켜 사용할 수 있습니다. - 카우치베이스 커넥터 깃헙 : https://github.com/couchbase/kafka-connect-couchbase - 카우..

빅데이터/Kafka 2021.10.29
couchbase 따라하기(in mac)

카우치베이스는 도커 이미지를 활용하여 실행할 수 있습니다. 맥에서 도커 이미지를 사용하기 위해 podman을 설치하고 상용합니다. podman : https://podman.io/ Podman 16 Oct 2021 » Why can't I use sudo with rootless Podman? So why can’t I use sudo with rootless Podman? Matt Heon explains why and how you can safely work around the “need” if you have it in a recent blog post on the Red Hat Enable Sysadmin site, Why podman.io 1) podman 설치 $ brew install po..

빅데이터/nosql 2021.10.29