빅데이터 210

local kafka single broker 띄우기 with 도커

wurstmeister/kafka-docker는 사실상 표준(디팩토)으로 카프카 도커 이미지로 띄우는 용도로 사용하고 있습니다. 사용 방법은 아래와 같습니다. $ git clone https://github.com/wurstmeister/kafka-docker.git $ cd kafka-docker wurstmeister/kafka-docker에 있는 docker-compose-single-broker.yml 의 내부 설정 중 KAFKA_ADVERTISED_HOST_NAME을 변경합니다. $ vi docker-compose-single-broker.yml version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181"..

빅데이터/Kafka 2021.03.05
특정 시점(날짜+시간)의 레코드부터 가져오도록 설정하기.

카프카의 토픽에 저장된 레코드는 0.11.0.0 이후부터 timestamp가 저장됩니다. 그러므로 kafka-consumer-group.sh 스크립트로 오프셋을 직접 지정할 수 있는데 --to-datetime을 사용하면 특정 시점(시간) 데이터부터 가져가도록 설정할 수 있습니다. Example) 토픽의 레코드가 다음과 같이 있다고 가정합니다. ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1614748497034, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = ..

빅데이터/Kafka 2021.03.03
kafka console consumer 여러 토픽 컨슘하기

kafka console consumer를 사용하다보면 --topic으로 토픽을 컨슈밍 하기도 하지만 여러 토픽 또는 정규식을 사용해서 확인해보고 싶을 때가 있다. 이 때 --whitelist를 사용하면 된다. Example) hello-topic과 hello.topic 토픽을 컨슈밍 하고 싶다면 $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'hello-topic|hello.topic' Example2) 모든 토픽을 컨슈밍 하고 싶다면 $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

빅데이터/nosql 2021.02.25
아파치 카프카를 데이터 레이크로 사용할 수 있을까?

데이터 레이크란? 데이터 레이크의 기본 개념은 기업단위의 서비스 데이터들을 한곳의 저장공간에 모아 두는 것입니다. 이렇게 저장된 데이터로부터 BI(Business Intelligence) App으로 조회하거나 시각화, 머신러닝을 수행하였습니다. 데이터 웨어하우스와 다르게 '일단 저장'하고 나서 이후에 스키마를 적용하는 'Schema-on-Read'에 초점을 잡고 있습니다. 데이터 레이크라는 단어가 나온 이래로 지속 발전을 거듭했고 데이터 레이크 기술은 다음과 같이 발전 하였습니다. - 1세대 데이터 레이크 : HDFS, 맵리듀스, 피그, 하이브, 임팔라, 플룸, 스쿱 - 2세대 데이터 레이크 : 클라우드 네이티브로 성장하였고 오브젝트 스토리지(S3), 스파크, 플링크, 프레스토, 스트림셋 스트리밍 데이터..

빅데이터/Kafka 2021.02.25
macOS에서 카프카 버로우 빌드 및 실행하기.

1. golang 설치 $ brew install go 2. burrow clone from github $ git clone https://github.com/linkedin/Burrow.git 3. go build, install $ cd to the source directory. $ go mod tidy $ go install 3. run $ $GOPATH/bin/Burrow --config-dir=/path/containing/config 버로우를 실행할 때 주의할점은 --config-dir은 말그대로 디렉토리를 설정해야한다. 파일을 설정하면 안된다. burrow.toml이 포함된 디렉토리를 설정한다. 로컬호스트에서 카프카, 주키퍼를 실행할 경우 아래와 같이 burrow.toml을 설정한다. [..

빅데이터/Kafka 2020.11.29
하둡 맵리듀스 동작방법

하둡의 맵리듀스에 대한 설명은 다음링크를 참조한다. voidmainvoid.tistory.com/399 하둡 맵리듀스 접근법 맵리듀스는 일괄질의처리기이다. 합리적인 시간 내에 결과를 보여주는 능력을 가지고 있음. 질의를 실행한 이후 수초내에 결과를 받는 시스템과는 다름. 하둡은 기존 병렬프로그래밍과 다른 분 blog.voidmainvoid.net 맵리듀스를 운영하는 2가지 방법을 알아본다. 첫번째는 전통적인 하둡의 맵리듀스를 사용하는 방식과 두번째는 YARN을 사용하는 방식이다. 각 방식에 대해 알아보자. 맵리듀스1 Job : 클아이언트가 수행하는 작업 단위. 하둡은 Job을 맵 태스크와 리듀스 태스크로 나우어 실행한다. Jobtracker : Tasktracker들이 수행할 태스크를 스케쥴링 Taskt..

빅데이터/하둡 2020.11.26
하둡 맵리듀스 접근법

맵리듀스는 일괄질의처리기이다. 합리적인 시간 내에 결과를 보여주는 능력을 가지고 있음. 질의를 실행한 이후 수초내에 결과를 받는 시스템과는 다름. 하둡은 기존 병렬프로그래밍과 다른 분산 프로그래밍 관점에서 바라봐야한다. MPI와 같은 병렬프로그래밍은 사용자로 하여금 직접적인 알고리즘을 짜도록 도와주지만 하둡은 최상위 수준에서만 동작한다. 개발자는 데이터의 흐름을 신경쓰지 않아도 되고 데이터 모델 관점에서만 생각하면된다. 맵리듀스와 같은 분산프로그래밍에서 어려운 점은 원격 프로세스의 실패/성공 여부이다. 프로세스(태스크)가 실패했을 경우를 감지하여 장애가 없는 머신에서 다시 돌려야한다. 하둡의 맵리듀스는 태스크간의 의존성이 없는 아키텍처이기 때문에 실패에 크게 고민하지 않아도 된다. 맵리듀스는 맵의 실패보..

빅데이터/하둡 2020.11.24
카프카 토픽의 오프셋 최대 크기는 얼마일까?

카프카의 토픽에는 파티션이 있습니다. 프로듀서가 레코드를 파티션에 저장하면 각 레코드에는 오프셋이라고 불리는 고유한 번호가 붙게 되는데요. 오프셋 번호가 최대값에 도달하면 어떻게 처리해야하는지 의문을 가질 수 있습니다. 오프셋 번호의 최대값에 대해서 확인해 보았는데요. 오프셋은 int64로 지정되어 있었습니다. private static final Field.Int64 OFFSET = new Field.Int64("offset", "The offset found"); 자바에서 int64의 최대값은 9,223,372,036,854,775,807 입니다. 9,223,372,036,854,775,807는 하루에 1조개의 record를 쌓더라도 25,269년이 걸리는 어마어마한 값입니다. 게다가 파티션을 1개만..

빅데이터/Kafka 2020.11.17
카프카 커넥터 빌드시 JDK11이 아닌 JDK8로 그래들 빌드해야합니다.

카프카 커넥터를 직접 개발해야할 때가 있습니다. 이때 jar를 만들어서 카프카 커넥트에 커스텀 커넥터를 추가하곤 하는데요. 커넥터를 위한 jar을 만들때 JDK11이 아닌 JDK8 GRADLE JVM으로 반드시 빌드해야합니다. JDK11기반 GRADLE JVM으로 빌드하면 kafka 2.6.0 기준 카프카 커넥트에서는 정상적으로 커넥터를 추가하지 못합니다. 카프카 커넥터용 우버 JAR만드는 방법 jar { from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } }

빅데이터/Kafka 2020.11.02
토픽의 메시지 값을 직렬화/역직렬화가 정상적으로 이루어지지 않는 경우 테스트

토픽의 메시지 값 또는 메시지 키는 직렬화하여 저장되어 있다. 만약 직렬화와 역직렬화 포맷이 다르면 어떻게 될까? - test 토픽 - 직렬화 : StringStringSerializer - 역직렬화 : UUIDDeserializer 의도적으로 직렬화 포맷과 역직렬화 포맷을 다르게 하여 테스트를 진행합니다. 1. 프로듀서 public class SimpleProducer { private static String TOPIC_NAME = "test"; private static String BOOTSTRAP_SERVERS = "localhost:9092"; public static void main(String[] args) { Properties configs = new Properties(); conf..

빅데이터/Kafka 2020.10.15
정말정말 간단한 스프링 카프카 컨슈머 애플리케이션 예제

스프링 카프카는 카프카를 스프링에서 쉽게 사용할 수 있도록 하는 라이브러리입니다. 스프링 카프카를 통해 컨슈머를 만드는 가장 간단한 코드를 공유합니다. 준비물 - 그래들 - 스프링부트 - 인텔리제이 디렉토리구조 ├── build.gradle ├── settings.gradle └── src └── main ├── java │ └── com │ └── test │ └── Main.java └── resources └── application.yaml build.gradle plugins { id 'java' } group 'org.example' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { compile 'org.springf..

빅데이터/Kafka 2020.10.13
엘라스틱서치에 중복 id로 값을 보내면?

엘라스틱서치에 중복id로 값을 보내면 버젼이 올라간다. $ curl --location --request PUT 'localhost:9200/books/book/2d12dd2' \ --header 'Content-Type: application/json' \ --data-raw '{ "_id":"1", "tile": "Nesoy Elastic Guide", "author": "Nesoy", "date": "2019-01-15", "pages": 250 }' { "_index": "books", "_type": "book", "_id": "2d12dd2", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "faile..

Reactive를 품은 스프링 카프카 시청 정리 자료

아래 영상자료를 보고 정리한 글입니다. 1. 카프카 그리고 스프링 카프카는 현대의 메시지드리븐아키텍처의 핵심 플랫폼. 다양한 종류로 사용 가능 스프링에서 카프카를 사용하는 방법은 아주 쉬워졌음 마치 webMVC에서 request패턴 사용한것처럼 사용. 하지만 동시성문제는 어떻게 해결할까? 파티션을 나눠서 병렬처리는 가능. 리액티브프로그래밍의 본질은 어떤 대상을 Async하게 다루는 것. Async하게 다루는것을 스트림으로 처리하겠다. Flux로 비동기 처리하는 것이 핵심. 프로듀서를 생성한 코드 컨슈머 코드 스프링카프카에서 리액트 사용 방법 2. 적용 프로젝트 관찰대상 서버가 다수가 되면서 모든 데이터를 처리하는 것이 중요 최악의 경우를 고민해야함.. - 1,000대이상의 서버에서 동시다발적으로 이벤트 ..

빅데이터/Kafka 2020.09.26
카프카 커넥트 JMX + 로그스태시로 모니터링 하기

카프카 커넥트(분산모드)는 기본 JMX포트를 통해 커넥트 애플리케이션의 상태를 모니터링 수집 애플리케이션에 데이터를 전송할 수 있습니다. 오늘은 카프카 커넥트의 JMX포트를 열고 로그스태시로 JMX로 지표를 수집하는 방법에 대해 알아보겠습니다. 1. 카프카 커넥트 JMX포트 열기 $ export JMX_PORT=10000 우선 JMX포트를 열기위해 환경변수로 선언합니다. 2. 카프카 커넥트 실행 및 JMX 포트 확인 $ bin/connect-distributed.sh ../config/connect-distributed.properties 실행이 완료되면 설정한 10000번 포트로 listen 여부를 확인합니다. $ netstat -anv | grep 10000 tcp46 0 0 *.10000 *.* L..

빅데이터/Kafka 2020.09.24
kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline)

카프카 커넥트를 운영하다보면 로그를 파일로 쌓고 + 로그스태시를 통해 엘라스틱서치에 쌓고 싶을때가 있습니다. 이를 위해 아래와 같이 설정합니다. 1. connect-log4j.properties router.logs.dir=./connect-logs log4j.rootLogger=INFO, connectAppender log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.I0Itec.zkclient=ERROR log4j.logger.org.reflections=ERROR log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.connectAppender.F..

빅데이터/Kafka 2020.09.23
mac에서 하둡 hdfs 설치 및 실행하기

1. homebrew를 통해 hadoop package 설치 $ brew install hadoop hadoop이 설치된 경로는 아래와 같이 명령하여 확인할 수 있다. $ brew info hadoop hadoop: stable 3.3.0 Framework for distributed processing of large data sets https://hadoop.apache.org/ Conflicts with: yarn (because both install `yarn` binaries) /usr/local/Cellar/hadoop/hdfs (366 files, 40.9MB) Built from source /usr/local/Cellar/hadoop/3.3.0 (21,844 files, 963.5MB..

빅데이터 2020.08.19
스프링 카프카 메시지 리스너 2가지 구현 방법

스프링 카프카에서 메시지 리스너를 구현하는 2가지 방법이 있습니다. 첫번째는 @KafkaListener를 사용하는것이고 두번째는 listenerContainer를 Bean등록하는 것입니다. @KafkaListener로 구현 가장 간단한 방법입니다. @KafkaListener를 통해 선언할 경우 파라미터를 오버로딩해서 알맞는 listenerContainer를 자동으로 주입합니다. package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBoot..

빅데이터/Kafka 2020.07.23
카프카 컨슈머 파티셔너 종류 및 정리(2.5.0 기준)

카프카 컨슈머는 토픽의 파티션과 매칭하여 레코드를 가져옵니다. 파티션을 매칭하는 기준은 컨슈머 파티션 어사이너의 기준을 따릅니다. 컨슈머 파티션어사이너 인터페이스는 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html ConsumerPartitionAssignor (kafka 2.5.0 API) kafka.apache.org RangeAssignor 기본 설정되는 파티션 어사이너입니다. 토픽의 파티션을 숫자기준으로 나열하고, 컨슈머의 이름을 사전순으로 나열한 뒤에 배정하는 정확히 반으로 나누어 배정합니다. 만약 딱 반으로 안나뉘어지는 홀수개의 ..

빅데이터/Kafka 2020.07.23
카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준)

카프카 프로듀서는 레코드를 전송하기 위해 파티셔너를 제공합니다. 파티셔너 종류와 각 파티셔너에 대한 설명을 정리해보았습니다. 카프카 버젼이 올라감에 따라 파티셔너의 종류도 달라졌는데 여기서는 2.5.0 버젼의 파티셔너를 정리해보겠습니다. 프로듀서 파티셔너 인터페이스에 대한 설명은 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/?org/apache/kafka/clients/producer/Partitioner.html kafka 2.5.0 API kafka.apache.org 메시지 키가 있을 때 메시지 키가 있을 경우에는 파티셔너의 종류와 관계없이 동일하게 동작합니다. 메시지 키의 해쉬값을 구해서 해당 해쉬값과 파티션을 매칭하여 적재합니다. 이로 인해..

빅데이터/Kafka 2020.07.23
컨슈머 스레드가 많다고 처리량이 높을까? 아닐까? 컨텍스트 스위칭으로 인한 예외 상황

오늘 카프카 컨슈머를 입수하던 도중 매우 신기한 현상을 발견해서 기록하게 되었습니다. 컨슈머스레드를 여러개로 HDFS에 적재하고 있었는데, 일부 파티션의 데이터가 0이 되어 일부 컨슈머 스레드가 쉬는동안 총 처리량이 늘어난 것입니다. 그래프를 보면서 왜 그렇게 되었는지 확인해보겠습니다. 입수 상태 - 24 core, 48G memory 물리장비 - 1 JVM application - 20 consumer thread - sync offset commit - save at Hadoop hdfs(append) 입수 관련 그래프 컨슈머 랙은 카프카 버로우를 사용하여 수집중에 있습니다. 랙 추이를 보면 알겠지만 파티션당 데이터 양의 차이가 일부 발생하였습니다. 10개 파티션은 18:20 부근에 끝났고 나머지 1..

빅데이터/Kafka 2020.07.22
kafka connector distributed 모드로 fileSourceConnector 실행

저번 포스트에 이어 distributed로 동작하는 kafka connector를 실습해보겠습니다. https://blog.voidmainvoid.net/356 Kafka file source connector standalone 모드 실행 Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 blog.voidmainvoid.net 준비물 - 카프카 바이너리 - 실행중인 카프카 클러스터 distributed vs standalone The difference is in the class which is started and the confi..

빅데이터/Kafka 2020.07.21
Kafka file source connector standalone 모드 실행

Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 카프카로 보내보겠습니다. 준비물 - 카프카 바이너리파일 - 카프카 클러스터 Standalone 모드 준비 Standalone모드란 1개의 프로세스로 동작하는 애플리케이션을 뜻합니다. 커낵터는 distribute모드와 standalone모드를 지원합니다. 실제 환경에서는 distribute모드를 사용하는 것을 권장합니다. standalone모드를 실행하기 위해 아래와 같이 준비합니다. connect-file-source.properties name=local-file-source conne..

빅데이터/Kafka 2020.07.21
Kafka-client 사용시 Failed to load class "org.slf4j.impl.StaticLoggerBinder" 에러 해결 방법

kafka-client를 java application에 추가하여 실행하면 아래와 같은 오류와 함께 log가 정상적으로 찍히지 않습니다. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 위 에러는 slf4j의 디펜던시 이슈로 StaticLoggerBinder가 없어서 그런것인데요. 아래와 같이 그래들에 디펜던시를 추가하면 로그를 확인하실 수 있습니다. dependencies { co..

빅데이터/Kafka 2020.07.13
Kafka ConsumerRecord의 timestamp는 0.10.0.0 이후부터 사용가능합니다.

카프카의 ConsumerRecord를 살펴보면 메시지 키와 메시지 값 이외에 추가로 timestamp가 있는 것을 확인할 수 있습니다. timestamp는 Kafka 프로듀서가 ProducerRecord를 생성하면서 timestamp 값을 Unix time으로 입력합니다. 만약 다른 timestamp를 넣고 싶다면 직접 설정하여 넣을수도 있습니다. 해당 건은 KAFKA-3025에서 논의했던 내용으로 자세한 내용은 해당 jira에서 확인할 수 있습니다. https://issues.apache.org/jira/browse/KAFKA-3025 [KAFKA-3025] KIP-31&KIP-32 (part 1): Add timestamp field to message, configs, and Producer/Con..

빅데이터/Kafka 2020.07.08
telegraf사용시 kafka로 데이터 json형태로 보내는 방법

telegraf는 metric수집에 최적화되어있습니다. 다만, 그대로 사용하게 되면 influxdb에서 사용하는 데이터형태(csv와 유사)로 데이터가 전송되는데요. kafka에서 데이터를 잘 활용하기 위해 json을 사용한다면 consumer에서도 스키마에 대한 걱정없이(?) 잘 사용할 수 있습니다. telegraf에서 data형태를 json형태로 바꾸는 예제를 아래에 설명드리도록 하겠습니다. telegraf에 대한 설명은 아래 링크를 확인해주세요. https://blog.voidmainvoid.net/260 Fluentd vs Telegraf 차이점 알아보기 Telegraf와 fluentd는 아주 유사해보인다. 둘다 configuration파일 기반으로 작동하며 plugin을 통해 개발자가 custom..

빅데이터/Kafka 2020.07.08
pyspark사용시 csv로 저장시 json이 따옴표(")로 묶이는 현상 방지하기

pyspark를 사용하여 dataframe을 처리한 이후 csv로 json value를 저장할때 아래와 같이 저장될 때가 있습니다. // 저장되기 원하는 값 {"key":"value"} // 실제로 저장되는 값 "{\"key\":\"value\"}" json value로 그대로 저장하길 원하지만, 따옴표가 escape처리되어 저장됩니다. 이때는 pyspark의 csv저장옵션을 지정해야합니다. pyspark csv 저장옵션 중 escapeQuotes를 false로 두면 해결할 수 있습니다. ... dataframe.write.option("escapeQuotes","false").mode("overwrite").csv("/data/destination") 위 구문을 실행하면 json이 그대로 저장되는것을 ..

빅데이터 2020.07.06
macOS에 pyspark설치, pyspark실행시 jupyterlab 실행시키기

스파크를 다루다보면 jupyterlab을 통해 pyspark를 실행시킬 일이 종종 있습니다. 로컬컴퓨터(macOS)에 설치하는 방법을 설명드리도록 하겠습니다. pyspark와 jupyterlab을 설치하기 위해서 패키지 설치도구인 homebrew를 사용하도록 하겠습니다. homebrew에 대한 자세한 설명은 아래 영상을 참고해주세요. 1. pyspark설치 $ brew install apache-spark 2. jupyterlab설치 $ brew install jupyterlab 3. pyspark환경변수 설정 pyspark를 실행할때 아래 2개의 환경변수를 .bashrc 또는 .zshrc에 넣어도록합니다. pyspark를 실행하면 jupyterlab을 실행하게 됩니다. export PYSPARK_DRI..

빅데이터 2020.07.01