본문 바로가기

빅데이터/Kafka

아파치 카프카 테스트용 data generator 소개 - ksql-datagen

아파치 카프카는 대규모 분산 스트리밍 플랫폼으로서 데이터파이프라인을 만들때 주로 사용이 가능하다. 데이터파이프라인을 만듦에 있어서 어떤 용도로 어떻게 동작하는지 확인하기 위해서는 직접 consumer로 데이터를 넣어주어 producer을 개발하거나 혹은 처음엔 cli producer/consumer을 사용하여 data를 topic에 넣어준다.

 

Confluent사에서 제공하는 ksql-datagen을 사용한다면 여러 format으로 data를 auto generate하여 topic에 produce가능하다. ksql-datagen은 ksql을 설명하기 위해 처음 소개되었지만, ksql뿐만 아니라 kafka에서 consumer을 테스트하거나 다양한 format(avro 등)을 테스트하기에도 알맞다. 이번 포스팅에서는 ksql-datagen를 소개하고 사용하는 방법에 대해서 이야기해보고자 한다.

 

KSQL-DATAGEN

ksql-datgen은 custom data를 generate하여 특정 kafka의 topic에 produce하는 역할을 수행한다. command-line tool로서 json, avro 등의 schema를 생성하도록 지원한다. 각 데이터는 random한 값을 가져서 테스트하기에 적합하다.

 

1) ksql-datagen 다운로드

ksql-datagen은 Confluent Platform을 다운로드하여 사용할 수 있다. local macbook에서 사용하기 위해 zip file로 다운로드 받는 것을 추천한다. Java 1.8이상이 컴퓨터에 설치되어 있어야 한다.

 

다운로드 링크 : https://www.confluent.io/download/plans

 

2) ksql-datagen 실행파일 확인

1)에서 다운로드 받은 confluent platform에서 아래 directory로 가면 ksql-datagen 실행파일이 있는 것을 확인 할 수 있다.

/Users/voidmainvoid/Documents/github/confluent-5.3.1/
└── bin
    └── ksql-datagen

3) ksql-datagen의 parameter 설명

ksql-datagen을 통해 데이터를 생성하려면 아래와 같은 파라미터를 통해 어떤 데이터를 생성할 것인지 선언 할 수 있다.

 

기본파라미터

  • topic=<kafka topic name> : 대상 kafka topic 이름
  • quickstart=<quickstart preset> : 전달할 데이터의 종류(orders, users, pageviews 3개 지원)

옵션파라미터

  • bootstrap-server=<kafka-server>:<port> : topic에 전달할 broker ip list
  • format=<record format> : kafka topic에 넣을 파일 포맷(json, avro, delimited 3개 지원)
  • key=<name of key column> : key를 사용할 경우 key column이름
  • schema=<avro schema file> : avro 스키마를 사용할 경우 avro shcema file 경로
  • iterations=<number of records> : topic에 전달할 data 개수
  • maxInterval=<max time between records> : 새로운 data를 전달할때 간격 단위
  • propertiesFile=<path-to-properties-file> : ksql-datagen 프로퍼티 파일
  • schemaRegistryUrl=<schema registry http url> : avro 포맷을 사용할 경우  

 

 

 

4) ksql-datagen 사용하기

Json 포맷으로 users preset으로 데이터 10개를 test-wonyoung이라는 topic에 보내려면 아래와 같이 명령어를 사용하면 된다.

$ ./ksql-datagen format=json topic=test-wonyoung quickstart=users bootstrap-server=localhost:9092 iterations=10
[2019-10-10 14:36:31,495] INFO AvroDataConfig values:
	connect.meta.data = true
	enhanced.avro.schema.support = false
	schemas.cache.config = 1
 (io.confluent.connect.avro.AvroDataConfig:347)
User_2 --> ([ 1505538761957 | 'User_2' | 'Region_3' | 'MALE' ]) ts:1570685792017
User_1 --> ([ 1507698894244 | 'User_1' | 'Region_4' | 'MALE' ]) ts:1570685792293
User_9 --> ([ 1503710570872 | 'User_9' | 'Region_5' | 'MALE' ]) ts:1570685792683
User_5 --> ([ 1514465638574 | 'User_5' | 'Region_1' | 'FEMALE' ]) ts:1570685792977
User_1 --> ([ 1517531975192 | 'User_1' | 'Region_8' | 'OTHER' ]) ts:1570685793200
User_5 --> ([ 1513642309354 | 'User_5' | 'Region_9' | 'MALE' ]) ts:1570685793327
User_7 --> ([ 1495418905616 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1570685793584
User_7 --> ([ 1505359635173 | 'User_7' | 'Region_9' | 'OTHER' ]) ts:1570685793683
User_4 --> ([ 1500563286319 | 'User_4' | 'Region_7' | 'MALE' ]) ts:1570685794158
User_6 --> ([ 1501889487021 | 'User_6' | 'Region_6' | 'MALE' ]) ts:1570685794490

정상적으로 들어오는지 해당 topic을 cli consumer로 확인해본다.

$ /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-wonyoung
{"registertime":1505538761957,"userid":"User_2","regionid":"Region_3","gender":"MALE"}
{"registertime":1507698894244,"userid":"User_1","regionid":"Region_4","gender":"MALE"}
{"registertime":1503710570872,"userid":"User_9","regionid":"Region_5","gender":"MALE"}
{"registertime":1514465638574,"userid":"User_5","regionid":"Region_1","gender":"FEMALE"}
{"registertime":1517531975192,"userid":"User_1","regionid":"Region_8","gender":"OTHER"}
{"registertime":1513642309354,"userid":"User_5","regionid":"Region_9","gender":"MALE"}
{"registertime":1495418905616,"userid":"User_7","regionid":"Region_4","gender":"MALE"}
{"registertime":1505359635173,"userid":"User_7","regionid":"Region_9","gender":"OTHER"}
{"registertime":1500563286319,"userid":"User_4","regionid":"Region_7","gender":"MALE"}
{"registertime":1501889487021,"userid":"User_6","regionid":"Region_6","gender":"MALE"}

 

만약 delimited 포멧으로 pageview preset을 5개 보내고 싶다면 아래와 같이 명령어를 내려 확인할 수 있다.

$ ./ksql-datagen format=delimited topic=test-wonyoung quickstart=pageviews bootstrap-server=localhost:9092 iterations=5
[2019-10-10 14:42:08,385] INFO AvroDataConfig values:
	connect.meta.data = true
	enhanced.avro.schema.support = false
	schemas.cache.config = 1
 (io.confluent.connect.avro.AvroDataConfig:347)
1 --> ([ 1570686128508 | 'User_3' | 'Page_52' ]) ts:1570686128755
11 --> ([ 1570686129010 | 'User_9' | 'Page_94' ]) ts:1570686129011
21 --> ([ 1570686129353 | 'User_1' | 'Page_39' ]) ts:1570686129353
31 --> ([ 1570686129845 | 'User_1' | 'Page_66' ]) ts:1570686129845
41 --> ([ 1570686130002 | 'User_5' | 'Page_80' ]) ts:1570686130002
$ /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-wonyoung
1570686128508,User_3,Page_52
1570686129010,User_9,Page_94
1570686129353,User_1,Page_39
1570686129845,User_1,Page_66
1570686130002,User_5,Page_80

 

의견

Kafka를 사용하면서 항상 목말랐던 것이 어떤 데이터로 테스트할 것인가 였다. 직접 producer로 delimited라던가 json과 같은 데이터를 만들어 내려니 어떤 형식으로 어떻게 만들어야하는지도 감이 안잡힌다. 그러나 ksql-datagen은 여러 preset을 통해 가상의 데이터를 수십만개 이상 실시간으로 만들어내어 여러 테스트를 용이하게 해주어 아주 유용하다. 아파치 kafka를 새로 접하는 개발자, kafka에서 데이터를 넣어 직접 테스트하고 싶은 개발자 혹은 kafka에 대해 공부하는 학생들에게 유용한 tool이 될 것임에 틀림없다.

 

출처 : Generate Custom Test Data by Using the ksql-datagen tool