빅데이터 210

카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용

confluent에서는 rest proxy라고 불리는 카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 기존에 Kafka connect, Kafka client로 데이터를 전달하는 것과는 사뭇 다르게 REST api를 사용한다는점이 독특한데요. 직접 코드를 짜지 않고 범용적으로 사용되는 http을 사용해서 데이터를 넣고 뺄 수 있다는 점이 독특합니다. 오늘은 rest proxy를 local에 설치하고 실행해보겠습니다. 준비물 - local kafka cluster - git - terminal - postman 다운로드 및 실행 rest proxy를 사용하기 위해서는 rest proxy가 포함된 confluent의 community package..

빅데이터/Kafka 2020.06.25
AWS MSK 사용시 인스턴스 유형별 최대 토픽 개수

AWS MSK는 카프카를 SaaS형태로 사용가능한 AWS서비스중 하나입니다. 완전관리형 SaaS는 아니지만 다양한 옵션을 지정할 수 있으며, 추가적인 모니터링 도구를 제공해주기 때문에 아주 편리하게 사용할 수 있습니다. MSK에서는 총 7개의 인스턴스유형을 제공하는데, 각 인스턴스 유형별로 생성할 수 있는 토픽의 개수를 살펴보도록 하겠습니다. MSK Bestcase 문서에 따르면 적정크기의 클러스터에 대한 내용이 나와 있습니다. 여기서는 MSK클러스터를 생성할 때 브로커 노드의 유형 및 수에 대한 상세한 내용이 나와 있는데 그 중 브로커당 파티션수에 대한 내용이 아래와 같이 정리되어 있습니다. 위 그림에서 보는 것 처럼 t3.small이 가장 작고 m5.24xlarge가 가장 큰 파티션 개수를 가질 수 ..

빅데이터/Kafka 2020.06.24
pyspark 데이터프레임 조건절(when)로 데이터 처리하기

Pyspark를 사용해서 대용량의 데이터를 처리(process)할 때 일부 변환(Transformation)이 필요할 때가 있습니다. 일부 요구사항의 경우 가져온 데이터 프레임의 데이터에 조건절(conditional)을 추가해야할 때가 있습니다. 이때 pyspark에서 제공하는 조건절인 when을 사용하면 아주 유용한데요. 예를 들어 설명하도록 하겠습니다. 만약 아래와 같은 데이터가 있다고 가정합시다. seoul corea 2019 newyork usa 2020 seoul corea 2043 newtork usa 2344 brisbane australia 2033 위 데이터를 보면 corea로 글자가 틀린 것을 확인할 수 있습니다. 첫번째 column이 seoul인 데이터만 두번째 column을 Kore..

빅데이터 2020.06.23
자바 멀티스레드 카프카 컨슈머 애플리케이션 구현 코드

이번 포스트에서는 자바 멀티스레드 카프카 컨슈머 애플리케이션을 구현해보도록 하겠습니다. 준비물 - Intellij 또는 eclipse - gradle project - jdk 1.8 - 약 10분 구현방법 이전 포스트(카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala) 바로가기)에서 scala로 구현한 바가 있습니다. 이번에는 자바로 구현하고, consumer.wakeup()을 사용해서 consumer를 안전하게 해제시키는 것을 아래 코드에서 구현해보겠습니다. wakeup() 메서드는 java shutdown hook을 통해 호출하여 안전하게 종료되도록 설정해보겠습니다. 기존 구현방식 처럼 newCachedThreadPool 메서드를 사용해서 multiple thread를 생성해보겠습니다...

빅데이터/Kafka 2020.06.09
pymongo - find결과로 나온 데이터의 ObjectId()를 string으로 변경하기

def getSpecificId(id): result = objectIdDecoder(list(collection.find({"_id": ObjectId(id)}))) return str(result) def objectIdDecoder(list): results=[] for document in list: document['_id'] = str(document['_id']) results.append(document) return results pymongo를 통해 collection을 find한 다음 받은 ObjectId는 유효한 json type이 아니다. 그러므로 json형태로 사용할 경우에는 ObjectId를 String으로 변환하는 작업을 해야합니다. 이때 ObjectIdDecoder가 Lis..

빅데이터/nosql 2020.05.13
Kafka-client client.dns.lookup 옵션 정리

Kafka-client 2.1.0 이후 버젼에서는 client.dns.lookup옵션을 사용하여 dns관련 설정을 사용할 수 있습니다. 오늘 포스팅에서는 해당 옵션이 어떤 역할을 하고 어떻게 동작하는지 알아보도록 하겠습니다. 먼저, apache kafka document의 설명을 보도록 하겠습니다. client.dns.lookup document client.dns.lookup: Controls how the client uses DNS lookups. If set to use_all_dns_ips then, when the lookup returns multiple IP addresses for a hostname, they will all be attempted to connect to before ..

빅데이터/Kafka 2020.04.13
Elasticsearch, Logstash, Kibana 버젼별 하위호환표

엘라스틱서치, 로그스태시, 키바나는 각 버젼별 호환성을 따집니다. 그러므로 각 제품별로 하위호환표를 참고하여 버젼업그레이드를 진해해야만 합니다. 아래는 5.x버젼부터 7.x 버젼까지의 각 제품별 하위호환표 입니다. Elasticsearch Kibana X-Pack Beats^* Logstash^* ES-Hadoop (jar)* APM Server App Search 5.0.x 5.0.x 5.0.x 1.3.x-5.6.x 2.4.x-5.6.x 5.0.x-5.6.x 5.1.x 5.1.x 5.1.x 1.3.x-5.6.x 2.4.x-5.6.x 5.0.x-5.6.x 5.2.x 5.2.x 5.2.x 1.3.x-5.6.x 2.4.x-5.6.x 5.0.x-5.6.x 5.3.x 5.3.x 5.3.x 1.3.x-5.6.x ..

카프카 클러스터 클러스터ip DNS 연동방법. use_all_dns_ips 사용(in AWS, route53)

Kafka-client(consumer, producer)를 사용하기 위해서는 다양한 설정이 필요하지만 카프카 브로커와 통신하기 위해서는 bootstrap.servers 옵션은 반드시 필요한 옵션중 하나입니다. Bootstrap.servers 이 옵션은 카프카 클러스터에 연결하기 위해 클라이언트가 사용하는 브로커들의 host:port 목록을 설정해야 합니다. 특이한점은 모든 브로커의 host와 port를 적지 않아도 된다는 점입니다. 왜냐면 최초로 연결된 하나의 broker의 host:port로 부터 통신을 위한 정보를 가져오기 때문입니다. Route 53에서 kafka cluster DNS 설정하기 route53은 aws에서 제공하는 DNS 웹서비스 입니다. aws route53에 이미 등록되어 있는 ..

빅데이터/Kafka 2020.03.20
AWS에 카프카 클러스터 설치하기(ec2, 3 brokers)

보통 테스트할때 맥북 또는 윈도우 컴퓨터의 1대 장비에 설치하곤하는데요. 고 가용성 테스트를 하기 위해서는 반드시 3대 이상의 클러스터를 설치해야 완벽한 카프카클러스터로서 테스트가 가능합니다. 또한 테스트가 아니더라도 운영을 위해 ec2에 설치하는 경우도 있습니다. 이번 포스트에서는 AWS에 카프카 클러스터(3대)를 설치해보겠습니다. 실행 목차 aws에 카프카 클러스터(3대)를 설치하기 위해서는 아래와 같은 단계가 필요합니다. 1) AWS로 EC2 3대 발급 2) 방화벽 설정 및 /etc/hosts 설정 3) Zookeeper 설치 4) Kafka 설치 위와 같은 단계를 통해 클러스터를 구축해보고 local 컴퓨터에서 console producer와 consumer를 통해 클러스터가 정상동작하는지 테스트..

빅데이터/Kafka 2020.03.18
HDFS cilent 사용시 HA구성된 node 연결하기

HDFS client를 사용하여 hdfs data를 직접 접근하곤 합니다. // HDFS 설정 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://172.1.2.3:8020"); // FileSystem 설정 FileSystem dfs = FileSystem.get(conf); Path filenamePath = new Path("/data/test.txt"); System.out.println("File Exists : " + dfs.exists(filenamePath)); // Write data FSDataOutputStream out = dfs.create(filenamePath); out.write("TEST..

빅데이터/하둡 2020.03.11
카프카 버로우 = consumer lag 모니터링 오픈소스 애플리케이션

카프카 컨슈머 Lag 모니터링 필수요소 카프카 lag은 토픽의 가장 최신 오프셋과 컨슈머 오프셋간의 차이입니다. Kafka-client 라이브러리를 사용해서 Java 또는 scala와 같은 언어로 카프카 컨슈머를 구현할수 있는데요. 이때 구현한 kafkaConsumer 객체를 통해 현재 lag 정보를 가져올 수 있습니다. 만약 lag을 실시간으로 모니터링하고 싶다면 데이터를 Elasticsearch나 InfluxDB와 같은 저장소에 넣은뒤에 Grafana 대시보드를 통해 확인할 수도 있습니다. Github URL https://github.com/linkedin/Burrow linkedin/Burrow Kafka Consumer Lag Checking. Contribute to linkedin/Burro..

빅데이터/Kafka 2020.03.07
엘라스틱서치에서 field 와 field.keyword 의 차이(text와 keyword)

엘라스틱서치와 키바나를 사용하여 데이터를 조회하다보면 아래와 같이 index pattern이 색인되어 있는것을 종종 볼 수 있습니다. field명이 그대로 나타나는 경우와 field.keyword로 나타나는 경우 입니다. 분명히 field명 key와 value(string)으로 넣었는데 둘의 차이가 무었일까요? text와 keyword 엘라스틱서치 5버전 이후로는 string type은 text와 keyword타입으로 변경되었습니다. 따로 명시적으로 mapping을 하지 않았다면 아래와 같이 엘라스틱서치에 들어가게 됩니다. Input key/value : { "some_field": "string value" } In elasticsearch: { "some_field": { "type" "text", ..

카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala)

Kafka-client library를 사용하여 JVM위에 올라가는 consumer/producer를 작성할 수 있습니다. 이번 포스팅에서는 scala로 Kafka consumer를 멀티쓰레드로 실행하는 애플리케이션 예제 코드를 공유, 설명 드리겠습니다. 제약조건 - Kafka consumer - Multi thread(2개 이상) 지원 - Scala 코드 Scala를 실행하는 멀티쓰레드 카프카 컨슈머 애플리케이션의 파일은 크게 4개로 나뉘어져 있습니다. 먼저, Scala application을 실행하는 Main.scala와 실제로 Consumer역할을 하게 되는 Runnable Thread인 ConsumerWorker.scala, Consumer의 상태를 기록할 ConsumerStatus.scala 마..

빅데이터/Kafka 2020.02.24
pyspark UDF(User Defined Functions) 만들기 방법 및 예제

pyspark를 활용할때 dataframe의 변환과 같은 작업을 위해 UDF가 필요할 때가 있습니다. UDF란 User Defined Functions의 약자로서 사용자가 직접 개발한 method를 뜻합니다. UDF를 만들기 위한 sample 코드를 공유하고자 합니다. 그리고 pyspark에서 dataframe을 처리하려면 아래와 같이 사용하곤합니다. df = sc.read.csv(target_data,header=False,sep="\u0001") processedDf = df.withColumn(target_column, customMethod(df[target_column])) 그리고 Spark에 UDF를 등록하기 위해 아래와 같이 annotation을 붙여주는 방식으로 사용하여 function을 ..

빅데이터 2020.02.13
pyspark에서 gzip으로 압축되어 있는 파일 읽는 방법

pyspark를 통해 다양한 파일을 읽을 수 있습니다. 보통 .text 또는 .log와 같은 확장자로 되어진 plainText를 읽기도 하지만 압축된 파일인 .gz 과 같은 파일을 읽어야할 때도 있습니다. 이렇게 .gz과 같이 압축된 파일을 pyspark를 통해 읽으려면 어떻게 해야할까요? rdd = sc.textFile("data/label.gz") print rdd.take(10) 정답은 생각보다 간단합니다. textFile method를 사용하여 .text나 .log파일을 읽듯이 그대로 입력하여 읽으면 됩니다. Spark Document에 따르면 아래와 같이 나와 있습니다. All of Spark’s file-based input methods, including textFile, support r..

빅데이터/하둡 2020.02.11
pyspark에러 ImportError: No module named XXXX

pyspark로 개발하다 보면 local에서 돌릴때는 정상적으로 돌아가는데 cluster mode(yarn 등)으로 실행할때 아래와 같이 오류가 발생하는 경우가 있습니다. 사실 spark가 돌아가는 방식에 대해 이해하면 쉬운데, 결론부터 말하자면 worker node에 해당 모듈이 깔려 있지 않아서 발생하는 오류입니다. 위 그은Spark가 Cluster deployment mode로 돌아가는 것을 형상화 합니다. 실제로 돌아가는 Task는 각 worker node에서 수행하기 때문에 각 worker node에 module 이 존재하지 않으면 module을 못찾게 되는 것입니다.

빅데이터/하둡 2020.02.10
카프카 auto.offset.reset 종류 및 사용방법

카프카에서 consumer를 새로 생성하여 topic에서부터 데이터를 가져오기 위해서는 여러 옵션이 필요한데 그 중 하나는 auto.offset.reset입니다다. 이 auto.offset.reset의 역할에 대해 알아보겠습니다. 아래 글은 Kafka-client, Kafka broker 2.4 기준으로 작성하였습니다. auto.offset.reset auto.offset.reset에서 offset은 consumer offset입니다. 만약 이번에 topic에 붙은 consumer의 offset정보가 존재하지 않는다면 auto.offset.reset의 default값(latest)이나 또는 설정한 값을 따라가게 됩니다. auto.offset.reset - latest : 가장 마지막 offset부터 - ..

빅데이터/Kafka 2020.02.06
카프카 장애대응 - Consumer offset 지정하기(by partition)

카프카 consumer로 입수를 진행하다보면 예상치 못하게 데이터가 중복입수 또는 유실될 가능성이 있습니다. - Kafka broker 이슈 - Network 이슈 - Consumer application 이슈 위와 같은 이슈가 발생했을 경우 이슈가 발생했던 시점보다 더 이전의 데이터부터 입수를 진행해야하는데 이때 offset을 지정해야합니다. offset을 지정하는 방법은 아래와 같습니다. 1) Consumer 생성 Consumer 생성은 재입수 하고자 하는 Consumer에 지정하여 신규로 생성. 2) Offset 지정(by console shell script) ./kafka-consumer-groups shell을 통해서 offset을 reset할 수 있습니다. offset reset 옵션: --..

빅데이터/Kafka 2020.01.31
AvroFlumeEvent 포멧 java Decoding source

AvroFlumeEvent포멧은 아래와 같은 특징을 가진다. - Header : Map - Body : ByteBuffer AvroFlumeEvent포멧을 사용하기 위해서 필요한 dependency dependencies { compile 'org.apache.flume:flume-ng-core:1.9.0' } DeserializeValue method : private static Event deserializeValue(byte[] value) throws IOException { Event e; DatumReader reader = new SpecificDatumReader(AvroFlumeEvent.class); ByteArrayInputStream in = new ByteArrayInputStr..

빅데이터 2020.01.31
[local hadoop]localhost port 22: Connection refused 에러 발생시 해결방법 in MacOS

맥북에서 테스트를 위해 local hadoop을 띄우기 위해 테스트를 하다보면 아래와 같은 오류를 발생할 때가 있습니다. $ /usr/local/Cellar/hadoop/3.1.2/sbin/stop-all.sh WARNING: Stopping all Apache Hadoop daemons as a1003855 in 10 seconds. WARNING: Use CTRL-C to abort. Stopping namenodes on [localhost] localhost: ssh: connect to host localhost port 22: Connection refused Stopping datanodes localhost: ssh: connect to host localhost port 22: Conne..

빅데이터/하둡 2020.01.16
Kafka burrow 모니터링 하지 않는 consumer group 수동제거방법

Kafka burrow를 통해 모니터링 하다보면 더이상 모니터링 해도 되지 않는 consumer group이 남아있는 경우가 있습니다. 이런 경우에는 burrow의 http endpoint를 통해서 특정 consumer group을 제거하여 모니터링 대상에서 제거할 수 있습니다. 이번 포스팅에서는 어떻게 삭제하는지 알려드리도록 하겠습니다. Burrow에서 Consumer group 제거 URL path DELETE /v3/kafka/(cluster)/consumer/(group) CURL 예제 만약 cluster이름이 dev이고 consumer group이 di-test 라면 아래와 같이 작성합니다. curl -XDELETE http://localhost:8000/v3/kafka/dev/consumer/..

빅데이터/Kafka 2020.01.15
빅데이터에서 사용하는 포멧 종류 및 설명

빅데이터를 다루다보면 다양한 file format을 만나게 됩니다. 오늘 포스팅에서는 file format들에 대해 알아보도록 하겠습니다. Delimiter separated file 가장 많이 쓰이는 raw text기반의 구분문자로 이루어진 배열입니다. 아래와 같이 평문으로도 저장, 조회할 수 있습니다. 아래는 콤마(,)로 구분자를 지정한 CSV(comma-separated values)의 예제를 보여줍니다. 직업,이름,날짜,성별,지역 학생,David,20190204,M,USA 사업가,James,20180305,M,Canada 비슷한 포멧은 아래와 같은 방식들이 있습니다. - TSV : tab separated value - SSV : space separated value 위와 같은 방식들을 모두 합..

빅데이터 2019.12.19
Kafka | MirrorMaker2 가 release되었습니다.

MirrorMaker 2.0은 KIP-382로 등록되어 있던 주제였으며, 2018년 10월 12일에 최초로 jira(KAFKA-7500)가 생성되었습니다. release될 때까지 약 1년이 걸렸는데요. 이번 포스팅에서는 MirrorMaker2.0이 왜 나왔고, 어떤 기능을 포함하고 있는지 살펴보겠습니다. 개발동기 MirrorMaker1은 꽤 오랜기간 사용되어 왔지만 몇가지 이슈가 있었습니다. Legacy MirrorMaker1의 문제점: - Topic들이 kafka default configuration기준으로 생성되었기 때문에 repartition하기 위해 재작업이 필요함 - ACL, configuration의 변경이 source/sink cluster사이에 sync되지 않음 - Record들은 Def..

빅데이터/Kafka 2019.12.18
스파크 스트리밍-Kafka Data source 소개

KafkaSource는 스파크의 Structured Streaming에서 Apache kafka를 data source로 사용하기 위한 목적이다. 이 library의 source는 아래에서 확인할 수 있다. Spark Kafkasource : https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala 2개의 핵심 function을 아래와 같이 정리할 수 있다. - getOffset() : KafkaOffsetrReader를 사용하여 가장 최근의 offset을 가져온다. - getBatch() : offset의 처음부터 끝까지에 존재..

빅데이터/Kafka 2019.12.03
스트림 프로세싱 with Faust - Windows

Faust는 Windowing을 쉽게 지원할 수 있는 기능을 가지고 있다. Windowing을 통해 지난 10분간 데이터의 분석 혹은 매 5분마다 1시간 간격의 데이터 분석과 같은 내용을 수행할 수 있다. 이번 포스팅에서는 Windowing을 사용하는 방법에 대해 알아보고자 한다. Faust는 Hopping, Tumbling window를 지원한다. Tumbling Windows Tumbling window는 중복되지 않은 데이터에 대해 일정간격으로 분석할때 사용된다. from dataclasses import asdict, dataclass from datetime import timedelta import json import random import faust @dataclass class Clic..

빅데이터 2019.11.22
스트림 프로세싱 with Faust - Table

Faust에는 Table이라는 개념이 있다. Table을 생성하면 스트림데이터와 함께 사용 될 수 있다. from dataclasses import asdict, dataclass import json import random import faust @dataclass class ClickEvent(faust.Record): email: str timestamp: str uri: str number: int app = faust.App("exercise6", broker="kafka://localhost:9092") clickevents_topic = app.topic("com.udacity.streams.clickevents", value_type=ClickEvent) # # TODO: Define a..

빅데이터 2019.11.21
스트림 프로세싱 with Faust - Processors, Operations

Faust에는 중요한개념 2가지 Processor과 Operation이 있습니다. Processors 스트림데이터는 끝없는 데이터의 연속적인 흐름입니다. 1개 이상의 Processor들은 callback형태로 동작하게 됩니다. Faust 기반의 application에서 동작하며, function을 사용하여 추가 library등을 조합하여 사용할 수도 있습니다. def add_default_language(value: MyModel) -> MyModel: if not value.language: value.language = 'US' return value async def add_client_info(value: MyModel) -> MyModel: value.client = await get_http_..

빅데이터 2019.11.21
스트림 프로세싱 with Faust - kafka consumer/producer

Faust는 python기반 스트림프로세싱을 위한 library입니다. 아래 코드는 kafka로부터 데이터를 가지고 오는 code입니다. Producer from dataclasses import asdict, dataclass import json import faust # 모듈 import @dataclass # 데이터 직렬화를 위한 json dataclass 선언 for faust class ClickEventSanitized(faust.Record): timestamp: str uri: str number: int app = faust.App("exercise3", broker="kafka://localhost:9092") # 카프카 브로커 clickevents_topic = app.topic("..

빅데이터 2019.11.21
카프카를 쿠버네티스 위에 올리는게 좋은 선택일까?

아래 포스트는 confluent 블로그 글을 토대로 제 의견과 함께 정리한 글입니다. 위 블로그글은 Gwen Shapira(하둡 애플리케이션 아키텍처, 카프카핵심가이드 저자이자 confluent PM)이 작성한 글입니다. 쿠버네티스위에 카프카를 올려야하나? 쿠버네티스를 사용하는 주요 이유 2가지는 아래와 같습니다. - 개발자와 운영자 모두에게 workflow의 효율을 높임으로서 생산성이 높아짐 - "bin packing(1개 가상/물리 장비에서 여러 application을 돌리는 것)"을 통해 리소스 관리 효율화 가능 그런데, 카프카는 운영하기 그렇게 어렵지 않으며, 가끔 node(가상/물리 장비)의 모든 resource를 필요할때가 있습니다. 그렇기 때문에 쿠버네티스 위에서 카프카를 운영하는 것은 그닥..

빅데이터/Kafka 2019.11.07