Connect REST API Extension Plugin: Connect Rest Extension Plugin

The Connect framework offers a REST API that is used to manage the lifecycle of connectors. In most enterprises it is imperative to secure this API and to add authorization to its endpoints. Authentication and authorization could be built into the framework itself, but security requirements vary so widely that supporting all of them there is not practical. Instead, the framework must let users plug in resources that provide the required capabilities.

Security is the primary use case for this extension, but it is not limited to that. Some common use cases are:

- Build a custom Authentication filter
- Build a custom Authorization filter
- Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations
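To make the first use case more concrete, here is a minimal sketch of the decision logic that a custom authentication filter could delegate to. It uses only the JDK so that it runs on its own. The class `BasicAuthCheck`, its method names, and the in-memory credentials map are hypothetical and not part of Kafka Connect. In an actual extension, logic like this would presumably sit inside a JAX-RS `ContainerRequestFilter` that a `ConnectRestExtension` implementation registers in its `register(...)` method.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: the check an authentication filter could delegate to. */
final class BasicAuthCheck {

    private BasicAuthCheck() {
    }

    /** Extracts {user, password} from an "Authorization: Basic ..." header value. */
    static Optional<String[]> parseBasicHeader(String headerValue) {
        // Accept the "Basic " scheme case-insensitively; anything else is rejected.
        if (headerValue == null || !headerValue.regionMatches(true, 0, "Basic ", 0, 6)) {
            return Optional.empty();
        }
        try {
            byte[] decoded = Base64.getDecoder().decode(headerValue.substring(6).trim());
            String pair = new String(decoded, StandardCharsets.UTF_8);
            int colon = pair.indexOf(':');
            if (colon < 0) {
                return Optional.empty();
            }
            return Optional.of(new String[] {pair.substring(0, colon), pair.substring(colon + 1)});
        }
        catch (IllegalArgumentException e) {
            // The payload was not valid Base64.
            return Optional.empty();
        }
    }

    /** Returns true only when the header names a known user with a matching password. */
    static boolean isAuthorized(String headerValue, Map<String, String> credentials) {
        // Plain equals() keeps the sketch short; it is not a constant-time comparison.
        return parseBasicHeader(headerValue)
                .map(pair -> pair[1].equals(credentials.get(pair[0])))
                .orElse(false);
    }

    public static void main(String[] args) {
        Map<String, String> credentials = Map.of("admin", "s3cret"); // demo data only
        String header = "Basic " + Base64.getEncoder()
                .encodeToString("admin:s3cret".getBytes(StandardCharsets.UTF_8));
        System.out.println(isAuthorized(header, credentials));
        System.out.println(isAuthorized(null, credentials));
    }
}
```

A filter built on this would abort the request with a 401 response whenever `isAuthorized` returns false. The Connect worker loads the extension classes listed in its `rest.extension.classes` property, which KIP-285 introduced.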

https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin 

 

KIP-285: Connect Rest Extension Plugin (Apache Kafka, cwiki.apache.org). Current state: Accepted. JIRA: KAFKA-6776. PR: https://github.com/apache/kafka/pull/4931. Released: 2.0.0.

 

Debezium usage example

https://github.com/debezium/debezium-ui/blob/main/backend/src/main/java/io/debezium/configserver/rest/client/KafkaConnectClient.java

 


@POST
@Path("/connectors/{connector-name}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnector(@PathParam("connector-name") String connectorName) throws ProcessingException, IOException;

@POST
@Path("/connectors/{connector-name}/tasks/{task-number}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnectorTask(@PathParam("connector-name") String connectorName, @PathParam("task-number") int taskNumber) throws ProcessingException, IOException;

// New endpoints
@GET
@Path("/debezium/transforms")
@Produces("application/json")
List<TransformsInfo> listTransforms() throws ProcessingException, IOException;

@GET
@Path("/debezium/topic-creation")
@Produces("application/json")
Boolean isTopicCreationEnabled() throws ProcessingException, IOException;

The UI backend makes use of these additional REST endpoints, which Debezium's Connect REST extension provides.
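For readers who want to probe one of the additional endpoints without the JAX-RS client interface, the following is a rough sketch using the JDK's `java.net.http` client (Java 11 or later). The path `/debezium/topic-creation` comes from the interface above. The class name `DebeziumEndpointProbe`, the helper method, and the base URL `http://localhost:8083` (the default Connect REST port) are assumptions for illustration only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical probe for an endpoint added by a Connect REST extension. */
final class DebeziumEndpointProbe {

    private DebeziumEndpointProbe() {
    }

    /** Builds a GET request for the topic-creation endpoint under the given Connect base URL. */
    static HttpRequest topicCreationRequest(String connectBaseUrl) {
        // Drop a trailing slash so the joined URL does not contain "//".
        String base = connectBaseUrl.endsWith("/")
                ? connectBaseUrl.substring(0, connectBaseUrl.length() - 1)
                : connectBaseUrl;
        return HttpRequest.newBuilder(URI.create(base + "/debezium/topic-creation"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed base URL; pass a different one as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8083";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(topicCreationRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

On a worker that does not have the extension installed, the request would be expected to come back with a 404, since the `/debezium/...` paths are not part of the stock Connect REST API.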

https://github.com/debezium/debezium/blob/aa1982adc50cad10cf1c7e4123335a7900e3d16e/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumResource.java#L114

 


@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
    return this.getTransforms();
}

private synchronized List<TransformsInfo> getTransforms() {
    if (this.transforms.isEmpty()) {
        for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
            if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
                this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
                this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
                this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
            }
            else {
                this.transforms.add(new TransformsInfo(plugin));
            }
        }
    }

    return Collections.unmodifiableList(this.transforms);
}

@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
    return this.isTopicCreationEnabled;
}

private synchronized Boolean isTopicCreationEnabled() {
    Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
    String topicCreationProperty = (String) config.get("topic.creation.enable");
    if (null == topicCreationProperty) { // when config is not set, default to true
        topicCreationProperty = "true";
    }
    return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
            && Boolean.parseBoolean(topicCreationProperty);
}
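The topic-creation check above combines a version gate with a worker property that defaults to true. The standalone sketch below restates that logic using only the JDK, so the comparison can be tried out in isolation. `TopicCreationGate`, its simplified version parser, and the minimum version `2.6.0` are assumptions: the excerpt does not show the value of `TOPIC_CREATION_KAFKA_VERSION`, and 2.6.0 is used here because that is the Apache Kafka release in which Connect-managed topic creation (KIP-158) shipped.

```java
import java.util.Map;

/** Hypothetical, JDK-only restatement of the version gate in the excerpt above. */
final class TopicCreationGate {

    // Assumption: stands in for TOPIC_CREATION_KAFKA_VERSION, whose value is not shown.
    static final int[] MIN_VERSION = {2, 6, 0};

    private TopicCreationGate() {
    }

    /** Parses "major.minor.patch[-suffix]" into three ints; missing or non-numeric parts stay 0. */
    static int[] parseVersion(String version) {
        String[] parts = version.split("[.-]");
        int[] parsed = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            try {
                parsed[i] = Integer.parseInt(parts[i]);
            }
            catch (NumberFormatException e) {
                break; // stop at the first non-numeric component
            }
        }
        return parsed;
    }

    /** Compares two parsed versions component by component, numerically. */
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    /** True when the worker is new enough and topic.creation.enable is unset or "true". */
    static boolean isTopicCreationEnabled(String connectVersion, Map<String, ?> workerConfig) {
        Object raw = workerConfig.get("topic.creation.enable");
        boolean enabledInConfig = raw == null || Boolean.parseBoolean(raw.toString());
        return compare(MIN_VERSION, parseVersion(connectVersion)) <= 0 && enabledInConfig;
    }

    public static void main(String[] args) {
        Map<String, String> disabled = Map.of("topic.creation.enable", "false");
        System.out.println(isTopicCreationEnabled("3.4.0", Map.of()));
        System.out.println(isTopicCreationEnabled("3.4.0", disabled));
    }
}
```

Comparing the components as integers rather than as strings matters for versions such as 2.10.0, which a plain string comparison would sort before 2.6.0.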
