본문 바로가기

빅데이터/Kafka

kafka connector distributed 모드로 fileSourceConnector 실행

반응형

저번 포스트에 이어 distributed로 동작하는 kafka connector를 실습해보겠습니다.

https://blog.voidmainvoid.net/356

 

Kafka file source connector standalone 모드 실행

Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을

blog.voidmainvoid.net

준비물

- 카프카 바이너리
- 실행중인 카프카 클러스터    

distributed vs standalone

The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statues. In the distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. 

connector distributed 모드 준비

카프카 distrubuted모드는 프로세스가 띄어지는 즉시 source 또는 sink되지 않습니다. http를 받는 서버가 실행되고 서버가 스레드를 실행하여 동작하는 것입니다. 아래 연동할 카프카 클러스터에 대해서만 준비합니다.

config/connect-distributed.properties

bootstrap.servers=ec2-kafka:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

connector distributed 모드 실행

$ bin/connect-distributed.sh config/connect-distributed.properties

커낵터가 정상적으로 실행되는지 아래와 같이 curl명령을 통해 확인할 수 있습니다.

$ curl -s localhost:8083
{"version":"2.5.0","commit":"66563e712b0b9f84","kafka_cluster_id":"3_7eP3U_RAiiIFhd2hBp6g"}
$ curl -s localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

connector distributed 테스트 및 확인

커낵터에 신규 커낵터를 등록하기 위해서 아래와 같이 명령어를 실행합니다.

$ curl --request POST 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "file-source-connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "topic": "connect-test",
        "file": "~/test.txt"
    }
}'

위 명령어가 정상적으로 실행되었다면 아래 명령어로 커낵터가 등록되었음을 확인할 수 있습니다.

$ curl -s localhost:8083/connectors/file-source-connector
{
   "name":"file-source-connector",
   "config":{
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file":"~/test.txt",
      "tasks.max":"1",
      "name":"file-source-connector",
      "topic":"connect-test"
   },
   "tasks":[
      {
         "connector":"file-source-connector",
         "task":0
      }
   ],
   "type":"source"
}

이제 test.txt파일에 데이터를 넣어보겠습니다.

$ echo hello >> test.txt
$ echo kafka >> test.txt
$ echo 123 >> test.txt

토픽의 데이터를 확인합니다.

$ ./kafka-console-consumer.sh --bootstrap-server ec2-kafka:9092 --topic connect-test --from-beginning                     130 ↵
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"123"}

console-consumer를 통해 정상적으로 데이터가 들어갔음을 확인할 수 있습니다.

 

참고자료

https://kafka.apache.org/documentation/#connect

Connector REST api

The following are the currently supported REST API endpoints:

GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
GET /connectors/{name}/config - get the configuration parameters for a specific connector
PUT /connectors/{name}/config - update the configuration parameters for a specific connector
GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart - restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
- GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector

Kafka Connect also provides a REST API for getting information about connector plugins:

- GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
- PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.

The following is a supported REST request at the top-level (root) endpoint:

 - GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to.

 

반응형
  • James Lee 2021.09.28 15:21 댓글주소 수정/삭제 댓글쓰기

    글 잘 읽었습니다.
    해당 예제로 테스트를 해보던 중 실제 실무에서는 파일을 어떻게 쓰는지 궁금합니다.
    echo 하지는 않을것이고 카프카 커넥트가 카프카 클러스터 쪽에서 수행하고 파일을 쓰는 클라이언트는 다른 머신에 있을 텐데 원격으로 scp 등을 이용해서 파일을 엎어치기를 하는 것인지..
    제가 그렇게 테스트하니까 중간에 한동안 컨슈머에서 파일을 읽지 못하더라구요.
    파일을 열고 계속 write하는 것은 잘 반영이 되었습니다.
    그렇다면 원격으로 파일을 열고 계속 써야 하는 것인지..
    그럼 파일사이즈가 커져서 더 이상 사용 못할 때에는 어떻게 하는지 궁금합니다.

    • 글 잘봐주셔서 감사합니다^^ 실무에서는 파일 커넥터는 잘 안쓰였던거 같고 하둡이나 엘라스틱서치와 같은 데이터베이스와 연동하는 용도로 파이프라인을 생성할 때 커넥터를 사용하는 모습이 많이 보입니다.
      그리고 파일사이즈문제는 파일이름에 적재하는 시점의 timestamp등을 찍는 방법이 있습니다.