전체 글 477

아파치 카프카 Exactly-once 처리의 진실과 거짓

아파치 카프카와 같은 분산 이벤트 스트리밍 플랫폼을 사용하거나 메시지 브로커를 활용하다 보면 항상 마주치는 문제는 마로 메시지 전달 시멘틱(message delivery semantic)입니다. 메시지 전달 시멘틱은 A지점에서 B지점으로 데이터를 전송할 때 어느 만큼의 신뢰도로 데이터를 전송하는지에 대한 정의입니다. 즉, 특정한 장애 상황(또는 임계치를 벗어난 상황)에서도 보증하는 데이터 전달 신뢰도라고 볼 수 있습니다. 메시지 전달 시멘틱은 크게 세가지로 나뉩니다. 적어도 한번(at least once), 많아도 한번(at most once), 정확히 한번(exactly once). '적어도 한번'은 데이터가 전달될 때 유실이 발생하지는 않지만 중복이 발생할 가능성이 있음을 뜻합니다. 아파치 카프카는 ..

빅데이터/Kafka 2023.06.20
[개발자를 넘어 기술 리더로 가는 길]읽고 정리

타냐 라일리 https://www.linkedin.com/in/tanyareilly/ 타냐 라일리는 중간급 엔지니어가 되었을 때 매니저가 되고 싶지 않았고 '스태프 엔지니어의 길'로 가고 싶었다. 기술 전략 수립, 조직을 성공적으로 이끌기 위한 방법에 대해 고민했고 이에 대한 내용을 정리했다. 두 가지 진로 개발자는 크게 두가지 진로가 있다. 1) 매니저 - 명확하게 소통 - 위기 상황에서 침착함 유지 - 동료들이 더 나은 일을 할 수 있도록 도움 - 많은 사례들이 나와 있음 2) 스태프 엔지니어(기술 리더) - 불분명한 길 타냐 라일리가 생각하는 스태프 엔지니어의 필요한 역할 3가지 1) 빅 픽처 관점의 사고력 : 현재 상황에 대해 인지하고 기업에 필요한 것이 무엇인지 파악하여 n년 단위 프로젝트 진행..

개발이야기 2023.06.19
모던 데이터 플로우: 데이터 파이프라인을 잘 운영하는 방법

https://www.confluent.io/events/kafka-summit-london-2022/modern-data-flow-data-pipelines-done-right/ Kafka Summit London 2022 Keynote | Jay Kreps, CEO, Confluent featuring Avi Perez, Wix.com Confluent is building the foundational platform for data in motion so any organization can innovate and win in a digital-first world. www.confluent.io 메이븐에서 카프카 자바 라이브러리의 사용율이 급격하게 올라가는 것을 볼 수 있음. 그만큼 카프카의 사..

빅데이터/Kafka 2023.06.12
golang struct type을 JSON으로 Print 하기

package main import ( "encoding/json" "fmt" ) type Person struct { Name string Age int } func main() { myStruct := Person{"dvwy", 145} fmt.Println("=======") fmt.Println(myStruct) fmt.Println(prettyPrint(myStruct)) } func prettyPrint(i interface{}) string { s, _ := json.MarshalIndent(i, "", "\t") return string(s) } 결과는 다음과 같습니다. {dvwy 145} { "Name": "dvwy", "Age": 145 } 출처 : https://stackoverflo..

아파치 카프카 브로커 설정에서 listener와 advertised.isteners의 차이?

Kafka 브로커의 설정에서 listener와 advertised.listeners 옵션은 다음과 같은 특징이 있다. listener는 Kafka 브로커가 클라이언트로부터 듣고있는 네트워크 인터페이스와 포트를 나타낸다. 예를 들어, listener를 PLAINTEXT://localhost:9092로 설정하면 브로커는 localhost의 9092 포트에서 PLAINTEXT 프로토콜을 사용하여 들어오는 클라이언트 연결을 수신한다. advertised.listeners는 Kafka 브로커가 클라이언트에게 알려주는 네트워크 인터페이스와 포트를 나타낸다. 클라이언트는 이 정보를 사용하여 브로커에 연결한다. 이 정보는 일반적으로 브로커가 외부에 노출되는 경우 사용된다. 예를 들어, advertised.listene..

빅데이터/Kafka 2023.03.26
spring boot HttpServletRequest 사용 쿠키 구현시 HttpSession.getAttribute(String) is null 에러 발생 대응

spring boot에서 쿠키/세션 로그인 기능 구현시 다음과 같은 에러를 만났습니다 ...HttpSession.getAttribute(String)" is null 분명히 다음과 같이 attribute를 제대로 설정했는데 왜그랬을까요? HttpSession session = request.getSession(true); session.setAttribute("LOGIN_MEMBER", 123); 이유는 다음과 같습니다. 제가 가진 특수한 상황 때문이였는데요. - server : localhost:8080 - client : localhost:3000 - client는 server로 axios 요청을 함 위 상황은 여러가지 문제가 있었고 해결방안은 다음과 같습니다. 1) CORS 문제 크로스 도메인에 대..

golang prviate repository에서 디펜던시 가져오는 방법

golang private repository를 하위 모듈로 가져올 때 그냥 import를 실행하고 나면 다음과 같은 이슈가 종종 발생하곤합니다. $ go get github.com/AndersonChoi/my-private-repo go: downloading github.com/AndersonChoi/my-private-repo v0.0.0-20230306053459-dec1333da9d3 go: github.com/AndersonChoi/my-private-repo@v0.0.0-20230306053459-dec1333da9d3: verifying module: github.com/AndersonChoi/my-private-repo@v0.0.0-20230306053459-dec1333da9d3: re..

goroutine 함수 여러번 실행 결과값 기다리는 2가지 방법 - js callback 처럼

WaitGroup을 사용하는 방법 1) sync.WaitGroup을 사용하여 대기 2) go func 사용하여 goroutine 함수 호출 위 2가지 방법을 사용하면 여러번 실행된 결과값을 병렬 처리한 뒤 한개의 결과값으로 추출할 수 있습니다. 예제 코드는 다음과 같습니다. package main import ( "fmt" "math/rand" "strconv" "sync" "time" ) func main() { result := 0 count := 10 var wg sync.WaitGroup // WaitGroup 선언 wg.Add(count) // Wait 개수 지정 for i := 1; i

go gin framework graceful shutdown 예제

https://gin-gonic.com/docs/examples/graceful-restart-or-stop/ Graceful restart or stop Do you want to graceful restart or stop your web server? There are some ways this can be done. We can use fvbock/endless to replace the default ListenAndServe. Refer issue #296 for more details. router := gin.Default() router.GET("/", handler) // [...] end gin-gonic.com gin은 golang에서 많이 쓰이는 웹프레임워크입니다. graceful..

windows의 WSL환경에서 아파치 카프카 설치, 실행하는 방법

1) git 설치 https://git-scm.com/download/win Git - Downloading Package Download for Windows Click here to download the latest (2.39.2) 32-bit version of Git for Windows. This is the most recent maintained build. It was released 10 days ago, on 2023-02-14. Other Git for Windows downloads Standalone Installer 32-bit Git for Win git-scm.com 상기 URL에서 windows용 git을 설치합니다. 2) WSL 환경 설치 Windows Subsystem..

빅데이터/Kafka 2023.02.24
intellij에서 golang 프로젝트 인식이 잘 안될때

intellij에서 golang을 최초로 실행 시키고 난뒤에 $ go mod init myapplication 프로젝트가 정상적으로 라이브러리들을 Import시키지 못할 때가 있습니다. 그럴 경우에는 cmd + , 를 눌러서 Preferences로 간 뒤 Go Modules의 Enable Go modules integration을 체크하면 싱크됩니다. 싱크가 완료된 go.mod 파일 모습 module myapplication go 1.19 require ( github.com/gin-gonic/gin v1.9.0 github.com/prometheus/client_golang v1.14.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.co..

RestHighLevelClient로 구현한 idempotence 데이터 적재

SHA-256이란? http://wiki.hash.kr/index.php/SHA256 SHA256 - 해시넷 SHA-256은 SHA(Secure Hash Algorithm) 알고리즘의 한 종류로서 256비트로 구성되며 64자리 문자열을 반환한다. SHA-256은 미국의 국립표준기술연구소(NIST; National Institute of Standards and Technology)에 의해 공표된 wiki.hash.kr SHA-256 해시 함수는 어떤 길이의 값을 입력하더라도 256비트의 고정된 결과값을 출력한다. 일반적으로 입력값이 조금만 변동하여도 출력값이 완전히 달라지기 때문에 출력값을 토대로 입력값을 유추하는 것은 거의 불가능하다. 아주 작은 확률로 입력값이 다름에도 불구하고 출력값이 같은 경우가 ..

ElasticsearchClient 7.17.7 기준 java client example code

1) host, port 주입 RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http")).build(); BulkRequest.Builder br = new BulkRequest.Builder(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); ElasticsearchClient esClient = new ElasticsearchClient(transport); 2) String으로 된 JSON 데이터 정의 Str..

커넥트 REST API 확장 플러그인 : Connect Rest Extension Plugin

Connect Framework offers REST API that is used to mange the lifecycle of the connector. Its imperative in most enterprises to secure the API and also add authorization to the end points. We could add the ability for authentication and authorization in the framework. But the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide a..

빅데이터/Kafka 2022.10.04
카프카 커넥터의 태스크에 Priority를 부여할 수 없을까?

관련 지라 : KAFKA-5741 https://issues.apache.org/jira/browse/KAFKA-5741 [KAFKA-5741] Prioritize threads in Connect distributed worker process - ASF JIRA Connect's distributed worker process uses the DistributedHerder to perform all administrative operations, including: starting, stopping, pausing, resuming, reconfiguring connectors; rebalancing; etc. The DistributedHerder uses a single threaded exec..

빅데이터/Kafka 2022.10.04
토픽을 GlobalKTable 구체화된 뷰(view) 키-값 저장소로 사용시 특이점 및 주의사항

https://blog.voidmainvoid.net/442 카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 카프카 스트림즈의 KTable은 토픽의 데이터를 key-value형태로 사용할 수 있도록 구체화된 뷰(Materialized View)를 제공합니다. 구현방법은 다음과 같습니다. 0. 카프카 스트림즈 디펜던시 추가 dependencies blog.voidmainvoid.net 토픽의 데이터를 키-값 저장소로 사용하여 데이터를 조회할 수 있는 방법을 앞서 다루어 보았습니다. 아래는 응용하여 만든 GlobalKTable의 뷰 입니다. StreamsBuilder builder = new StreamsBuilder(); GlobalKTable addressTable = b..

빅데이터/Kafka 2022.09.15
카프카 커넥트/커넥터 내부 살펴보기 - 2.8기준, sinkTask 위주로

카프카 커넥트는 data sink/source를 위한 파이프라인을 운영하기 위해 만들어진 모듈입니다. 크게 두가지 Connect 타입을 지원하고 있습니다. standalone부터 distributed까지 코드를 보면서 내부 구조를 살펴보겠습니다. 우선 살펴봐야할 것은 cli입니다. ConnectDistributed.java 또는 ConnectStandalone.java에서 시작합니다. 커넥트를 실행할 때는 다음과 같은 명령어로 실행하기 때문에 위 자바 파일이 시작점이라 볼 수 있습니다. // standalone일 경우 $ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.proper..

빅데이터/Kafka 2022.08.30
ConnectException: Exiting WorkerSinkTask due to unrecoverable exception 이슈란?

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtim..

빅데이터/Kafka 2022.08.03
WINDOW STORE CHANGE LOG ADDITIONAL RETENTION MS CONFIG 옵션 분석

/** {@code windowstore.changelog.additional.retention.ms} */ @SuppressWarnings("WeakerAccess") public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. A..

빅데이터/Kafka 2022.06.15
초급자를 위한 [아파치 카프카 애플리케이션 개발]온라인 강의를 출시하였습니다.

2022년 5월! 온라인 교육 사이트인 인프런에서 [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! 강의를 출시하였습니다. 강의 링크 : https://bit.ly/3PsEAt8 [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! - 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고 실습하는 시간 www.inflearn.com - 강의 이름 : [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! - 강의 설명 : 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그..

빅데이터/Kafka 2022.06.02
카프카를 활용한레이싱카 센서 실시간 수집 데이터 파이프라인 구축

오늘은 카레이싱에서 자동차에서 출력되는 여러 지표들을 수집하는 데이터 파이프라인을 만들어 보겠습니다. F1, WRC를 비롯한 다양한 카레이싱 팀에서는 데이터의 수집과 활용이 우승으로 가는 지름길인 것은 이미 널리 알려져 있습니다. 2021 F1 의 월드 컨스트럭터 챔피언인 메르세데스-AMG 페트로나스 포뮬러 원팀은 데이터를 잘 활용하는 팀 중 하나입니다. 페트로나스 포뮬러팀에서 수집하는 F1 자동차의 센서는 5만개에 달하며 한경기당 300기가바이트, 모든 경기 동안에는 테라바이트에 달하는 데이터가 쌓이게 된다고 합니다. F1 경기는 총 22번 경기를 뛰게 되는데 자체 프랙티스 세션까지 합치면 몇백 테라바이트 이상의 데이터가 모이게 됩니다. 이렇게 모아진 데이터는 다음 경기를 위해 분석하고 경주용 F1 머..

빅데이터/Kafka 2022.05.02
Kafka Producer JMX exporter 사용하기

이 글은 오픈소스 아파치 카프카 공식 자바 라이브러리를 사용하여 개발할 때 JMX exporter를 사용하여 producer의 지표를 수집하기 위한 글입니다. 1) KafkaProducer 애플리케이션 개발 build.gradle 코드 plugins { id 'java' } group 'com.example' version '1.0' repositories { mavenCentral() } dependencies { compile 'org.apache.kafka:kafka-clients:2.5.0' compile 'org.slf4j:slf4j-simple:1.7.30' } task uberJar(type: Jar) { from sourceSets.main.output dependsOn configurat..

빅데이터/Kafka 2022.04.19
대규모 데이터의 카프카 프로듀서 성능 향상 방법

카프카 프로듀서는 acks, linger.ms, batch.size 조절을 통해 성능 향상을 달성 할 수 있습니다. 각 옵션별로 한계점과 성능 향상 방법을 알아보겠습니다. 여기서는 대규모 데이터가 들어오는 것을 가정하였습니다. 가정사항 - Record의 메시지 크기 10Kbytes - 레코드 유입량 : 1000tps acks acks는 카프카 프로듀서로 전송한 레코드가 정상적으로 리더 또는 팔로워 파티션에 적재되었는지 확인하는 역할을 합니다. 0으로 설정할 경우 모든 전송 케이스에 대해 성공으로 처리하고 1의 경우 리더 파티션에 적재되었을 경우 성공으로 처리합니다. all(-1)로 설정할 경우에는 리더와 팔로워 파티션(min.insync.replicas)에 적재가 완료되었을 경우 성공으로 처리하지만 al..

빅데이터/Kafka 2022.04.14
카프카 커넥트의 태스크 밸런싱 로직, DistributedHerder(양치기) 그리고 IncrementalCooperativeAssignor 내부 동작 소개

Herder; 명사 1. 양치기, 목부 카프카 커넥트는 워커, 커넥터, 태스크로 이루어져 있습니다. 워커는 카프카 커넥트 프로세스를 뜻하며 커넥터와 태스크를 실행시키기 위한 프로세스입니다. 커넥터는 태스크를 실행하는 관리도구로서 여러 태스크를 하나의 파이프라인으로 라이프 사이클을 관리합니다. 태스크는 데이터를 실질적으로 처리하는 부분이라고 볼 수 있습니다. 커넥터에는 1개 이상의 태스크가 포함되며 각 태스크는 프로듀서 또는 컨슈머 역할을 수행합니다. 일반적으로 분산모드 커넥트를 운영할 때 커넥터를 실행할 경우 태스크는 여러 워커에서 분산해서 실행됩니다. 예를 들어 5개의 워커로 이루어진 분산 모드 커넥트를 실행하고 7개의 태스크를 가진 커넥터를 실행하면 다음과 같이 할당됩니다. [worker-0] - [..

빅데이터/Kafka 2022.03.23