Kafka-client(consumer, producer)를 사용하기 위해서는 다양한 설정이 필요하지만 카프카 브로커와 통신하기 위해서는 bootstrap.servers 옵션은 반드시 필요한 옵션중 하나입니다.
Bootstrap.servers
이 옵션은 카프카 클러스터에 연결하기 위해 클라이언트가 사용하는 브로커들의 host:port 목록을 설정해야 합니다. 특이한점은 모든 브로커의 host와 port를 적지 않아도 된다는 점입니다. 왜냐면 최초로 연결된 하나의 broker의 host:port로 부터 통신을 위한 정보를 가져오기 때문입니다.
Route 53에서 kafka cluster DNS 설정하기
route53은 aws에서 제공하는 DNS 웹서비스 입니다. aws route53에 이미 등록되어 있는 제 voidmainvoid.net host에 A유형으로 DNS를 multiple ip값으로 등록해 보았습니다.
등록이 완료되면 local pc에서 정상적으로 dns가 동작하는지 host 명령어로 확인합니다.
$ host kafka.voidmainvoid.net
kafka.voidmainvoid.net has address 54.180.98.4
kafka.voidmainvoid.net has address 15.164.97.6
kafka.voidmainvoid.net has address 54.180.93.146
위와 같이 1dns에 multiple ip가 등록된 것을 확인할 수 있습니다.
Kafka-client bootstrap DNS 설정
이제 kafka-client의 bootstrap.servers 옵션에 dns를 넣어 정상동작하는지 테스트해보도록 하겠습니다. Producer는 console-producer를 사용하고 consumer application에 해당 옵션을 넣어 테스트 하였습니다.
public class Main {
private static String TOPIC_NAME = "test_log";
public static void main(String[] args) throws Exception {
Properties configs = new Properties();
configs.put("bootstrap.servers", "kafka.voidmainvoid.net:9092");
configs.put("group.id", "test-consume");
configs.put("client.dns.lookup", "use_all_dns_ips");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
위 설정에서 중요한점은 client.dns.lookup 옵션을 지정해야한다는 점입니다. use_all_dns_ips 옵션을 지정함으로서 해당 bootstrap ip과 connect 시도시 return되는 multiple ip에 대해 모두 connect시도합니다.
테스트 결과 정상적으로 Consumer application이 broker와 연결된 것을 확인할 수 있습니다.
[2020-03-20 15:36:05,737] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka.voidmainvoid.net:9092]
...
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
...
...
[2020-03-20 15:39:13,987] TRACE [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Found least loaded node kafka.voidmainvoid.net:9092 (id: -1 rack: null) with no active connection (org.apache.kafka.clients.NetworkClient)
[2020-03-20 15:39:13,987] DEBUG [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Sending FindCoordinator request to broker kafka.voidmainvoid.net:9092 (id: -1 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-20 15:39:14,165] DEBUG [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Initiating connection to node kafka.voidmainvoid.net:9092 (id: -1 rack: null) using address kafka.voidmainvoid.net/15.164.97.6 (org.apache.kafka.clients.NetworkClient)
[2020-03-20 15:39:14,191] TRACE [Consumer clientId=consumer-test-consume-1, groupId=test-consume] Found least loaded connecting node kafka.voidmainvoid.net:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)
use_all_dns_ips 옵션은 kafka-client 2.1.0 이후 버젼에서 사용가능합니다.
'빅데이터 > Kafka' 카테고리의 다른 글
아파치 카프카 입문과 활용 강의자료 슬라이드 (2) | 2020.06.23 |
---|---|
자바 멀티스레드 카프카 컨슈머 애플리케이션 구현 코드 (0) | 2020.06.09 |
Kafka-client client.dns.lookup 옵션 정리 (0) | 2020.04.13 |
AWS에 카프카 클러스터 설치하기(ec2, 3 brokers) (1) | 2020.03.18 |
카프카 버로우 = consumer lag 모니터링 오픈소스 애플리케이션 (0) | 2020.03.07 |
카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala) (0) | 2020.02.24 |