본문 바로가기

빅데이터/Kafka

couchbase 카프카 싱크 커넥트 사용 방법

카프카 커넥트는 카프카 클러스터와 기타 데이터베이스간 파이프라인을 반복적으로 만드는데 특화되어 있습니다. 카프카 커넥터는 파이프라인의 구현체 인데요. 싱크 커넥터와 소스 커넥터로 이루어져 있습니다.

 

여기서는 카우치 베이스싱크 커넥터를 살펴봅니다. 카우치베이스 싱크 커넥터는 카프카의 토픽을 카우치베이스로 저장 로직이 담긴 커넥터입니다. 카우치베이스 싱크 커넥터는 at least once 전달을 지원하며 중복이 발생했을 경우에는 재입수가 필요할 수도 있습니다. 

 

카프카 싱크 커넥터를 사용하려면 깃허브 레포지토리에 있는 배포판을 다운받아서 커넥터에 포함시켜 사용할 수 있습니다.

- 카우치베이스 커넥터 깃헙 : https://github.com/couchbase/kafka-connect-couchbase

- 카우치베이스 배포판 다운로드 : https://docs.couchbase.com/kafka-connector/current/release-notes.html

 

카우치 베이스 커넥트 설치

1) 카우치 베이스 커넥터 다운로드

커넥터는 Realease Note에서 다운로드 가능

https://docs.couchbase.com/kafka-connector/current/release-notes.html

다운로드 받으면 다음과 같이 파일이 위치합니다.

$ tree
.
├── assets
│   └── couchbase_logo.png
├── doc
│   ├── LICENSE
│   └── README.adoc
├── etc
│   ├── migrate-config-3-to-4.sh
│   ├── quickstart-couchbase-sink.json
│   ├── quickstart-couchbase-sink.properties
│   ├── quickstart-couchbase-source.json
│   └── quickstart-couchbase-source.properties
├── lib
│   ├── HdrHistogram-2.1.12.jar
│   ├── LatencyUtils-2.0.3.jar
│   ├── core-io-2.2.0.jar
│   ├── dcp-client-0.37.0.jar
│   ├── java-client-3.2.0.jar
│   ├── jsoup-1.14.2.jar
│   ├── kafka-connect-couchbase-4.1.3.jar
│   ├── metrics-core-4.0.7.jar
│   ├── metrics-jmx-4.0.7.jar
│   ├── micrometer-core-1.5.5.jar
│   ├── micrometer-registry-jmx-1.5.5.jar
│   ├── reactive-streams-1.0.3.jar
│   ├── reactor-core-3.4.6.jar
│   ├── slf4j-api-1.7.30.jar
│   └── therapi-runtime-javadoc-0.12.0.jar
└── manifest.json

위 파일들 중 kafka-connect-couchbase-4.1.3.jar는 커넥터 파일이고 나머지는 실행에 필요한 라이브러리 파일들입니다.

 

2-1) 커넥터 파일 위치 설정

$ tree
.
└── connector-plugins
    └── kafka-connect-couchbase-4.1.3.jar

커넥터 파일은 커넥트에서 설정한 디렉토리에 커넥터 jar파일들과 함께 둡니다.

 

 

2-2) 라이브러리 파일 위치 설정

$ tree
.
├── LICENSE
├── NOTICE
├── libs
│   ├── activation-1.1.1.jar
│   ├── aopalliance-repackaged-2.6.1.jar
│   ├── argparse4j-0.7.0.jar
│   ├── audience-annotations-0.5.0.jar
│   ├── commons-cli-1.4.jar
│   ├── commons-lang3-3.8.1.jar
│   ├── connect-api-2.7.0.jar
│   ├── connect-basic-auth-extension-2.7.0.jar

라이브러리 파일은 카프카 커넥트를 실행하는 카프카 바이너리 디렉토리 내부에 위치합니다. 카프카 커넥트를 실행시 여기 있는 jar파일을 포함하여 실행됩니다.

 

카우치 베이스 싱크 커넥트 설정

아래 설정 중 bold체는 필수 설정입니다. 나머지는 기본값으로 자동 설정됩니다.

https://docs.couchbase.com/kafka-connector/current/sink-configuration-options.html

 

Sink Configuration Options | Couchbase Docs

Reference of the sink connector options.

docs.couchbase.com

 

Connection 설정

- couchbase.seed.nodes : 카우치베이스 서버 노드들을 콤마로 나누어 입력합니다

- couchbase.username : 카우치베이스 유저 id 값을 입력합니다

- couchbase.password : 카우칩에이스 유저의 password 값을 입력합니다

- couchbase.bucket : 카우치베이스에 연동할 버킷 이름을 입력합니다

- couchbase.network 

- couchbase.bootstrap.timeout

 

Security 설정

- couchbase.enable.tls

- couchbase.enable.hostname.verification

- couchbase.trust.store.path

- couchbase.trust.store.password

- couchbase.trust.certificate.path

- couchbase.client.certificate.path

- couchbase.client.certificate.passowrd

 

Logging 설정

- couchbase.log.redaction

- couchbase.log.document.lifecycle

 

Sink Behavior 설정

- couchbase.default.collection

- couchbase.topic.to.collection

- couchbase.sink.handler

- couchbase.document.id

- couchbase.remove.document.id

- couchbase.document.expiration

 

Durability 설정

- couchbase.durability

- couchbase.persist.to

- couchbase.replicate.to

 

N1ql Sink Handler 설정

- couchbase.n1ql.operation

- couchbase.n1ql.where.fields

- couchbase.nq1l.create.document

 

Sub Document Sink Handler 설정

- couchbase.subdocument.path

- couchbase.subdocument.operation

- couchbase.subdocument.create.path

- couchbase.subdocument.create.document

 

Couchbase Java SDK 설정

- couchbase.env.*

 

카우치 베이스 싱크 커넥터 파이프라인 추가

POST http://localhost:8083/connectors

{
  "name": "couchbase-sink-test",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
    "tasks.max": "1",
    "topics": "sink-test",
    "couchbase.seed.nodes": "local-couchbase",
    "couchbase.bucket": "test",
    "couchbase.username": "admin",
    "couchbase.password": "admin",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

org.apache.kafka.connect.json.JsonConverter로 컨버터를 설정할 경우 String으로 들어온 json 값을 json으로 포맷을 변경하여 카우치 베이스에 데이터를 저장합니다.

반응형