Following up on the previous post, this time we will try running a Kafka connector in distributed mode.
https://blog.voidmainvoid.net/356
Prerequisites
- Kafka binary
- A running Kafka cluster
distributed vs standalone
The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs, and task statuses in Kafka topics.
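These internal topics (named by the offset.storage.topic, config.storage.topic, and status.storage.topic settings below) are created on the cluster once a worker starts. As a quick check, assuming the broker address used in this post, you should see something like:
$ bin/kafka-topics.sh --bootstrap-server ec2-kafka:9092 --list | grep connect
connect-configs
connect-offsets
connect-status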
Preparing connector distributed mode
In distributed mode, the connector does not start sourcing or sinking the moment the process is launched. Instead, an HTTP server starts, and that server spawns threads to do the actual work. The only preparation needed is the configuration for the Kafka cluster to connect to, shown below.
config/connect-distributed.properties
bootstrap.servers=ec2-kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
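Note that with key/value schemas.enable=true, the JsonConverter wraps every record in a schema/payload envelope. This is why the consumer output later in this post looks like the line below rather than a plain string; set schemas.enable=false if you want raw values instead.
{"schema":{"type":"string","optional":false},"payload":"hello"}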
Running connector distributed mode
$ bin/connect-distributed.sh config/connect-distributed.properties
You can verify that the Connect worker is running normally with curl commands like the following.
$ curl -s localhost:8083
{"version":"2.5.0","commit":"66563e712b0b9f84","kafka_cluster_id":"3_7eP3U_RAiiIFhd2hBp6g"}
$ curl -s localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
Testing and verifying connector distributed mode
To register a new connector with the Connect worker, run the following command.
$ curl --request POST 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "file-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"topic": "connect-test",
"file": "~/test.txt"
}
}'
If the command above ran successfully, you can confirm that the connector was registered with the following command.
$ curl -s localhost:8083/connectors/file-source-connector
{
"name":"file-source-connector",
"config":{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file":"~/test.txt",
"tasks.max":"1",
"name":"file-source-connector",
"topic":"connect-test"
},
"tasks":[
{
"connector":"file-source-connector",
"task":0
}
],
"type":"source"
}
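You can also check whether the connector and its task are actually in the RUNNING state through the status endpoint (one of the REST endpoints listed at the end of this post):
$ curl -s localhost:8083/connectors/file-source-connector/status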
Now let's put some data into the test.txt file.
$ echo hello >> test.txt
$ echo kafka >> test.txt
$ echo 123 >> test.txt
Check the data in the topic.
$ ./kafka-console-consumer.sh --bootstrap-server ec2-kafka:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"123"}
The console consumer confirms that the data was delivered to the topic correctly.
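When you are done testing, the connector can be removed with the DELETE endpoint, which stops its tasks and deletes its configuration:
$ curl -X DELETE localhost:8083/connectors/file-source-connector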
References
https://kafka.apache.org/documentation/#connect
Connector REST API
The following are the currently supported REST API endpoints:
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
- GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
- PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector
Kafka Connect also provides a REST API for getting information about connector plugins:
- GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
- PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per-config validation, returning suggested values and error messages during validation.
The following is a supported REST request at the top-level (root) endpoint:
- GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to.
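As an example of the validate endpoint above, here is a sketch that checks the file source configuration used in this post before registering it. Note that the request body must include connector.class, and the {connector-type} path segment is the connector's class name:
$ curl -s -X PUT -H 'Content-Type: application/json' \
--data '{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","topic":"connect-test","file":"~/test.txt","tasks.max":"1"}' \
localhost:8083/connector-plugins/FileStreamSourceConnector/config/validate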