전체 글 477

카프카 source connector가 exactly-once를 지원하는 방법

KIP-618에서 Source connector의 exactly-once(일부) 지원을 건의하였고 이 내용은 3.3.0에서 처음 적용되었습니다. https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors KIP-618: Exactly-Once Support for Source Connectors - Apache Kafka - Apache Software FoundationStatus Current state: Accepted Discussion thread: here Voting thread: here JIRA: KAFKA-10000 - 이슈 세부사항 가져오는 중... 상태 , ..

빅데이터/Kafka 2025.11.23
카프카는 복제본(replica)를 자동으로 다른 브로커로 재배치하지 않습니다

브로커가 0, 1, 2, 3, 4번으로 구성된 클러스터가 있고, 특정 토픽의 파티션은 0번이 리더이며 1번과 2번에 복제본이 있다고 가정합니다. - 브로커 : 0,1,2,3,4- 토픽의 파티션 : 0(리더), 1(팔로워), 2(팔로워) 이후 브로커 2번을 graceful shutdown하거나 예상치 못한 장애로 인해 오래 오프라인 상태가 되는 상황을 떠올려볼 수 있습니다.“브로커 2번이 장시간 오프라인이면, 카프카가 자동으로 복제본을 3번이나 4번 브로커로 옮겨주지 않을까?” 복제본이 자동으로 이동한다면 가용성과 내구성이 유지될 것처럼 보입니다. 그러나 실제 Kafka는 어떻게 동작할까요.카프카의 동작브로커 2번이 1분, 10분, 1시간, 그 이상을 오프라인 상태로 유지하더라도 Kafka는 오직 두 가지..

빅데이터/Kafka 2025.11.21
KIP-932 Queues for Kafka 사용해보기(KafkaShareConsumer)

일반적으로 사용되는 아파치 카프카는 이벤트 스트림으로 파티션단위로 데이터를 처리하기 때문에 컨슈머 개수를 파티션 개수만큼 실행시켜 운영하는 것이 일반적이다. Queues for Kafka는 이와 다르게 파티션 개수보다 더 많은 컨슈머를 운영하기 위한 기능이다. https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka KIP-932: Queues for Kafka - Apache Kafka - Apache Software FoundationStatus Current state: Accepted Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qt..

빅데이터/Kafka 2025.09.12
Openapi API 사용시 ⚠️선결제하면 안되는 이유.. - 돈낭비..

2024년 8월에 개인 프로젝트도 할겸 겸사겸사 openapi platform에서 선결제를 했었다. 그 당시 생각으로는 당장 사용하지 않더라도 차차 api key를 활용하면서 안전하게 돈이 빠져나가면 되겠다는 생각에 50달러(당시 7만원 정도)를 결제했었다.그리고 어쩌다저쩌다 하다보니 2025년이 되었고, 다시 프로젝트를 할 일이 생겨서 예전에 충전했던 50달러 사용해야지~ 하고 들어갔더니 왠걸~ 내가 사용할 수 있는 금액이 0.00 달러로 찍혀있는게 아닌가;;;그래서 내가 이걸 어디 낭비해서 다썻나..? 하고 payment history 내역을 찾아봐도 아무리 쓴 내역이 없다. 그러다가 들어간 곳은 credit grants. 여기서 수상한 state를 볼 수 있었는데 EXPIRE 라는 글자가 있는게 아..

개발이야기 2025.09.10
윈도우즈 wsl2 환경에서 로컬 카프카 브로커 연동하기

윈도우즈 wsl 환경에서 ipv6를 사용할 경우 localhost 또는 127.0.0.1 접근이 안될 경우가 있습니다. 이 경우 접근을 위해 다음과 같이 셋팅을 할 경우 접근이 가능합니다. 준비물- wsl2- Ubuntu- Apache kafka 3.9.0- Intellij CE 1) Apache kafka 3.9.0 다운로드 및 압축 풀기https://kafka.apache.org/downloads 에서 3.9.0 바이너리를 다운로드 합니다.이후 다운로드 받은 압축파일을 푼다. 2) Ubuntu 터미널 실행 및 다운로드 환경 접근dvwy@dvwy:~$ cd /mnt/c/Users/choco/Downloads/kafka_2.12-3.9.0/kafka_2.12-3.9.0dvwy@dvwy:/mnt/c/Use..

빅데이터/Kafka 2025.05.25
Sent auto-creation request for Set(__consumer_offsets) to the active controller 에러 해결 방법

로컬에서 테스트 도중에.. 발생한 에러이다.Sent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to the active controllerSent auto-creation request for Set(__consumer_offsets) to ..

빅데이터/Kafka 2025.04.18
카프카4.0 부터는 eager rebalancing protocol이 삭제됩니다.

https://issues.apache.org/jira/browse/KAFKA-18839 [KAFKA-18839] Drop support for eager rebalancing in Streams - ASF JIRAIn 3.1 we deprecated the EAGER protocol in Kafka Streams (see KAFKA-13439). This ticket covers actually dropping this protocol in 4.0. Note that KAFKA-8575 covers the actual task cleanup we can do once we no longer have to support eager rebalancing, whichissues.apache.org 이 이슈(..

빅데이터/Kafka 2025.02.28
[local macOS 환경] apache kafka(3.5.0기준) + redpanda/console 로 편하게 테스트 하기

로컬 환경에서 개발을 하다보면 항상 shell script로 사용하지만 좀 불편할때가 많습니다. redpanda에서는 console을 통해 apache kafka와 연동하는 웹 콘솔을 오픈소스로 제공하고 있습니다.  https://github.com/redpanda-data/console GitHub - redpanda-data/console: Redpanda Console is a developer-friendly UI for managing your Kafka/Redpanda workloads. ConsoleRedpanda Console is a developer-friendly UI for managing your Kafka/Redpanda workloads. Console gives you a s..

빅데이터/Kafka 2025.02.26
카프카에서 데이터 삭제는 어떻게 이루어 지는가>

카프카의 데이터의 삭제는 로그 세그먼트 단위로 삭제가 이루어진다. 노드(구 브로커)의 로그매니저는 시간(time) 또는 용량(size)에 따라 삭제여부를 결정한다. 시간 기반 정책에서는 레코드의 timestamp에 따라 달라진다. 해당 세그먼트 파일에 존재하는 가장 큰 timestamp 값(레코드 순서와는 무관)을 토대로 찾아낸다. 용량 기반 정책은 기본적으로 설정되지 않는다. 만약 설정된다면, 로그매니저는 가장 오래된 세그먼트 파일을 용량이 다 찼을때 부터 차례 대로 삭제하게 된다. 만약, 시간과 용량 두개의 설정이 동시에 설정된다면 먼저 도달하는 정책의 기준에 따라 삭제가 이루어진다. 언제든지 삭제가 될 수 있도록 copy-on-wirte 방식으로 세그먼트 목록을 사용한다. 이를 통해 삭제가 되는 동..

빅데이터/Kafka 2025.02.13
kafka 4.0부터는 스칼라 2.12가 더 이상 사용되지 않습니다.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) - Apache Kafka - Apache Software FoundationStatus Current state: Adopted Discussion thread: link Vote thread: link JIRA:  KAFKA-12930 - 이슈 세부사항 가져오는 중... 상태  (3.0) KAFKA-12895 - 이슈 세부사항 가져오는 중... 상태  (4.0) Please keep the discussion on the macwiki.apache..

빅데이터/Kafka 2025.02.13
standalone 카프카(kraft모드 in local) 실행 스크립트(1줄)

standalone 카프카(kraft모드 in local) 실행을 위한 준비와 실행$ bin/kafka-storage.sh random-uuidcKUMbEGERui8cHUhwdc6XA$ bin/kafka-storage.sh format -t cKUMbEGERui8cHUhwdc6XA -c config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.$ bin/kafka-server-start.sh config/kraft/servblog.voidmainvoid.net지난 포스팅에서 kraft모드를 로컬에서 실행할 때 차례대로 실행해야 하는 코드를 올린적이 있었는데, 불편해서 한줄로 요약한 버전을 올..

빅데이터/Kafka 2025.02.11
trino에서 java.sql.SQLException: Unrecognized connection property 'url' 에러가 나는 이유와 해결 방법

spark jdbc를 통해 trino를 접근하기 위해 아래와 같은 코드를 짤 수 있다.import org.apache.spark.sql.{DataFrame, SparkSession}import java.util.Properties// Spark 세션 생성val spark = SparkSession.builder() .appName("Spark Trino JDBC Example") .getOrCreate()// JDBC URL 설정val jdbcUrl = "jdbc:trino://://"// Trino JDBC 속성 설정val connectionProperties = new Properties()connectionProperties.setProperty("user", "")connectionProper..

빅데이터 2024.10.16
Spark 개발시, main/resources 패키지에 hdfs-site.xml, core-site.xml 등을 넣는 이유

main/resources 디렉터리는 Apache Maven이나 SBT와 같은 빌드 도구를 사용하는 프로젝트에서 애플리케이션의 리소스 파일을 저장하는 표준 위치이다. 이 디렉터리에 배치된 파일들은 컴파일된 클래스와 함께 JAR 파일에 포함되며, 런타임에 애플리케이션에서 사용될 수 있다. HDFS 설정 파일 (hdfs-site.xml)hdfs-site.xml 파일은 Hadoop 분산 파일 시스템(HDFS)의 설정을 정의한다. 이 파일에는 HDFS 클러스터의 동작을 제어하는 다양한 구성 옵션이 포함되어 있다. 예를 들어, 네임노드의 주소, 데이터 디렉터리, 복제 수 등의 정보가 포함될 수 있다.main/resources 디렉터리에 배치하는 이유main/resources는 애플리케이션의 모든 리소스 파일(예:..

빅데이터/하둡 2024.08.13
standalone 카프카(kraft모드 in local) 실행을 위한 준비와 실행

$ bin/kafka-storage.sh random-uuidcKUMbEGERui8cHUhwdc6XA$ bin/kafka-storage.sh format -t cKUMbEGERui8cHUhwdc6XA -c config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.$ bin/kafka-server-start.sh config/kraft/server.properties$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --version3.6.2 카프카 바이너리 다운로드 링크 : https://kafka.apache.org/d..

빅데이터/Kafka 2024.06.23
scala에러 Unable to make private java.nio.DirectByteBuffer 해결 방법

Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" 와 같은 코드가 떠서 놀랬다. intellij에서 sbt로 빌드한 스칼라코드가 실행되지 않을 때가 있는데, 이 오류는 jvm컴파일 버전 때문에 생기는 이슈이다.  https://stackoverflow.com/questions/70153343/unable-to-make-private-java-nio-directbytebufferlong-int-accessible Unable to make private java.nio.DirectByteBuffer(long,int) accessibleI'm using P..

아파치 플링크는 2.0 버전부터는 더이상 scala API를 지원하지 않습니다.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support FLIP-265 Deprecate and remove Scala API support - Apache Flink - Apache Software FoundationPlease keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). Motivation Apache Flink offers APIs for building your Flink application using the DataStr..

빅데이터 2024.05.10
Redis-go를 interface화 하여 사용하기

golang에서 Redis를 사용할 때는 보통 go-redis 라이브러리를 많이 사용합니다. 이 때, 바로 메서드를 사용하기 보다는 내부 비즈니스 로직이 포함된 RedisClient 인터페이스를 만들어 사용하곤 하는데요. 아래는 그 예시 입니다. redis.go package main import ( "context" "fmt" "github.com/go-redis/redis/v8" // go-redis import ) // RedisClientInterface 인터페이스 정의 type RedisClientInterface interface { Set(key string, value interface{}) error Get(key string) (string, error) } // RedisClient ..

카프카 컨슈머의 auto.offset.reset 옵션을 반드시 earliest로 변경해야 하는 이유

auto.offset.reset는 카프카 컨슈머를 다루는데 있어 아주 중요한 부분입니다. 해당 옵션이 가질 수 있는 값은 다음과 같습니다. earliest : 마지막 커밋 기록이 없을 경우, 가장 예전(낮은 번호 오프셋) 레코드부터 처리 latest : 마지막 커밋 기록이 없을 경우, 가장 최근(높은 번호 오프셋) 레코드부터 처리 none : 커밋 기록이 없을 경우 throws Exception 해당 옵션은 필수 옵션이 아닌 선택 옵션으로서 입력을 하지 않으면 자동으로 latest로 설정됩니다. 일반적으로 컨슈머를 운영할 때 이 옵션을 건드리는 경우는 거의 드문데요. 그러다보니 기본값인 latest로 설정할 경우 우리도 모르게 운영 중 데이터의 유실이 발생할 수 있다는 사실을 놓치기도 합니다. 그러다보..

빅데이터/Kafka 2024.02.05
json value가 null일때 golang은 Unmarshal을 잘 할 수 있을까?

golang은 JSON Format 데이터를 struct로 변환할 수 있는데 이것을 Unmarshal이라고 부릅니다. 예를 들어 다음과 같은 함수로 구현될 수 있습니다. func getStructFromJSON(jsonData string, valuePtr interface{}) error { return json.Unmarshal([]byte(jsonData), valuePtr) } jsonData : json포맷으로된 String 데이터 valuePtr : struct타입 예를 들어 다음과 같이 호출할 수 있습니다. type SampleStruct struct { A int32 `json:"a"` B int32 `json:"b"` } func main() { var valuePtr SampleStru..

JSON은 null을 키값으로 가질 수 있을까?

JSON을 정의할 때 null을 키로 가질 수 있을지 궁금해서 jsonlint.com을 사용하여 테스트를 진행했습니다. https://jsonlint.com/ JSON Online Validator and Formatter - JSON Lint Loading... About the JSONLint Editor JSONLint is a validator and reformatter for JSON, a lightweight data-interchange format. Copy and paste, directly type, or input a URL in the editor above and let JSONLint tidy and validate your messy JSON code. What Is json..

빅데이터 2024.01.18
map[string]interface 데이터를 avro 포맷으로 파일 저장하는 방법

package main import ( "encoding/json" "fmt" "github.com/linkedin/goavro" "os" "time" ) func main() { // Avro 스키마 정의 schemaJSON := `{ "type": "record", "name": "Example", "fields": [ {"name": "username", "type": "string"}, {"name": "age", "type": "int"} ] }` codec, err := goavro.NewCodec(schemaJSON) if err != nil { panic(err) } // 파일 생성 file, err := os.Create("avro_data.avro") if err != nil { pan..

카프카에서 계층 저장소(Tiered storage)가 필요한 이유

2020년 11월 Cofluent Platform 6.0이 릴리즈되면서 카프카의 계층 저장소가 처음으로 대중에게 소개되었습니다. 이전까지 카프카 오픈소스 진영에서 많이 논의 되었지만 실제로 개발자가 사용 가능한 형태로 나온 것은 이때가 처음입니다. 컨플루언트를 사용하는 많은 개발자와 회사에서는 카프카의 계층 저장소를 사용해왔습니다. 이후 2023년 10월, 오픈소스 카프카 3.6.0이 릴리즈 되면서 계층 저장소 얼리 엑세스 모드를 사용할 수 있게 되었습니다. 카프카에서 계층 저장소는 현재의 브로커 구조에서 아주 중요한 역할을 수행하게 될 것입니다. 계층 저장소는 많은 장점이 있겠으나 그 중 핵심은 비용 절감이라고 생각합니다. 비용 절감에는 크게 두가지가 있는데 컴퓨팅 리소스 비용과 휴먼 리소스 비용입니..

빅데이터/Kafka 2023.10.20
신뢰성 있는 카프카 애플리케이션을 만드는 3가지 방법

카카오 공개 기술 세미나인 kakao tech meet에서 '신뢰성 있는 카프카 애플리케이션을 만드는 3가지 방법' 에 대해 발표하는 자리를 가졌습니다. https://www.youtube.com/watch?v=7_VdIFH6M6Q 이벤트 드리븐 아키텍처와 스트림 데이터 파이프라인을 만들때 고민해야 하는 부분을 다룹니다. 프로듀서/컨슈머와 같은 카프카 애플리케이션의 전달 신뢰도를 높이기 위해서 적용할 수 있는 기술들에 대해 설명하고 내부적으로 적용했던 경험을 공유합니다. 카프카에서 다루는 신뢰도에 대해 다시 한 번 고민해보고, 내가 만든 애플리케이션이 고객에게 어떻게 가치를 줄 수 있는지 생각해볼만한 주제도 함께 알려드립니다.

빅데이터/Kafka 2023.09.22
카프카 프로듀서의 acks=all 옵션은 사실(?) 느리지 않다!

카프카 3.0 부터는 카프카 프로듀서의 acks 옵션이 all로 지정됩니다. acks=all이 기본값으로 지정된 이유 중 하나는 프로듀서와 브로커 간 통신을 멱등성(idempotence)있게 만들기 위함입니다. 프로듀서와 브로커는 acks를 통해 레코드가 전송되었는지 확인합니다. 문제는 acks가 유실되었을 경우입니다. 네트워크 상태, 브로커 상태에 따라서 언제든 유실될 수 있는 acks를 정상적으로 전송하기 위해서는 acks를 위한 acks(!)를 만들어야만 합니다. 이는 두 장군 문제 알고리즘과 동일하다고 볼 수 있습니다. 결과적으로 프로듀서의 중복 전달을 제거하기 위해 카프카 3.0부터는 enable.idempotence가 true로 변경되었고 이에 딸려오는 추가 옵션인 acks가 all로 설정되게..

빅데이터/Kafka 2023.08.08
기존에 생성된 compact topic의 cleanup.policy를 변경하는 방법

현재 cleanup.policy 확인 $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact- test --describe Topic: compact-testPartitionCount: 1ReplicationFactor: 1Configs: cleanup.policy=compact,segment.bytes=1073741824 Topic: compact-testPartition: 0Leader: 0Replicas: 0Isr: 0 kafka-configs.sh를 사용하여 토픽 수정 $ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entit y-type topics --ent..

빅데이터/Kafka 2023.06.30
Compacted topic에 null key 레코드를 전송하면?

Compacted topic에 null key를 전송(produce)하면 어떻게 될까요? 1) Compacted 토픽 생성 $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact-test --config "cleanup.policy=compact" --create Created topic compact-test. $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic compact-test --describe Topic: compact-testPartitionCount: 1ReplicationFactor: 1Configs: cleanup.policy=compact,segment.b..

빅데이터/Kafka 2023.06.30