본문 바로가기

빅데이터/Kafka

Kafka | MirrorMaker2 가 release되었습니다.

MirrorMaker 2.0은 KIP-382로 등록되어 있던 주제였으며, 2018년 10월 12일에 최초로 jira(KAFKA-7500)가 생성되었습니다. release될 때까지 약 1년이 걸렸는데요. 이번 포스팅에서는 MirrorMaker2.0이 왜 나왔고, 어떤 기능을 포함하고 있는지 살펴보겠습니다.


개발동기

MirrorMaker1은 꽤 오랜기간 사용되어 왔지만 몇가지 이슈가 있었습니다.

 

Legacy MirrorMaker1의 문제점:

- Topic들이 kafka default configuration기준으로 생성되었기 때문에 repartition하기 위해 재작업이 필요함

- ACL, configuration의 변경이 source/sink cluster사이에 sync되지 않음

- Record들은 DefaultPartitioner에 의해서만 repartition됨

- 새로운 topic을 위한 whitelist를 수정한다던가 기타 mirror maker 설정을 변경할때 재시작 필요

- Exactly-once delivery를 보장하지 않음. Record가 일부 중복될 가능성이 있음

- Cluster상호간 동시 mirrorMaker 운영이 불가

- Rebalancing이 일어날때 latency가 치솟는 현상이 존재

 

위와 같은 이유로 MirrorMaker1은 운영하기 까다로웠으며 fail-over 시나리오나 disaster recovery에 있어서도 적절하지 않았습니다. Apache에서는 이러한 단점을 파악하고 아래와 같은 점을 위주로 legacy MirrorMaker과 다른 아키텍쳐로 개발을 진행하게 되었습니다.

 

신규 MirrorMaker의 요구사항:

- Kafka connect framework와 관련 생태계와 어울어져야함

- source, sink connector가 존재해야함

- high-level driver을 통해 connector들을 제어할 수 있어야함

- new topic, partiton에 대해 감지 할 수 있어야함

- 두개 클러스터 사이에 topic configuration에 대한 정보를 자동으로 sync할 수 있어야함

- Cluster상호간 동시 MirrorMaker 운영이 가능해야함

- Metric정보를 제공해야함

- legacy MirrorMaker모드로 운영 가능하게 되야함

- 리밸런싱이 일어나지 않도록 해야함

 

MirrorMaker2의 변경사항

Remote Topics, Partitions

Source cluster에서 Remote cluster로 topic을 mirroring할 때 몇가지 고려사항이 있습니다.

 

Topic mirroring 고려사항:

- 두 cluster 간에 각 partition의 record의 ordering(순서)가 동일해야함

- 두 cluster 간의 topic partition개수가 동일해야함

- Mirroring을 받는 cluster의 topic은 특정 topic으로부터만 record를 받아야함

- Mirroring을 받는 cluster의 partition은 특정 topic의 partition으로부터만 record를 받아야함

- Mirroring을 받는 cluster의 partition number과 보내는 cluster의 partition number은 동일해야함

 

MirrorMaker2는 위와 같은 고려사항을 만족시키면서 동작합니다. 뿐만아니라 양 cluster간 크로스 mirroring(이를 active/active replication이라고 부릅니다)시에 topic에 대한 renaming 규칙을 적용할 수도 있습니다. 

"active/active" replication

위와 같은 동작은 legacy MirrorMaker에서는 지원하지 않습니다. 

 

Aggregation

MirrorMaker2가 downstream consumer로 동작을 할 경우 2개 이상의 cluster로 부터 topic을 mirroring할 수 있습니다. 필요에 따라서 MirrorMaker을 통해 2개이상의 cluster에서 부터 topic을 merge하여 1개의 aggregate cluster에 mirroring할 수도 있습니다.

 

Cycle detection

2개의 cluster가 양쪽으로 mirroring을 진행할 경우, 서로간에 mirroring하는 configure(whitelist 등)을 공유할 수도 있습니다. 이 동작이 무한반복되는 것을 막기 위해 topic이름에 cluster name prefix가 붙는 경우, mirroring을 진행하지 않습니다.

 

Config, ACL Sync

MirrorMaker2는 source topic들을 모니터링하며 변경사항이 있을 경우 remote topic의 configuration을 수정합니다. 만약 partition 정합성이 맞지 않는다면 자동으로 partition개수를 맞출것입니다. 

 

Internal Topics

MirrorMaker2는 각 source cluster에 heartbeat topic에 mirroring관련 정보를 저장합니다.

 

Heartbeat topic에 저장되는 record schema:

- Target cluster(String)

- Source cluster(String) 

- Timestamp(long)

 

이와 동시에 checkpoint도 기록하게 되는데 아래와 같은 정보를 저장합니다.

 

Checkpoint topic에 저장되는 field:

- consumer group id (String)
- topic (String) 
- partition (int)
- upstream offset (int)
- downstream offset (int)
- metadata (String)
- timestamp

 

마지막으로 offset sync topic에는 cluster-to-cluster의 topic별, partition별 offset mapping정보를 저장합니다.

 

Offset sync topic에 저장되는 정보:

- topic (String)
- partition (int)
- upstream offset (int)
- downstream offset (int)

 

MirrorMaker2 Connector 종류

- MirrorSourceConnector

- MirrorSinkConnector

- MirrorCheckpointConnector

- MirrorHeartBeatConnector

 

MirrorMaker2 configuration 예제

./config/connect-mirror-source.properties 의 예제

name = local-mirror-source
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
 
 
# for demo, source and target clusters are the same
source.cluster.alias = upstream
source.cluster.bootstrap.servers = localhost:9092
target.cluster.bootstrap.servers = localhost:9092
 
 
# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

위와 같은 properties를 standalone모드 mirrormaker2로 실행시키려면 아래와 같은 명령어를 사용하면 된다.

$ ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-mirror-source.properties

 

MirrorMaker2를 운영하는 방법

MirrorMaker2를 운영하는 4가지 방법:

- As a dedicated MirrorMaker cluster
- As a Connector in a distributed Connect cluster
- As a standalone Connect worker
- In legacy mode using existing MirrorMaker scripts

 

 

As a dedicated MirrorMaker cluster

이 방법은 MirrorMaker2를 운영하기 위해 connect cluster를 필요로 하지 않습니다. 다만 Connect worker들을 제어하기 위해 high-level driver을 통해 작업해야합니다.

 

1) Kafka cluster information을 설정

# mm2.properties
clusters = us-west, us-east
us-west.bootstrap.servers = host1:9092
us-east.bootstrap.servers = host2:9092

1-1) 필요할 경우, MirrorMaker properties도 추가

topics = .*
groups = .*
emit.checkpoints.interval.seconds = 10

1-2) 추가적으로 특정 cluster 혹은 connector의 default properties를 override할 수 있음

us-west.offset.storage.topic = mm2-offsets
us-west->us-east.emit.heartbeats.enabled = false

2) MirrorMaker2 실행

$ ./bin/connect-mirror-maker.sh mm2.properties

 

As a standalone Connect worker

이 모드에서는 Single Connect worker을 통해 MirrorSourceConnector을 돌릴 수 있습니다. 이 모드에서는 multinode cluster을 지원하지 않으며 매우 작은 workload 혹은 테스트시에 적합합니다.

 

1) worker configuration 생성

# worker.properties
bootstrap.servers = host2:9092

2) connector configuration 생성

# connector.properties
name = local-mirror-source
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
topics=.*
 
source.cluster.alias = upstream
source.cluster.bootstrap.servers = host1:9092
target.cluster.bootstrap.servers = host2:9092
 
# use ByteArrayConverter to ensure that in and out records are exactly the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

3) single connect worker 실행

$ ./bin/connecti-standalone.sh worker.properties connector.properties

 

As a Connector in a distributed Connect cluster

 

이 모드는 connect cluster가 존재할 경우, MirrorMaker connector를 설정하여 운영할 수 있습니다.

 

4개의 connector:

- MirrorSourceConnector
- MirrorSinkConnector
- MirrorCheckpointConnector
- MirrorHeartbeatConnector

 

Connect REST API를 통해 실행

PUT /connectors/us-west-source/config HTTP/1.1
 
{
    "name": "us-west-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-west",
    "target.cluster.alias": "us-east",
    "source.cluster.bootstrap.servers": "us-west-host1:9091",
    "topics": ".*"
}

 

In legacy mode using existing MirrorMaker scripts

Leagacy MirrorMaker1은 deprecated되었지만, script를 실행하여 legacy mode로 실행할 수 있습니다.

$ ./bin/kafka-mirror-maker.sh --consumer consumer.properties --producer producer.properties

 

원글 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

반응형