Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을 카프카로 보내보겠습니다.
준비물
- 카프카 바이너리파일
- 카프카 클러스터
Standalone 모드 준비
Standalone모드란 1개의 프로세스로 동작하는 애플리케이션을 뜻합니다. 커낵터는 distribute모드와 standalone모드를 지원합니다. 실제 환경에서는 distribute모드를 사용하는 것을 권장합니다.
standalone모드를 실행하기 위해 아래와 같이 준비합니다.
connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
위 설정파일에서 어떤 커낵터 클래스를 사용할 인지, 어떤 파일을 어느 토픽으로 보낼것인지 설정합니다.
connect-standalone.properties
bootstrap.servers=ec2-kafka:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
위 설정 파일에서 연동할 카프카 브로커를 선언하고 어떤 방식으로 데이터를 변환하여 보낼 것인지 정의합니다.
테스트 파일 생성
$ touch test.txt
카프카로 보낼 텍스트 데이터의 파일을 생성합니다.
Standalone 모드 실행
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
위 명령어로 1개 프로세스의 커낵터를 실행할 수 있습니다.
이제 토픽으로 데이터가 정상적으로 전송되는지 확인하기 위해 console consumer를 켭니다.
$ ./kafka-console-consumer.sh --bootstrap-server ec2-kafka:9092 --topic connect-test --from-beginning
Standalone 모드 테스트
테스트를 하기 위해서 아래와 같이 텍스트를 입력합니다.
$ echo world >> test.txt
$ echo kafka >> test.txt
$ echo hello >> test.txt
위와 같이 입력하면 아래와 같이 console consumer에 json형태의 데이터가 출력됨을 확인할 수 있습니다.
{"schema":{"type":"string","optional":false},"payload":"world"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hello"}
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준) (1) | 2020.07.23 |
---|---|
컨슈머 스레드가 많다고 처리량이 높을까? 아닐까? 컨텍스트 스위칭으로 인한 예외 상황 (0) | 2020.07.22 |
kafka connector distributed 모드로 fileSourceConnector 실행 (2) | 2020.07.21 |
Kafka-client 사용시 Failed to load class "org.slf4j.impl.StaticLoggerBinder" 에러 해결 방법 (2) | 2020.07.13 |
Kafka ConsumerRecord의 timestamp는 0.10.0.0 이후부터 사용가능합니다. (0) | 2020.07.08 |
telegraf사용시 kafka로 데이터 json형태로 보내는 방법 (1) | 2020.07.08 |