커넥트 REST API 확장 플러그인 : Connect Rest Extension Plugin
Connect Framework offers REST API that is used to mange the lifecycle of the connector. Its imperative in most enterprises to secure the API and also add authorization to the end points. We could add the ability for authentication and authorization in the framework. But the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide ability for users to plug resources that help achieve the required capabilities.
While security is prime use cases for this extension. Its not limited to that. Some of the common use cases are
- Build a custom Authentication filter
- Build a custom Authorization filter
- Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations
https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin
KIP-285: Connect Rest Extension Plugin - Apache Kafka - Apache Software Foundation
Status Current state: Accepted Discussion thread: here JIRA: KAFKA-6776 - 이슈 세부사항 가져오는 중... 상태 PR : https://github.com/apache/kafka/pull/4931 Released: 2.0.0 Motivation Connect Framework offers REST API that is used to mange t
cwiki.apache.org
Debezium 사용 예제
GitHub - debezium/debezium-ui: A web UI for Debezium; Please log issues at https://issues.redhat.com/browse/DBZ.
A web UI for Debezium; Please log issues at https://issues.redhat.com/browse/DBZ. - GitHub - debezium/debezium-ui: A web UI for Debezium; Please log issues at https://issues.redhat.com/browse/DBZ.
github.com
@POST
@Path("/connectors/{connector-name}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnector(@PathParam("connector-name") String connectorName) throws ProcessingException, IOException;
@POST
@Path("/connectors/{connector-name}/tasks/{task-number}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnectorTask(@PathParam("connector-name") String connectorName, @PathParam("task-number") int taskNumber) throws ProcessingException, IOException;
// 새로운 endpoint
@GET
@Path("/debezium/transforms")
@Produces("application/json")
List<TransformsInfo> listTransforms() throws ProcessingException, IOException;
@GET
@Path("/debezium/topic-creation")
@Produces("application/json")
Boolean isTopicCreationEnabled() throws ProcessingException, IOException;
추가적인 rest endpoint를 활용한다.
GitHub - debezium/debezium: Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/brows
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ. - GitHub - debezium/debezium: Change data capture for a variety of databases. Please log i...
github.com
@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
return this.getTransforms();
}
private synchronized List<TransformsInfo> getTransforms() {
if (this.transforms.isEmpty()) {
for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
}
else {
this.transforms.add(new TransformsInfo(plugin));
}
}
}
return Collections.unmodifiableList(this.transforms);
}
@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
return this.isTopicCreationEnabled;
}
private synchronized Boolean isTopicCreationEnabled() {
Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
String topicCreationProperty = (String) config.get("topic.creation.enable");
if (null == topicCreationProperty) { // when config is not set, default to true
topicCreationProperty = "true";
}
return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
&& Boolean.parseBoolean(topicCreationProperty);
}