빅데이터/Kafka 125

특정 시점(날짜+시간)의 레코드부터 가져오도록 설정하기.

카프카의 토픽에 저장된 레코드는 0.11.0.0 이후부터 timestamp가 저장됩니다. 그러므로 kafka-consumer-group.sh 스크립트로 오프셋을 직접 지정할 수 있는데 --to-datetime을 사용하면 특정 시점(시간) 데이터부터 가져가도록 설정할 수 있습니다. Example) 토픽의 레코드가 다음과 같이 있다고 가정합니다. ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1614748497034, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = ..

빅데이터/Kafka 2021.03.03
아파치 카프카를 데이터 레이크로 사용할 수 있을까?

데이터 레이크란? 데이터 레이크의 기본 개념은 기업단위의 서비스 데이터들을 한곳의 저장공간에 모아 두는 것입니다. 이렇게 저장된 데이터로부터 BI(Business Intelligence) App으로 조회하거나 시각화, 머신러닝을 수행하였습니다. 데이터 웨어하우스와 다르게 '일단 저장'하고 나서 이후에 스키마를 적용하는 'Schema-on-Read'에 초점을 잡고 있습니다. 데이터 레이크라는 단어가 나온 이래로 지속 발전을 거듭했고 데이터 레이크 기술은 다음과 같이 발전 하였습니다. - 1세대 데이터 레이크 : HDFS, 맵리듀스, 피그, 하이브, 임팔라, 플룸, 스쿱 - 2세대 데이터 레이크 : 클라우드 네이티브로 성장하였고 오브젝트 스토리지(S3), 스파크, 플링크, 프레스토, 스트림셋 스트리밍 데이터..

빅데이터/Kafka 2021.02.25
macOS에서 카프카 버로우 빌드 및 실행하기.

1. golang 설치 $ brew install go 2. burrow clone from github $ git clone https://github.com/linkedin/Burrow.git 3. go build, install $ cd to the source directory. $ go mod tidy $ go install 3. run $ $GOPATH/bin/Burrow --config-dir=/path/containing/config 버로우를 실행할 때 주의할점은 --config-dir은 말그대로 디렉토리를 설정해야한다. 파일을 설정하면 안된다. burrow.toml이 포함된 디렉토리를 설정한다. 로컬호스트에서 카프카, 주키퍼를 실행할 경우 아래와 같이 burrow.toml을 설정한다. [..

빅데이터/Kafka 2020.11.29
카프카 토픽의 오프셋 최대 크기는 얼마일까?

카프카의 토픽에는 파티션이 있습니다. 프로듀서가 레코드를 파티션에 저장하면 각 레코드에는 오프셋이라고 불리는 고유한 번호가 붙게 되는데요. 오프셋 번호가 최대값에 도달하면 어떻게 처리해야하는지 의문을 가질 수 있습니다. 오프셋 번호의 최대값에 대해서 확인해 보았는데요. 오프셋은 int64로 지정되어 있었습니다. private static final Field.Int64 OFFSET = new Field.Int64("offset", "The offset found"); 자바에서 int64의 최대값은 9,223,372,036,854,775,807 입니다. 9,223,372,036,854,775,807는 하루에 1조개의 record를 쌓더라도 25,269년이 걸리는 어마어마한 값입니다. 게다가 파티션을 1개만..

빅데이터/Kafka 2020.11.17
카프카 커넥터 빌드시 JDK11이 아닌 JDK8로 그래들 빌드해야합니다.

카프카 커넥터를 직접 개발해야할 때가 있습니다. 이때 jar를 만들어서 카프카 커넥트에 커스텀 커넥터를 추가하곤 하는데요. 커넥터를 위한 jar을 만들때 JDK11이 아닌 JDK8 GRADLE JVM으로 반드시 빌드해야합니다. JDK11기반 GRADLE JVM으로 빌드하면 kafka 2.6.0 기준 카프카 커넥트에서는 정상적으로 커넥터를 추가하지 못합니다. 카프카 커넥터용 우버 JAR만드는 방법 jar { from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } }

빅데이터/Kafka 2020.11.02
토픽의 메시지 값을 직렬화/역직렬화가 정상적으로 이루어지지 않는 경우 테스트

토픽의 메시지 값 또는 메시지 키는 직렬화하여 저장되어 있다. 만약 직렬화와 역직렬화 포맷이 다르면 어떻게 될까? - test 토픽 - 직렬화 : StringStringSerializer - 역직렬화 : UUIDDeserializer 의도적으로 직렬화 포맷과 역직렬화 포맷을 다르게 하여 테스트를 진행합니다. 1. 프로듀서 public class SimpleProducer { private static String TOPIC_NAME = "test"; private static String BOOTSTRAP_SERVERS = "localhost:9092"; public static void main(String[] args) { Properties configs = new Properties(); conf..

빅데이터/Kafka 2020.10.15
정말정말 간단한 스프링 카프카 컨슈머 애플리케이션 예제

스프링 카프카는 카프카를 스프링에서 쉽게 사용할 수 있도록 하는 라이브러리입니다. 스프링 카프카를 통해 컨슈머를 만드는 가장 간단한 코드를 공유합니다. 준비물 - 그래들 - 스프링부트 - 인텔리제이 디렉토리구조 ├── build.gradle ├── settings.gradle └── src └── main ├── java │ └── com │ └── test │ └── Main.java └── resources └── application.yaml build.gradle plugins { id 'java' } group 'org.example' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { compile 'org.springf..

빅데이터/Kafka 2020.10.13
Reactive를 품은 스프링 카프카 시청 정리 자료

아래 영상자료를 보고 정리한 글입니다. 1. 카프카 그리고 스프링 카프카는 현대의 메시지드리븐아키텍처의 핵심 플랫폼. 다양한 종류로 사용 가능 스프링에서 카프카를 사용하는 방법은 아주 쉬워졌음 마치 webMVC에서 request패턴 사용한것처럼 사용. 하지만 동시성문제는 어떻게 해결할까? 파티션을 나눠서 병렬처리는 가능. 리액티브프로그래밍의 본질은 어떤 대상을 Async하게 다루는 것. Async하게 다루는것을 스트림으로 처리하겠다. Flux로 비동기 처리하는 것이 핵심. 프로듀서를 생성한 코드 컨슈머 코드 스프링카프카에서 리액트 사용 방법 2. 적용 프로젝트 관찰대상 서버가 다수가 되면서 모든 데이터를 처리하는 것이 중요 최악의 경우를 고민해야함.. - 1,000대이상의 서버에서 동시다발적으로 이벤트 ..

빅데이터/Kafka 2020.09.26
카프카 커넥트 JMX + 로그스태시로 모니터링 하기

카프카 커넥트(분산모드)는 기본 JMX포트를 통해 커넥트 애플리케이션의 상태를 모니터링 수집 애플리케이션에 데이터를 전송할 수 있습니다. 오늘은 카프카 커넥트의 JMX포트를 열고 로그스태시로 JMX로 지표를 수집하는 방법에 대해 알아보겠습니다. 1. 카프카 커넥트 JMX포트 열기 $ export JMX_PORT=10000 우선 JMX포트를 열기위해 환경변수로 선언합니다. 2. 카프카 커넥트 실행 및 JMX 포트 확인 $ bin/connect-distributed.sh ../config/connect-distributed.properties 실행이 완료되면 설정한 10000번 포트로 listen 여부를 확인합니다. $ netstat -anv | grep 10000 tcp46 0 0 *.10000 *.* L..

빅데이터/Kafka 2020.09.24
kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline)

카프카 커넥트를 운영하다보면 로그를 파일로 쌓고 + 로그스태시를 통해 엘라스틱서치에 쌓고 싶을때가 있습니다. 이를 위해 아래와 같이 설정합니다. 1. connect-log4j.properties router.logs.dir=./connect-logs log4j.rootLogger=INFO, connectAppender log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.I0Itec.zkclient=ERROR log4j.logger.org.reflections=ERROR log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.connectAppender.F..

빅데이터/Kafka 2020.09.23
스프링 카프카 메시지 리스너 2가지 구현 방법

스프링 카프카에서 메시지 리스너를 구현하는 2가지 방법이 있습니다. 첫번째는 @KafkaListener를 사용하는것이고 두번째는 listenerContainer를 Bean등록하는 것입니다. @KafkaListener로 구현 가장 간단한 방법입니다. @KafkaListener를 통해 선언할 경우 파라미터를 오버로딩해서 알맞는 listenerContainer를 자동으로 주입합니다. package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBoot..

빅데이터/Kafka 2020.07.23
카프카 컨슈머 파티셔너 종류 및 정리(2.5.0 기준)

카프카 컨슈머는 토픽의 파티션과 매칭하여 레코드를 가져옵니다. 파티션을 매칭하는 기준은 컨슈머 파티션 어사이너의 기준을 따릅니다. 컨슈머 파티션어사이너 인터페이스는 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html ConsumerPartitionAssignor (kafka 2.5.0 API) kafka.apache.org RangeAssignor 기본 설정되는 파티션 어사이너입니다. 토픽의 파티션을 숫자기준으로 나열하고, 컨슈머의 이름을 사전순으로 나열한 뒤에 배정하는 정확히 반으로 나누어 배정합니다. 만약 딱 반으로 안나뉘어지는 홀수개의 ..

빅데이터/Kafka 2020.07.23
카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준)

카프카 프로듀서는 레코드를 전송하기 위해 파티셔너를 제공합니다. 파티셔너 종류와 각 파티셔너에 대한 설명을 정리해보았습니다. 카프카 버젼이 올라감에 따라 파티셔너의 종류도 달라졌는데 여기서는 2.5.0 버젼의 파티셔너를 정리해보겠습니다. 프로듀서 파티셔너 인터페이스에 대한 설명은 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/?org/apache/kafka/clients/producer/Partitioner.html kafka 2.5.0 API kafka.apache.org 메시지 키가 있을 때 메시지 키가 있을 경우에는 파티셔너의 종류와 관계없이 동일하게 동작합니다. 메시지 키의 해쉬값을 구해서 해당 해쉬값과 파티션을 매칭하여 적재합니다. 이로 인해..

빅데이터/Kafka 2020.07.23
컨슈머 스레드가 많다고 처리량이 높을까? 아닐까? 컨텍스트 스위칭으로 인한 예외 상황

오늘 카프카 컨슈머를 입수하던 도중 매우 신기한 현상을 발견해서 기록하게 되었습니다. 컨슈머스레드를 여러개로 HDFS에 적재하고 있었는데, 일부 파티션의 데이터가 0이 되어 일부 컨슈머 스레드가 쉬는동안 총 처리량이 늘어난 것입니다. 그래프를 보면서 왜 그렇게 되었는지 확인해보겠습니다. 입수 상태 - 24 core, 48G memory 물리장비 - 1 JVM application - 20 consumer thread - sync offset commit - save at Hadoop hdfs(append) 입수 관련 그래프 컨슈머 랙은 카프카 버로우를 사용하여 수집중에 있습니다. 랙 추이를 보면 알겠지만 파티션당 데이터 양의 차이가 일부 발생하였습니다. 10개 파티션은 18:20 부근에 끝났고 나머지 1..

빅데이터/Kafka 2020.07.22
kafka connector distributed 모드로 fileSourceConnector 실행

저번 포스트에 이어 distributed로 동작하는 kafka connector를 실습해보겠습니다. https://blog.voidmainvoid.net/356 Kafka file source connector standalone 모드 실행 Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 blog.voidmainvoid.net 준비물 - 카프카 바이너리 - 실행중인 카프카 클러스터 distributed vs standalone The difference is in the class which is started and the confi..

빅데이터/Kafka 2020.07.21
Kafka file source connector standalone 모드 실행

Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 카프카로 보내보겠습니다. 준비물 - 카프카 바이너리파일 - 카프카 클러스터 Standalone 모드 준비 Standalone모드란 1개의 프로세스로 동작하는 애플리케이션을 뜻합니다. 커낵터는 distribute모드와 standalone모드를 지원합니다. 실제 환경에서는 distribute모드를 사용하는 것을 권장합니다. standalone모드를 실행하기 위해 아래와 같이 준비합니다. connect-file-source.properties name=local-file-source conne..

빅데이터/Kafka 2020.07.21
Kafka-client 사용시 Failed to load class "org.slf4j.impl.StaticLoggerBinder" 에러 해결 방법

kafka-client를 java application에 추가하여 실행하면 아래와 같은 오류와 함께 log가 정상적으로 찍히지 않습니다. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 위 에러는 slf4j의 디펜던시 이슈로 StaticLoggerBinder가 없어서 그런것인데요. 아래와 같이 그래들에 디펜던시를 추가하면 로그를 확인하실 수 있습니다. dependencies { co..

빅데이터/Kafka 2020.07.13
Kafka ConsumerRecord의 timestamp는 0.10.0.0 이후부터 사용가능합니다.

카프카의 ConsumerRecord를 살펴보면 메시지 키와 메시지 값 이외에 추가로 timestamp가 있는 것을 확인할 수 있습니다. timestamp는 Kafka 프로듀서가 ProducerRecord를 생성하면서 timestamp 값을 Unix time으로 입력합니다. 만약 다른 timestamp를 넣고 싶다면 직접 설정하여 넣을수도 있습니다. 해당 건은 KAFKA-3025에서 논의했던 내용으로 자세한 내용은 해당 jira에서 확인할 수 있습니다. https://issues.apache.org/jira/browse/KAFKA-3025 [KAFKA-3025] KIP-31&KIP-32 (part 1): Add timestamp field to message, configs, and Producer/Con..

빅데이터/Kafka 2020.07.08
telegraf사용시 kafka로 데이터 json형태로 보내는 방법

telegraf는 metric수집에 최적화되어있습니다. 다만, 그대로 사용하게 되면 influxdb에서 사용하는 데이터형태(csv와 유사)로 데이터가 전송되는데요. kafka에서 데이터를 잘 활용하기 위해 json을 사용한다면 consumer에서도 스키마에 대한 걱정없이(?) 잘 사용할 수 있습니다. telegraf에서 data형태를 json형태로 바꾸는 예제를 아래에 설명드리도록 하겠습니다. telegraf에 대한 설명은 아래 링크를 확인해주세요. https://blog.voidmainvoid.net/260 Fluentd vs Telegraf 차이점 알아보기 Telegraf와 fluentd는 아주 유사해보인다. 둘다 configuration파일 기반으로 작동하며 plugin을 통해 개발자가 custom..

빅데이터/Kafka 2020.07.08
카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용

confluent에서는 rest proxy라고 불리는 카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 기존에 Kafka connect, Kafka client로 데이터를 전달하는 것과는 사뭇 다르게 REST api를 사용한다는점이 독특한데요. 직접 코드를 짜지 않고 범용적으로 사용되는 http을 사용해서 데이터를 넣고 뺄 수 있다는 점이 독특합니다. 오늘은 rest proxy를 local에 설치하고 실행해보겠습니다. 준비물 - local kafka cluster - git - terminal - postman 다운로드 및 실행 rest proxy를 사용하기 위해서는 rest proxy가 포함된 confluent의 community package..

빅데이터/Kafka 2020.06.25
AWS MSK 사용시 인스턴스 유형별 최대 토픽 개수

AWS MSK는 카프카를 SaaS형태로 사용가능한 AWS서비스중 하나입니다. 완전관리형 SaaS는 아니지만 다양한 옵션을 지정할 수 있으며, 추가적인 모니터링 도구를 제공해주기 때문에 아주 편리하게 사용할 수 있습니다. MSK에서는 총 7개의 인스턴스유형을 제공하는데, 각 인스턴스 유형별로 생성할 수 있는 토픽의 개수를 살펴보도록 하겠습니다. MSK Bestcase 문서에 따르면 적정크기의 클러스터에 대한 내용이 나와 있습니다. 여기서는 MSK클러스터를 생성할 때 브로커 노드의 유형 및 수에 대한 상세한 내용이 나와 있는데 그 중 브로커당 파티션수에 대한 내용이 아래와 같이 정리되어 있습니다. 위 그림에서 보는 것 처럼 t3.small이 가장 작고 m5.24xlarge가 가장 큰 파티션 개수를 가질 수 ..

빅데이터/Kafka 2020.06.24
자바 멀티스레드 카프카 컨슈머 애플리케이션 구현 코드

이번 포스트에서는 자바 멀티스레드 카프카 컨슈머 애플리케이션을 구현해보도록 하겠습니다. 준비물 - Intellij 또는 eclipse - gradle project - jdk 1.8 - 약 10분 구현방법 이전 포스트(카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala) 바로가기)에서 scala로 구현한 바가 있습니다. 이번에는 자바로 구현하고, consumer.wakeup()을 사용해서 consumer를 안전하게 해제시키는 것을 아래 코드에서 구현해보겠습니다. wakeup() 메서드는 java shutdown hook을 통해 호출하여 안전하게 종료되도록 설정해보겠습니다. 기존 구현방식 처럼 newCachedThreadPool 메서드를 사용해서 multiple thread를 생성해보겠습니다...

빅데이터/Kafka 2020.06.09
Kafka-client client.dns.lookup 옵션 정리

Kafka-client 2.1.0 이후 버젼에서는 client.dns.lookup옵션을 사용하여 dns관련 설정을 사용할 수 있습니다. 오늘 포스팅에서는 해당 옵션이 어떤 역할을 하고 어떻게 동작하는지 알아보도록 하겠습니다. 먼저, apache kafka document의 설명을 보도록 하겠습니다. client.dns.lookup document client.dns.lookup: Controls how the client uses DNS lookups. If set to use_all_dns_ips then, when the lookup returns multiple IP addresses for a hostname, they will all be attempted to connect to before ..

빅데이터/Kafka 2020.04.13
카프카 클러스터 클러스터ip DNS 연동방법. use_all_dns_ips 사용(in AWS, route53)

Kafka-client(consumer, producer)를 사용하기 위해서는 다양한 설정이 필요하지만 카프카 브로커와 통신하기 위해서는 bootstrap.servers 옵션은 반드시 필요한 옵션중 하나입니다. Bootstrap.servers 이 옵션은 카프카 클러스터에 연결하기 위해 클라이언트가 사용하는 브로커들의 host:port 목록을 설정해야 합니다. 특이한점은 모든 브로커의 host와 port를 적지 않아도 된다는 점입니다. 왜냐면 최초로 연결된 하나의 broker의 host:port로 부터 통신을 위한 정보를 가져오기 때문입니다. Route 53에서 kafka cluster DNS 설정하기 route53은 aws에서 제공하는 DNS 웹서비스 입니다. aws route53에 이미 등록되어 있는 ..

빅데이터/Kafka 2020.03.20
AWS에 카프카 클러스터 설치하기(ec2, 3 brokers)

보통 테스트할때 맥북 또는 윈도우 컴퓨터의 1대 장비에 설치하곤하는데요. 고 가용성 테스트를 하기 위해서는 반드시 3대 이상의 클러스터를 설치해야 완벽한 카프카클러스터로서 테스트가 가능합니다. 또한 테스트가 아니더라도 운영을 위해 ec2에 설치하는 경우도 있습니다. 이번 포스트에서는 AWS에 카프카 클러스터(3대)를 설치해보겠습니다. 실행 목차 aws에 카프카 클러스터(3대)를 설치하기 위해서는 아래와 같은 단계가 필요합니다. 1) AWS로 EC2 3대 발급 2) 방화벽 설정 및 /etc/hosts 설정 3) Zookeeper 설치 4) Kafka 설치 위와 같은 단계를 통해 클러스터를 구축해보고 local 컴퓨터에서 console producer와 consumer를 통해 클러스터가 정상동작하는지 테스트..

빅데이터/Kafka 2020.03.18
카프카 버로우 = consumer lag 모니터링 오픈소스 애플리케이션

카프카 컨슈머 Lag 모니터링 필수요소 카프카 lag은 토픽의 가장 최신 오프셋과 컨슈머 오프셋간의 차이입니다. Kafka-client 라이브러리를 사용해서 Java 또는 scala와 같은 언어로 카프카 컨슈머를 구현할수 있는데요. 이때 구현한 kafkaConsumer 객체를 통해 현재 lag 정보를 가져올 수 있습니다. 만약 lag을 실시간으로 모니터링하고 싶다면 데이터를 Elasticsearch나 InfluxDB와 같은 저장소에 넣은뒤에 Grafana 대시보드를 통해 확인할 수도 있습니다. Github URL https://github.com/linkedin/Burrow linkedin/Burrow Kafka Consumer Lag Checking. Contribute to linkedin/Burro..

빅데이터/Kafka 2020.03.07
카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala)

Kafka-client library를 사용하여 JVM위에 올라가는 consumer/producer를 작성할 수 있습니다. 이번 포스팅에서는 scala로 Kafka consumer를 멀티쓰레드로 실행하는 애플리케이션 예제 코드를 공유, 설명 드리겠습니다. 제약조건 - Kafka consumer - Multi thread(2개 이상) 지원 - Scala 코드 Scala를 실행하는 멀티쓰레드 카프카 컨슈머 애플리케이션의 파일은 크게 4개로 나뉘어져 있습니다. 먼저, Scala application을 실행하는 Main.scala와 실제로 Consumer역할을 하게 되는 Runnable Thread인 ConsumerWorker.scala, Consumer의 상태를 기록할 ConsumerStatus.scala 마..

빅데이터/Kafka 2020.02.24