본문 바로가기

빅데이터/Kafka

kafka connector distributed 모드로 fileSourceConnector 실행

저번 포스트에 이어 distributed로 동작하는 kafka connector를 실습해보겠습니다.

https://blog.voidmainvoid.net/356

 

Kafka file source connector standalone 모드 실행

Kafka에는 커낵터가 있습니다. 다양한 커낵터 클래스를 사용하여 컨슈머나 프로듀서 작성 없이 source로 부터 데이터를 카프카로 보내거나 받을 수 있습니다. 오늘은 FileStreamSource를 사용해서 file을

blog.voidmainvoid.net

준비물

- 카프카 바이너리
- 실행중인 카프카 클러스터    

distributed vs standalone

The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statues. In the distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. 

connector distributed 모드 준비

카프카 distrubuted모드는 프로세스가 띄어지는 즉시 source 또는 sink되지 않습니다. http를 받는 서버가 실행되고 서버가 스레드를 실행하여 동작하는 것입니다. 아래 연동할 카프카 클러스터에 대해서만 준비합니다.

config/connect-distributed.properties

bootstrap.servers=ec2-kafka:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

connector distributed 모드 실행

$ bin/connect-distributed.sh config/connect-distributed.properties

커낵터가 정상적으로 실행되는지 아래와 같이 curl명령을 통해 확인할 수 있습니다.

$ curl -s localhost:8083
{"version":"2.5.0","commit":"66563e712b0b9f84","kafka_cluster_id":"3_7eP3U_RAiiIFhd2hBp6g"}
$ curl -s localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

connector distributed 테스트 및 확인

커낵터에 신규 커낵터를 등록하기 위해서 아래와 같이 명령어를 실행합니다.

$ curl --request POST 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "file-source-connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "topic": "connect-test",
        "file": "~/test.txt"
    }
}'

위 명령어가 정상적으로 실행되었다면 아래 명령어로 커낵터가 등록되었음을 확인할 수 있습니다.

$ curl -s localhost:8083/connectors/file-source-connector
{
   "name":"file-source-connector",
   "config":{
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file":"~/test.txt",
      "tasks.max":"1",
      "name":"file-source-connector",
      "topic":"connect-test"
   },
   "tasks":[
      {
         "connector":"file-source-connector",
         "task":0
      }
   ],
   "type":"source"
}

이제 test.txt파일에 데이터를 넣어보겠습니다.

$ echo hello >> test.txt
$ echo kafka >> test.txt
$ echo 123 >> test.txt

토픽의 데이터를 확인합니다.

$ ./kafka-console-consumer.sh --bootstrap-server ec2-kafka:9092 --topic connect-test --from-beginning                     130 ↵
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"123"}

console-consumer를 통해 정상적으로 데이터가 들어갔음을 확인할 수 있습니다.

 

참고자료

https://kafka.apache.org/documentation/#connect

Connector REST api

The following are the currently supported REST API endpoints:

GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
GET /connectors/{name}/config - get the configuration parameters for a specific connector
PUT /connectors/{name}/config - update the configuration parameters for a specific connector
GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart - restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
- GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector

Kafka Connect also provides a REST API for getting information about connector plugins:

- GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
- PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.

The following is a supported REST request at the top-level (root) endpoint:

 - GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to.