본문 바로가기

빅데이터/Kafka

카프카의 토픽 데이터를 REST api로 주고받자 - Kafka rest proxy 사용

confluent에서는 rest proxy라고 불리는 카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 기존에 Kafka connect, Kafka client로 데이터를 전달하는 것과는 사뭇 다르게 REST api를 사용한다는점이 독특한데요. 직접 코드를 짜지 않고 범용적으로 사용되는 http을 사용해서 데이터를 넣고 뺄 수 있다는 점이 독특합니다. 오늘은 rest proxy를 local에 설치하고 실행해보겠습니다.

 

준비물

- local kafka cluster
- git
- terminal
- postman

다운로드 및 실행

rest proxy를 사용하기 위해서는 rest proxy가 포함된 confluent의 community package를 다운로드 받아야합니다. 아래 명령어를 통해 패키지를 다운로드 할 수 있습니다.

$ curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.0-2.12.zip
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  384M  100  384M    0     0  7754k      0  0:00:50  0:00:50 --:--:-- 19.8M

다운로드한 파일의 압축을 풀어서 etc/kafka-rest/kafka-rest.properties 파일로 설정을 수정할 수 있습니다. kafka-rest.properties에는 카프카 클러스터 주소가 들어가야하는데, 기본 설정은 아래와 같이 되어있으므로 필요시에 카프카 클러스터, 주키퍼 주소를 변경해야합니다. 만약 스키마 레지스트리를 사용중이라면 스키마레지스트리도 등록합니다.

#id=kafka-rest-test-server
#schema.registry.url=http://localhost:8081
#zookeeper.connect=localhost:2181
bootstrap.servers=PLAINTEXT://localhost:9092
#
# Configure interceptor classes for sending consumer and producer metrics to Confluent Control Center
# Make sure that monitoring-interceptors-<version>.jar is on the Java class path
#consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

이제 다시 압축을 풀었던 디렉토리로 돌아가서 아래와 같이 실행하면 rest proxy 가 실행됩니다.

$ bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

토픽 생성 및 전송

rest proxy를 통해 전송할수 있는 데이터 타입은 다양한데 이 포스트에서는 json데이터를 기반으로 전송해보겠습니다. rest proxy에서는 토픽을 생성할 때 데이터 타입을 지정해서 생성해야 합니다. 아래 POST http request를 통해 json 데이터타입을 지원하는 토픽을 생성하고 데이터를 전송할 수 있습니다. 다만, 이 때 카프카 클러스터의 auto.create.topics.enable 옵션이 true로 되어 있어야 토픽이 생성됩니다.

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"records":[{"value":{"name":"Anderson"}}]}' "http://localhost:8082/topics/person"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

Content-Type과 Accept 헤더를 지정하고, --data 옵션으로 전송할 record의 json 데이터를 넣어줍니다. 그리고 신규로 생성할 토픽명은 http:// 경로의 가장 마지막에 넣어줍니다. 이번에 생성한 토픽명은 person 입니다.

아래 GET http request를 통해 토픽 리스트를 확인할 수도 있습니다.

$ curl -X GET "http://localhost:8082/topics/"
["person"]

해당 토픽에 대한 상세한 정보를 알고싶다면 아래와 같이 topics 뒤에 토픽명을 붙여줍니다.

$ curl -X GET -H "Accept: application/vnd.kafka.v2+json" "http://localhost:8082/topics/person"
{
   "name":"person",
   "configs":{
      "message.downconversion.enable":"true",
      "file.delete.delay.ms":"60000",
      "segment.ms":"604800000",
      "min.compaction.lag.ms":"0",
      "retention.bytes":"-1",
      "segment.index.bytes":"10485760",
      "cleanup.policy":"delete",
      "max.compaction.lag.ms":"9223372036854775807",
      "follower.replication.throttled.replicas":"",
      "message.timestamp.difference.max.ms":"9223372036854775807",
      "segment.jitter.ms":"0",
      "preallocate":"false",
      "message.timestamp.type":"CreateTime",
      "message.format.version":"2.5-IV0",
      "segment.bytes":"1073741824",
      "unclean.leader.election.enable":"false",
      "max.message.bytes":"1048588",
      "retention.ms":"604800000",
      "flush.ms":"9223372036854775807",
      "delete.retention.ms":"86400000",
      "leader.replication.throttled.replicas":"",
      "min.insync.replicas":"1",
      "flush.messages":"9223372036854775807",
      "compression.type":"producer",
      "index.interval.bytes":"4096",
      "min.cleanable.dirty.ratio":"0.5"
   },
   "partitions":[
      {
         "partition":0,
         "leader":0,
         "replicas":[
            {
               "broker":0,
               "leader":true,
               "in_sync":true
            }
         ]
      }
   ]
}

위 response json으로 해당 토픽에 대한 모든 정보를 확인할 수 있습니다.

토픽 소비(Consumer)

이제 토픽도 만들어졌고 데이터도 전송되었으니 토픽으로 부터 데이터를 가져와보겠습니다. 토픽의 데이터를 가져오기 위해서는 우선 명시적으로 group.id를 등록해야합니다. 그리고 토픽을 가져올 instance명도 같이 등록합니다.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "person_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/person_consumer
{"instance_id":"person_consumer_instance","base_uri":"http://localhost:8082/consumers/person_consumer/instances/person_consumer_instance"}

- group.id : person_consumer
- instance : person_consumer_instance

이제 group.id와 instance를 등록하였으므로 해당 group.id와 instance로 구독(subscription)하도록 아래와 같이 수행합니다.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["person"]}' \
 http://localhost:8082/consumers/person_consumer/instances/person_consumer_instance/subscription

이제 마지막 단계로 records http api를 통해 데이터를 들고오도록 하겠습니다. 아래 GET http request를 통해 데이터를 가져올 수 있습니다.

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/person_consumer/instances/person_consumer_instance/records
[{"topic":"person","key":null,"value":{"name":"Anderson"},"partition":0,"offset":0}]

정상적으로 person 토픽으로 부터 json value를 가져온 모습을 확인하실 수 있습니다. 추가적으로 파티션 번호와 offset번호도 확인할 수 있습니다.

결론

Confluent에서 제공하는 rest proxy는 간단하면서도 강력합니다. Consumer, Producer, Admin 등 Kafka-client에서 사용하던 다양한 기능들을 rest api를 통해 쉽게 연동할 수 있습니다. Kafka-client로 직접 구현하는 것이 비해 상세한 옵션을 사용하지 못하것이 아쉽고, 안정성 측면에서 rest proxy를 사용하는게 더 낫다고는 절대 말할 수 없지만, Kafka-client를 사용하지 못하는 일부 환경에서 rest proxy는 카프카와 연동하기 위핸 최후의 수단이라고 말할 수 있을것 같습니다. 

 

rest proxy는 카프카의 생태계의 범위를 넓혀주는 역할도 충실히 하고 있으며, 사용도 매우 쉽습니다. 사내에 카프카 클러스터를 운영하고 있다면 추가적으로 rest proxy를 연동하여 producer/cosumer역할 외에 토픽 조회와 같은 기능을 사용하는데 활용한다면 아주 유용할 것 같습니다. rest proxy는 confluent license를 따르므로 해당 라이센스에 대한 내용을 확인하여 사내에 적용하는 것이 중요할 것 같습니다.