본문 바로가기

빅데이터/Kafka

Kafka file source connector standalone 모드 실행

Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 카프카로 보내보겠습니다.

 

준비물

- 카프카 바이너리파일

- 카프카 클러스터

Standalone 모드 준비

Standalone모드란 1개의 프로세스로 동작하는 애플리케이션을 뜻합니다. 커낵터는 distribute모드와 standalone모드를 지원합니다. 실제 환경에서는 distribute모드를 사용하는 것을 권장합니다.

standalone모드를 실행하기 위해 아래와 같이 준비합니다.

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

위 설정파일에서 어떤 커낵터 클래스를 사용할 인지, 어떤 파일을 어느 토픽으로 보낼것인지 설정합니다.

connect-standalone.properties

bootstrap.servers=ec2-kafka:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

위 설정 파일에서 연동할 카프카 브로커를 선언하고 어떤 방식으로 데이터를 변환하여 보낼 것인지 정의합니다.


테스트 파일 생성

$ touch test.txt

카프카로 보낼 텍스트 데이터의 파일을 생성합니다.

Standalone 모드 실행

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

위 명령어로 1개 프로세스의 커낵터를 실행할 수 있습니다.

이제 토픽으로 데이터가 정상적으로 전송되는지 확인하기 위해 console consumer를 켭니다.

$ ./kafka-console-consumer.sh --bootstrap-server ec2-kafka:9092 --topic connect-test --from-beginning

Standalone 모드 테스트

테스트를 하기 위해서 아래와 같이 텍스트를 입력합니다. 

$ echo world >> test.txt
$ echo kafka >> test.txt
$ echo hello >> test.txt

위와 같이 입력하면 아래와 같이 console consumer에 json형태의 데이터가 출력됨을 확인할 수 있습니다.

{"schema":{"type":"string","optional":false},"payload":"world"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hello"}