본문 바로가기

빅데이터/Kafka

카프카 클러스터 클러스터ip DNS 연동방법. use_all_dns_ips 사용(in AWS, route53)

Kafka-client(consumer, producer)를 사용하기 위해서는 다양한 설정이 필요하지만 카프카 브로커와 통신하기 위해서는 bootstrap.servers 옵션은 반드시 필요한 옵션중 하나입니다.

Bootstrap.servers

이 옵션은 카프카 클러스터에 연결하기 위해 클라이언트가 사용하는 브로커들의 host:port 목록을 설정해야 합니다. 특이한점은 모든 브로커의 host와 port를 적지 않아도 된다는 점입니다. 왜냐면 최초로 연결된 하나의 broker의 host:port로 부터 통신을 위한 정보를 가져오기 때문입니다. 

Route 53에서 kafka cluster DNS 설정하기

route53은 aws에서 제공하는 DNS 웹서비스 입니다. aws route53에 이미 등록되어 있는 제 voidmainvoid.net host에 A유형으로 DNS를 multiple ip값으로 등록해 보았습니다. 

A유형으로 1개의 dns에 multiple IPv4를 등록할 수 있습니다.

등록이 완료되면 local pc에서 정상적으로 dns가 동작하는지 host 명령어로 확인합니다. 

$ host kafka.voidmainvoid.net
kafka.voidmainvoid.net has address 54.180.98.4
kafka.voidmainvoid.net has address 15.164.97.6
kafka.voidmainvoid.net has address 54.180.93.146

위와 같이 1dns에 multiple ip가 등록된 것을 확인할 수 있습니다.

Kafka-client bootstrap DNS 설정

이제 kafka-client의 bootstrap.servers 옵션에 dns를 넣어 정상동작하는지 테스트해보도록 하겠습니다. Producer는 console-producer를 사용하고 consumer application에 해당 옵션을 넣어 테스트 하였습니다.

public class Main {
    private static String TOPIC_NAME = "test_log";

    public static void main(String[] args) throws Exception {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "kafka.voidmainvoid.net:9092");
        configs.put("group.id", "test-consume");
        configs.put("client.dns.lookup", "use_all_dns_ips");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

 

위 설정에서 중요한점은 client.dns.lookup 옵션을 지정해야한다는 점입니다. use_all_dns_ips 옵션을 지정함으로서 해당 bootstrap ip과 connect 시도시 return되는 multiple ip에 대해 모두 connect시도합니다. 

 

테스트 결과 정상적으로 Consumer application이 broker와 연결된 것을 확인할 수 있습니다.

[2020-03-20 15:36:05,737] INFO ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [kafka.voidmainvoid.net:9092]
    ...
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ...
    ...
[2020-03-20 15:39:13,987] TRACE [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Found least loaded node kafka.voidmainvoid.net:9092 (id: -1 rack: null) with no active connection (org.apache.kafka.clients.NetworkClient)
[2020-03-20 15:39:13,987] DEBUG [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Sending FindCoordinator request to broker kafka.voidmainvoid.net:9092 (id: -1 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-20 15:39:14,165] DEBUG [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Initiating connection to node kafka.voidmainvoid.net:9092 (id: -1 rack: null) using address kafka.voidmainvoid.net/15.164.97.6 (org.apache.kafka.clients.NetworkClient)
[2020-03-20 15:39:14,191] TRACE [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Found least loaded connecting node kafka.voidmainvoid.net:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)

use_all_dns_ips 옵션은 kafka-client 2.1.0 이후 버젼에서 사용가능합니다.

반응형