빅데이터 210

카프카 source connector가 exactly-once를 지원하는 방법

KIP-618에서 Source connector의 exactly-once(일부) 지원을 건의하였고 이 내용은 3.3.0에서 처음 적용되었습니다. https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors KIP-618: Exactly-Once Support for Source Connectors - Apache Kafka - Apache Software FoundationStatus Current state: Accepted Discussion thread: here Voting thread: here JIRA: KAFKA-10000 - 이슈 세부사항 가져오는 중... 상태 , ..

빅데이터/Kafka 2025.11.23
카프카는 복제본(replica)를 자동으로 다른 브로커로 재배치하지 않습니다

브로커가 0, 1, 2, 3, 4번으로 구성된 클러스터가 있고, 특정 토픽의 파티션은 0번이 리더이며 1번과 2번에 복제본이 있다고 가정합니다. - 브로커 : 0,1,2,3,4- 토픽의 파티션 : 0(리더), 1(팔로워), 2(팔로워) 이후 브로커 2번을 graceful shutdown하거나 예상치 못한 장애로 인해 오래 오프라인 상태가 되는 상황을 떠올려볼 수 있습니다.“브로커 2번이 장시간 오프라인이면, 카프카가 자동으로 복제본을 3번이나 4번 브로커로 옮겨주지 않을까?” 복제본이 자동으로 이동한다면 가용성과 내구성이 유지될 것처럼 보입니다. 그러나 실제 Kafka는 어떻게 동작할까요.카프카의 동작브로커 2번이 1분, 10분, 1시간, 그 이상을 오프라인 상태로 유지하더라도 Kafka는 오직 두 가지..

빅데이터/Kafka 2025.11.21
KIP-932 Queues for Kafka 사용해보기(KafkaShareConsumer)

일반적으로 사용되는 아파치 카프카는 이벤트 스트림으로 파티션단위로 데이터를 처리하기 때문에 컨슈머 개수를 파티션 개수만큼 실행시켜 운영하는 것이 일반적이다. Queues for Kafka는 이와 다르게 파티션 개수보다 더 많은 컨슈머를 운영하기 위한 기능이다. https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka KIP-932: Queues for Kafka - Apache Kafka - Apache Software FoundationStatus Current state: Accepted Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qt..

빅데이터/Kafka 2025.09.12
윈도우즈 wsl2 환경에서 로컬 카프카 브로커 연동하기

윈도우즈 wsl 환경에서 ipv6를 사용할 경우 localhost 또는 127.0.0.1 접근이 안될 경우가 있습니다. 이 경우 접근을 위해 다음과 같이 셋팅을 할 경우 접근이 가능합니다. 준비물- wsl2- Ubuntu- Apache kafka 3.9.0- Intellij CE 1) Apache kafka 3.9.0 다운로드 및 압축 풀기https://kafka.apache.org/downloads 에서 3.9.0 바이너리를 다운로드 합니다.이후 다운로드 받은 압축파일을 푼다. 2) Ubuntu 터미널 실행 및 다운로드 환경 접근dvwy@dvwy:~$ cd /mnt/c/Users/choco/Downloads/kafka_2.12-3.9.0/kafka_2.12-3.9.0dvwy@dvwy:/mnt/c/Use..

빅데이터/Kafka 2025.05.25
Sent auto-creation request for Set(__consumer_offsets) to the active controller 에러 해결 방법

로컬에서 테스트 도중에.. 발생한 에러이다.Sent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to ..

빅데이터/Kafka 2025.04.18
카프카4.0 부터는 eager rebalancing protocol이 삭제됩니다.

https://issues.apache.org/jira/browse/KAFKA-18839 [KAFKA-18839] Drop support for eager rebalancing in Streams - ASF JIRAIn 3.1 we deprecated the EAGER protocol in Kafka Streams (see KAFKA-13439). This ticket covers actually dropping this protocol in 4.0. Note that KAFKA-8575 covers the actual task cleanup we can do once we no longer have to support eager rebalancing, whichissues.apache.org 이 이슈(..

빅데이터/Kafka 2025.02.28
[local macOS 환경] apache kafka(3.5.0기준) + redpanda/console 로 편하게 테스트 하기

로컬 환경에서 개발을 하다보면 항상 shell script로 사용하지만 좀 불편할때가 많습니다. redpanda에서는 console을 통해 apache kafka와 연동하는 웹 콘솔을 오픈소스로 제공하고 있습니다.  https://github.com/redpanda-data/console GitHub - redpanda-data/console: Redpanda Console is a developer-friendly UI for managing your Kafka/Redpanda workloads. ConsoleRedpanda Console is a developer-friendly UI for managing your Kafka/Redpanda workloads. Console gives you a s..

빅데이터/Kafka 2025.02.26
카프카에서 데이터 삭제는 어떻게 이루어 지는가>

카프카의 데이터의 삭제는 로그 세그먼트 단위로 삭제가 이루어진다. 노드(구 브로커)의 로그매니저는 시간(time) 또는 용량(size)에 따라 삭제여부를 결정한다. 시간 기반 정책에서는 레코드의 timestamp에 따라 달라진다. 해당 세그먼트 파일에 존재하는 가장 큰 timestamp 값(레코드 순서와는 무관)을 토대로 찾아낸다. 용량 기반 정책은 기본적으로 설정되지 않는다. 만약 설정된다면, 로그매니저는 가장 오래된 세그먼트 파일을 용량이 다 찼을때 부터 차례 대로 삭제하게 된다. 만약, 시간과 용량 두개의 설정이 동시에 설정된다면 먼저 도달하는 정책의 기준에 따라 삭제가 이루어진다. 언제든지 삭제가 될 수 있도록 copy-on-wirte 방식으로 세그먼트 목록을 사용한다. 이를 통해 삭제가 되는 동..

빅데이터/Kafka 2025.02.13
kafka 4.0부터는 스칼라 2.12가 더 이상 사용되지 않습니다.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) - Apache Kafka - Apache Software FoundationStatus Current state: Adopted Discussion thread: link Vote thread: link JIRA:  KAFKA-12930 - 이슈 세부사항 가져오는 중... 상태  (3.0) KAFKA-12895 - 이슈 세부사항 가져오는 중... 상태  (4.0) Please keep the discussion on the macwiki.apache..

빅데이터/Kafka 2025.02.13
standalone 카프카(kraft모드 in local) 실행 스크립트(1줄)

standalone 카프카(kraft모드 in local) 실행을 위한 준비와 실행$ bin/kafka-storage.sh random-uuidcKUMbEGERui8cHUhwdc6XA$ bin/kafka-storage.sh format -t cKUMbEGERui8cHUhwdc6XA -c config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.$ bin/kafka-server-start.sh config/kraft/servblog.voidmainvoid.net지난 포스팅에서 kraft모드를 로컬에서 실행할 때 차례대로 실행해야 하는 코드를 올린적이 있었는데, 불편해서 한줄로 요약한 버전을 올..

빅데이터/Kafka 2025.02.11
trino에서 java.sql.SQLException: Unrecognized connection property 'url' 에러가 나는 이유와 해결 방법

spark jdbc를 통해 trino를 접근하기 위해 아래와 같은 코드를 짤 수 있다.import org.apache.spark.sql.{DataFrame, SparkSession}import java.util.Properties// Spark 세션 생성val spark = SparkSession.builder() .appName("Spark Trino JDBC Example") .getOrCreate()// JDBC URL 설정val jdbcUrl = "jdbc:trino://://"// Trino JDBC 속성 설정val connectionProperties = new Properties()connectionProperties.setProperty("user", "")connectionProper..

빅데이터 2024.10.16
Spark 개발시, main/resources 패키지에 hdfs-site.xml, core-site.xml 등을 넣는 이유

main/resources 디렉터리는 Apache Maven이나 SBT와 같은 빌드 도구를 사용하는 프로젝트에서 애플리케이션의 리소스 파일을 저장하는 표준 위치이다. 이 디렉터리에 배치된 파일들은 컴파일된 클래스와 함께 JAR 파일에 포함되며, 런타임에 애플리케이션에서 사용될 수 있다. HDFS 설정 파일 (hdfs-site.xml)hdfs-site.xml 파일은 Hadoop 분산 파일 시스템(HDFS)의 설정을 정의한다. 이 파일에는 HDFS 클러스터의 동작을 제어하는 다양한 구성 옵션이 포함되어 있다. 예를 들어, 네임노드의 주소, 데이터 디렉터리, 복제 수 등의 정보가 포함될 수 있다.main/resources 디렉터리에 배치하는 이유main/resources는 애플리케이션의 모든 리소스 파일(예:..

빅데이터/하둡 2024.08.13
standalone 카프카(kraft모드 in local) 실행을 위한 준비와 실행

$ bin/kafka-storage.sh random-uuidcKUMbEGERui8cHUhwdc6XA$ bin/kafka-storage.sh format -t cKUMbEGERui8cHUhwdc6XA -c config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.$ bin/kafka-server-start.sh config/kraft/server.properties$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --version3.6.2 카프카 바이너리 다운로드 링크 : https://kafka.apache.org/d..

빅데이터/Kafka 2024.06.23
아파치 플링크는 2.0 버전부터는 더이상 scala API를 지원하지 않습니다.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support FLIP-265 Deprecate and remove Scala API support - Apache Flink - Apache Software FoundationPlease keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). Motivation Apache Flink offers APIs for building your Flink application using the DataStr..

빅데이터 2024.05.10
카프카 컨슈머의 auto.offset.reset 옵션을 반드시 earliest로 변경해야 하는 이유

auto.offset.reset는 카프카 컨슈머를 다루는데 있어 아주 중요한 부분입니다. 해당 옵션이 가질 수 있는 값은 다음과 같습니다. earliest : 마지막 커밋 기록이 없을 경우, 가장 예전(낮은 번호 오프셋) 레코드부터 처리 latest : 마지막 커밋 기록이 없을 경우, 가장 최근(높은 번호 오프셋) 레코드부터 처리 none : 커밋 기록이 없을 경우 throws Exception 해당 옵션은 필수 옵션이 아닌 선택 옵션으로서 입력을 하지 않으면 자동으로 latest로 설정됩니다. 일반적으로 컨슈머를 운영할 때 이 옵션을 건드리는 경우는 거의 드문데요. 그러다보니 기본값인 latest로 설정할 경우 우리도 모르게 운영 중 데이터의 유실이 발생할 수 있다는 사실을 놓치기도 합니다. 그러다보..

빅데이터/Kafka 2024.02.05
JSON은 null을 키값으로 가질 수 있을까?

JSON을 정의할 때 null을 키로 가질 수 있을지 궁금해서 jsonlint.com을 사용하여 테스트를 진행했습니다. https://jsonlint.com/ JSON Online Validator and Formatter - JSON Lint Loading... About the JSONLint Editor JSONLint is a validator and reformatter for JSON, a lightweight data-interchange format. Copy and paste, directly type, or input a URL in the editor above and let JSONLint tidy and validate your messy JSON code. What Is json..

빅데이터 2024.01.18
카프카에서 계층 저장소(Tiered storage)가 필요한 이유

2020년 11월 Cofluent Platform 6.0이 릴리즈되면서 카프카의 계층 저장소가 처음으로 대중에게 소개되었습니다. 이전까지 카프카 오픈소스 진영에서 많이 논의 되었지만 실제로 개발자가 사용 가능한 형태로 나온 것은 이때가 처음입니다. 컨플루언트를 사용하는 많은 개발자와 회사에서는 카프카의 계층 저장소를 사용해왔습니다. 이후 2023년 10월, 오픈소스 카프카 3.6.0이 릴리즈 되면서 계층 저장소 얼리 엑세스 모드를 사용할 수 있게 되었습니다. 카프카에서 계층 저장소는 현재의 브로커 구조에서 아주 중요한 역할을 수행하게 될 것입니다. 계층 저장소는 많은 장점이 있겠으나 그 중 핵심은 비용 절감이라고 생각합니다. 비용 절감에는 크게 두가지가 있는데 컴퓨팅 리소스 비용과 휴먼 리소스 비용입니..

빅데이터/Kafka 2023.10.20
신뢰성 있는 카프카 애플리케이션을 만드는 3가지 방법

카카오 공개 기술 세미나인 kakao tech meet에서 '신뢰성 있는 카프카 애플리케이션을 만드는 3가지 방법' 에 대해 발표하는 자리를 가졌습니다. https://www.youtube.com/watch?v=7_VdIFH6M6Q 이벤트 드리븐 아키텍처와 스트림 데이터 파이프라인을 만들때 고민해야 하는 부분을 다룹니다. 프로듀서/컨슈머와 같은 카프카 애플리케이션의 전달 신뢰도를 높이기 위해서 적용할 수 있는 기술들에 대해 설명하고 내부적으로 적용했던 경험을 공유합니다. 카프카에서 다루는 신뢰도에 대해 다시 한 번 고민해보고, 내가 만든 애플리케이션이 고객에게 어떻게 가치를 줄 수 있는지 생각해볼만한 주제도 함께 알려드립니다.

빅데이터/Kafka 2023.09.22
카프카 프로듀서의 acks=all 옵션은 사실(?) 느리지 않다!

카프카 3.0 부터는 카프카 프로듀서의 acks 옵션이 all로 지정됩니다. acks=all이 기본값으로 지정된 이유 중 하나는 프로듀서와 브로커 간 통신을 멱등성(idempotence)있게 만들기 위함입니다. 프로듀서와 브로커는 acks를 통해 레코드가 전송되었는지 확인합니다. 문제는 acks가 유실되었을 경우입니다. 네트워크 상태, 브로커 상태에 따라서 언제든 유실될 수 있는 acks를 정상적으로 전송하기 위해서는 acks를 위한 acks(!)를 만들어야만 합니다. 이는 두 장군 문제 알고리즘과 동일하다고 볼 수 있습니다. 결과적으로 프로듀서의 중복 전달을 제거하기 위해 카프카 3.0부터는 enable.idempotence가 true로 변경되었고 이에 딸려오는 추가 옵션인 acks가 all로 설정되게..

빅데이터/Kafka 2023.08.08
기존에 생성된 compact topic의 cleanup.policy를 변경하는 방법

현재 cleanup.policy 확인 $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact- test --describe Topic: compact-testPartitionCount: 1ReplicationFactor: 1Configs: cleanup.policy=compact,segment.bytes=1073741824 Topic: compact-testPartition: 0Leader: 0Replicas: 0Isr: 0 kafka-configs.sh를 사용하여 토픽 수정 $ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entit y-type topics --ent..

빅데이터/Kafka 2023.06.30
Compacted topic에 null key 레코드를 전송하면?

Compacted topic에 null key를 전송(produce)하면 어떻게 될까요? 1) Compacted 토픽 생성 $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact-test --config "cleanup.policy=compact" --create Created topic compact-test. $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact-test --describe Topic: compact-testPartitionCount: 1ReplicationFactor: 1Configs: cleanup.policy=compact,segment.b..

빅데이터/Kafka 2023.06.30
아파치 카프카 Exactly-once 처리의 진실과 거짓

아파치 카프카와 같은 분산 이벤트 스트리밍 플랫폼을 사용하거나 메시지 브로커를 활용하다 보면 항상 마주치는 문제는 마로 메시지 전달 시멘틱(message delivery semantic)입니다. 메시지 전달 시멘틱은 A지점에서 B지점으로 데이터를 전송할 때 어느 만큼의 신뢰도로 데이터를 전송하는지에 대한 정의입니다. 즉, 특정한 장애 상황(또는 임계치를 벗어난 상황)에서도 보증하는 데이터 전달 신뢰도라고 볼 수 있습니다. 메시지 전달 시멘틱은 크게 세가지로 나뉩니다. 적어도 한번(at least once), 많아도 한번(at most once), 정확히 한번(exactly once). '적어도 한번'은 데이터가 전달될 때 유실이 발생하지는 않지만 중복이 발생할 가능성이 있음을 뜻합니다. 아파치 카프카는 ..

빅데이터/Kafka 2023.06.20
모던 데이터 플로우: 데이터 파이프라인을 잘 운영하는 방법

https://www.confluent.io/events/kafka-summit-london-2022/modern-data-flow-data-pipelines-done-right/ Kafka Summit London 2022 Keynote | Jay Kreps, CEO, Confluent featuring Avi Perez, Wix.com Confluent is building the foundational platform for data in motion so any organization can innovate and win in a digital-first world. www.confluent.io 메이븐에서 카프카 자바 라이브러리의 사용율이 급격하게 올라가는 것을 볼 수 있음. 그만큼 카프카의 사..

빅데이터/Kafka 2023.06.12
아파치 카프카 브로커 설정에서 listener와 advertised.isteners의 차이?

Kafka 브로커의 설정에서 listener와 advertised.listeners 옵션은 다음과 같은 특징이 있다. listener는 Kafka 브로커가 클라이언트로부터 듣고있는 네트워크 인터페이스와 포트를 나타낸다. 예를 들어, listener를 PLAINTEXT://localhost:9092로 설정하면 브로커는 localhost의 9092 포트에서 PLAINTEXT 프로토콜을 사용하여 들어오는 클라이언트 연결을 수신한다. advertised.listeners는 Kafka 브로커가 클라이언트에게 알려주는 네트워크 인터페이스와 포트를 나타낸다. 클라이언트는 이 정보를 사용하여 브로커에 연결한다. 이 정보는 일반적으로 브로커가 외부에 노출되는 경우 사용된다. 예를 들어, advertised.listene..

빅데이터/Kafka 2023.03.26
windows의 WSL환경에서 아파치 카프카 설치, 실행하는 방법

1) git 설치 https://git-scm.com/download/win Git - Downloading Package Download for Windows Click here to download the latest (2.39.2) 32-bit version of Git for Windows. This is the most recent maintained build. It was released 10 days ago, on 2023-02-14. Other Git for Windows downloads Standalone Installer 32-bit Git for Win git-scm.com 상기 URL에서 windows용 git을 설치합니다. 2) WSL 환경 설치 Windows Subsystem..

빅데이터/Kafka 2023.02.24
RestHighLevelClient로 구현한 idempotence 데이터 적재

SHA-256이란? http://wiki.hash.kr/index.php/SHA256 SHA256 - 해시넷 SHA-256은 SHA(Secure Hash Algorithm) 알고리즘의 한 종류로서 256비트로 구성되며 64자리 문자열을 반환한다. SHA-256은 미국의 국립표준기술연구소(NIST; National Institute of Standards and Technology)에 의해 공표된 wiki.hash.kr SHA-256 해시 함수는 어떤 길이의 값을 입력하더라도 256비트의 고정된 결과값을 출력한다. 일반적으로 입력값이 조금만 변동하여도 출력값이 완전히 달라지기 때문에 출력값을 토대로 입력값을 유추하는 것은 거의 불가능하다. 아주 작은 확률로 입력값이 다름에도 불구하고 출력값이 같은 경우가 ..

ElasticsearchClient 7.17.7 기준 java client example code

1) host, port 주입 RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http")).build(); BulkRequest.Builder br = new BulkRequest.Builder(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); ElasticsearchClient esClient = new ElasticsearchClient(transport); 2) String으로 된 JSON 데이터 정의 Str..

커넥트 REST API 확장 플러그인 : Connect Rest Extension Plugin

Connect Framework offers REST API that is used to mange the lifecycle of the connector. Its imperative in most enterprises to secure the API and also add authorization to the end points. We could add the ability for authentication and authorization in the framework. But the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide a..

빅데이터/Kafka 2022.10.04