본문 바로가기

빅데이터/Kafka

Kafka opensource 분석을 통한 replication assignment 로직 확인


Replication과 partition 개념은 Apache Kafka에서 fault tolerance 동작을 위한 핵심개념이다. 각 broker에 어떻게 partition들이 replication 되는지 github에 올라가 있는 Kafka의 opensource를 분석하여 알아 보자.

Partition과 Replication

# Partition : 어떤 토픽에 대해서 producer/consumer가 병렬처리방식으로 분산저장되는 단위

# Replication : 높은 가용성(High Availibility)을 얻기 위해 각각의 partition을 각기 다른 브로커에 복제하는 역할


<그림. 하나의 topic에 대해서 3개의 partition 단위에 분산저장되는 경우>


Kafka에서 Topic을 생성하게 되면, 설정 한 partition, replication의 각각 개수에 따라 partition과 각 pratition의 replication들이 broker들에 저장된다. 


Kafka Opensource 분석

Topic에 대한 partition이 저장되는 broker 위치를 정하는 로직은 assignReplicasToBrokers()를 통해 수행된다.


Kafka - AdminUtils.scala(129line 바로가기)

129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
  // 각 브로커에 replication을 assign
  def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                              nPartitions: Int,
                              replicationFactor: Int,
                              fixedStartIndex: Int = -1,
                              startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
    if (nPartitions <= 0) // partition이 0 이하라면 exception
      throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
    if (replicationFactor <= 0) // replcation 이 0 이하라면 exception
      throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
    if (replicationFactor > brokerMetadatas.size) // replication이 broker 개수보다 많으면 exception
      throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
    if (brokerMetadatas.forall(_.rack.isEmpty)) // rack information이 없는 경우
      assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
        startPartitionId)
    else {
      if (brokerMetadatas.exists(_.rack.isEmpty)) // broker들 중에 rack information이 빠져있을 때 exception
        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
      assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
        startPartitionId)
    }
  }
cs


위 코드를 통해, partition, replication, broker 간에 몇 가지 특징을 살펴 볼 수 있다.


    1. partition과 replication은 각각 1이상 
      → 1 topic이 존재하는 조건

    2. replication 개수 > broker 개수 == false
      → broker개수와 replication 개수가 같을 수는 있음
      → 그러나, replication 개수가 broker 개수보다 크면 한 broker에 두개 이상 동일한 replication topic이 들어가므로 불가능

    3. broker들은 모두 rack정보를 들고 있거나 혹은 모두 들고있지 않아야 한다.
      → Rack정보를 포함한 broker는 기존 replication assignment와 다르게 동작
      → 예를 들어 aws의 multi region instance에서 rack정보를 분산저장하면 한개의 region이 내려가더라도 fault tolerance하게 동작 가능
      → Rack을 통한 replication assignment는 0.10.0 version에서 추가되었고, 관련 정보는 [KAFKA-1215]ticket에서 확인 가능하다.

replication assignment(no rack awareness)는 아래와 assignReplicasToBrokerRackUnaware()를 사용하여 분산처리된다.


Kafka - AdminUtils.scala(151line 바로가기)

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }
cs


assignReplicasToBrokerRackUnaware가 어떻게 동작하는지는 test code중 testReplicaAssignment()를 통해 간단히 확인 할 수 있다.


Kafka - AdminRackAwareTest.scala(198line 바로가기)

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
  @Test
  def testReplicaAssignment() {
    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
  // 5개의 기본 broker 생성
    val expectedAssignment = Map(
        0 -> List(0, 1, 2), // 처음에는 replication이 broker따라 순차적으로(++1) 생성
        1 -> List(1, 2, 3),
        2 -> List(2, 3, 4),
        3 -> List(3, 4, 0),
        4 -> List(4, 0, 1), // 마지막 broker가 될때 까지 partition이 순차적으로 생성
        5 -> List(0, 2, 3), // 한바퀴 돌면 첫째, 둘째 replication 거리가 2만큼 멀어짐
        6 -> List(1, 3, 4),
        7 -> List(2, 4, 0),
        8 -> List(3, 0, 1),
        9 -> List(4, 1, 2))
  // 10개 partition, 3개 replication 생성일 때
    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
    assertEquals(expectedAssignment, actualAssignment)
  }
cs


 → Topic의 partition은 broker를 한바퀴 순환할때까지 replication이 순차적으로 생성됨(0번 ~ 4번 partition에서 확인 가능)

 → 한바퀴 순환하고 나면 다음 partition은 첫째 replication과 둘째 replication이 2만큼 shift(5번 ~ 9번 partition에서 확인 가능)

  n바퀴 순환하고 나면 다음 partition은 첫째 replication과 둘째 replication이 (n+1)만큼 shift


결론

Kafka contributor의 comment(바로가기)에 따르면 Replica assignment의 goal은 아래와 같다.(Rack unawareness일 경우)

Fault tolerance 설계를 위해 고민한 흔적이 보이는 comment 이다.


1. Spread the replicas evenly among brokers. 

2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.

3. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.

4. Assign the remaining replicas of each partition with an increasing shift.

의견

Kafka는 유지보수 적인 관점에 대해 신경을 많이 쓴 것으로 보인다. 대규모 시스템을 운영하다 보면 node의 shutdown은 확률적으로 일어 날 수 있으므로 이를 대비하는 것은 당연하다. 이번 Posting을 통해 개인적으로 부담스럽게 느껴지던 opensource에 대한 진입장벽이 많이 허물어지게 되는 계기가 되었다. 동시에 Kafka에 대한 기본개념을 잡는데 많은 도움이 되었다.