빅데이터/Kafka
커넥트 REST API 확장 플러그인 : Connect Rest Extension Plugin
AndersonChoi
2022. 10. 4. 20:59
Connect Framework offers REST API that is used to mange the lifecycle of the connector. Its imperative in most enterprises to secure the API and also add authorization to the end points. We could add the ability for authentication and authorization in the framework. But the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide ability for users to plug resources that help achieve the required capabilities.
While security is prime use cases for this extension. Its not limited to that. Some of the common use cases are
- Build a custom Authentication filter
- Build a custom Authorization filter
- Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations
https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin
Debezium 사용 예제
@POST
@Path("/connectors/{connector-name}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnector(@PathParam("connector-name") String connectorName) throws ProcessingException, IOException;
@POST
@Path("/connectors/{connector-name}/tasks/{task-number}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnectorTask(@PathParam("connector-name") String connectorName, @PathParam("task-number") int taskNumber) throws ProcessingException, IOException;
// 새로운 endpoint
@GET
@Path("/debezium/transforms")
@Produces("application/json")
List<TransformsInfo> listTransforms() throws ProcessingException, IOException;
@GET
@Path("/debezium/topic-creation")
@Produces("application/json")
Boolean isTopicCreationEnabled() throws ProcessingException, IOException;
추가적인 rest endpoint를 활용한다.
@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
return this.getTransforms();
}
private synchronized List<TransformsInfo> getTransforms() {
if (this.transforms.isEmpty()) {
for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
}
else {
this.transforms.add(new TransformsInfo(plugin));
}
}
}
return Collections.unmodifiableList(this.transforms);
}
@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
return this.isTopicCreationEnabled;
}
private synchronized Boolean isTopicCreationEnabled() {
Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
String topicCreationProperty = (String) config.get("topic.creation.enable");
if (null == topicCreationProperty) { // when config is not set, default to true
topicCreationProperty = "true";
}
return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
&& Boolean.parseBoolean(topicCreationProperty);
}
반응형