본문 바로가기

개발이야기/AWS

AWS MSK(Kafka) 실습 및 예제 코드(Java), 장단점, 가격

Amazon MSK(Managed Streaming for Apache Kafka)는 AWS에서 제공하는 완전 관리형 apache kafka 서비스입니다. 기존에 on-promise에서 사용하던 혹은 EC2로 관리하던 Apache kafka를 SaaS형태로 사용할 수 있습니다. Apache kafka의 특정 버젼을 그대로 사용할 수 있기 때문에 vanila apache kafka의 버젼별 api spec을 따라서 사용할 수 있습니다.

그럼 이번 포스트에서는 AWS MSK cluster를 직접 구성해보고, producer/consumer을 만들어 테스트해보겠습니다.

MSK Cluster 생성

Cluster을 생성하기 위해서 AWS CLI를 사용하거나 혹은 AWS console을 사용할 수도 있습니다.이번 포스트에서는 AWS console(web)을 사용하여 구성해보도록 하겠습니다. 

아래는 2019년 12월 26일 Seoul(ap-northeast-2)기준으로 진행하였습니다.

1) General

Cluster이름과 VPC, Apache kakfa버젼을 선택합니다. Cluster이름은 영어 64글자(특수문자 불가)로 구성가능합니다. MSK는 1.1.1, 2.2.1, 2.3.1 3가지 버젼 중에 취사선택할 수 있습니다. 이번에는 apache kafka 2.3.1로 설정하여 진행해 보았습니다.

 

MSK cluster를 생성하기 위해서는 VPC를 구성해야하는데요. 2)AZ 에서 broker의 Availability zone을 설정하기 위해 사용 됩니다. MSK용 VPC를 설정하기 위해 아래와 같은 test용 vpc와 subnet을 새로 생성하였습니다.

VPC생성
서브넷 생성

VPC:

- 이름 : MSK_TEST_VPC

- IPv4 CIDR : 10.10.0.0/16

 

Subnets:

- broker01 : MSK_TEST_VPC, ap-northeast-2a, 10.10.0.0/24(IPv4 CIDR 블록)

- broker02 : MSK_TEST_VPC, ap-northeast-2a, 10.10.1.0/24(IPv4 CIDR 블록)

- broker03 : MSK_TEST_VPC, ap-northeast-2a, 10.10.2.0/24(IPv4 CIDR 블록)

2) Availability Zones

Kafka broker의 가용성을 위해 broker가 설치될 Availability Zone을 설정해야합니다. 옵션으로 2개의 AZ 혹은 3개의 AZ로 구성할지 설정할 수 있는데, AWS에서는 3으로 설정하여 구성하는 것을 추천합니다. 1)에서 구성한 VPC와 Subnet을 기준으로 아래와 같이 MSK AZ를 구성하였습니다.

az 셋팅

이렇게 subnet을 설정하면 각 subnet마다 broker가 1개이상 할당되며, 1개의 AZ가 down되더라도 나머지 AZ의 broker들 이 살아 있으므로 마치 rack-awarness를 설정하는 듯한 효과를 볼 수 있습니다.

3) Configuration

이번 설정에서는 Kafka configuration을 설정할 수 있습니다. default설정으로 운영할 수도 있고 혹은 2.3.1 kafka에 해당하는 configuration을 조합하여 사용할 수도 있습니다. 이번에는 테스트용도이므로 따로 추가/변경 설정을 하지 않고 default configuration을 사용하도록 하였습니다.

kafka configuration setting

4) Brokers

이번 설정에서는 Kafka broker가 운영되는 broker의 instance(서버)의 스펙과 개수를 정할 수 있습니다.

 

MSK 지원 instance 종류:

- Kafka.m5.large

- Kafka.m5.xlarge

- Kafka.m5.2xlarge

- Kafka.m5.4xlarge

- Kafka.m5.12xlarge

- Kafka.m5.24xlarge

 

m5 인스턴스별 세부정보는 아래와 같습니다.

m5 instances

위 인스턴스 중 운영에 필요한 instance의 종류를 취사선택하여 고르시면 됩니다.

 

이번 포스트에서는 테스트를 위한 운영을 할것이므로 아래와 같이 설정하였습니다.

Choose Broker instance type

5) Tags - optional

Tag는 AWS resource를 구별, 운영하기 위해 사용됩니다. 운영시 필요하다면 tag를 붙이셔서 생성하시면 됩니다.

6) Storage

Kafka를 운영함에 있어 Storage의 중요성은 빼놓을 수 없습니다. Topic의 rention기간에 따라 storage설정을 진행하시면 됩니다. 최소 1GB에서 최대 16384GB까지 설정할 수 있습니다. 이 설정을 통해 각 broker는 Amazon EBS를 발급받게 됩니다.

Storage size

7) Encryption

MSK와 통신할때 TLS(Transport Layer Security) 프로토콜을 사용할 것인지에 대한 설정을 진행할 수 있습니다. 운영상황에 맞게 설정하시면 됩니다. 

Setup Encryption

8) Authentication

7)에서 Enable encryption within the cluster를 선택하셨다면 TLS client를 사용할것인지 여부를 선택할 수 있습니다. 7)에서 encryption에 대한 설정을 하지 않았으므로 해당 옵션은 자동으로 disable입니다.

9) Monitoring

CloudWatch를 통한 monitoring을 어디까지 진행할 것인지에 대한 설정입니다. 총 3가지의 MSK monitoring 구성을 선택할 수 있습니다.

 

3가지 MSK 모니터링 방법:

- Basic(무료) : Includes basic cluster-level and broker-level metrics. 

- Enhanced broker-level(추가금액) : Includes basic monitoring and enhanced broker-level metrics.

- Enhanced topic-level(추가금액) : Includes enhanced broker-level monitoring and enhanced topic-level metrics.

 

만약 위 모니터링 외에 추가로 external 모니터링을 위해 Prometheus를 붙일 수 있는데요. 만약 Prometheus를 통해 추가 모니터링을 하고 싶다면 해당 옵션을 선택하셔서 운영모니터링을 하는 것도 방법입니다.

Set Prometheus for monitoring

10) Advanced settings

이 옵션에서는 MSK에 대한 Security group설정을 수행할 수 있습니다. 운영상황에 따라 선택하시면 됩니다.

MSK 클러스터 발급 완료

위 MSK클러스터 셋팅을 차근차근 진행하셨다면 cluster가 20분 내에 설치, 운영할 수 있는 상태로 만들어 집니다.

MSK발급완료

이로서 AWS의 Fully managed Kafka를 사용할 수 있게 되었습니다. 또한 동시에 기본 cluster 모니터링을 aws console을 통해 확인할 수 있습니다.

MSK 기본 monitoring graph dashboard

Producer, Consumer 개발

Producer, Consumer와 통신하기 위해서는 broker 정보가 필요합니다. Bootstrap broker url 정보는 MSK manage page에서 확인 할 수 있습니다.

Cluster 정보 화면
MSK client

테스트를 위한 Producer, Consumer는 Java8기반의 appllication으로 진행하였습니다.

통신을 위한 라이브러리는 apache kafka client 2.3.1 library를 사용하였습니다. 

Gradle 설정

plugins {
    id 'java'
}

group 'kinesis'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'

}

Producer Code

public class Main {

    private static String TOPIC_NAME = "msk_test_topic";

    public static void main(String[] args) throws Exception{

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "b-3.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092,b-2.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092,b-1.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092");
        configs.put("acks", "all");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        for (int i = 0; i < 5; i++) {
            String message = "hello " + i;
            producer.send(new ProducerRecord<String, String>(TOPIC_NAME, message));
        }

        producer.flush();
        producer.close();
    }
}

key가 없는 message를 msk_test_topic 이름의 topic에 5번 넣는 코드입니다.

Consumer Code

public class Main {
    private static String TOPIC_NAME = "msk_test_topic";

    public static void main(String[] args) throws Exception{
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "b-3.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092,b-2.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092,b-1.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:9092");
        configs.put("group.id", "test_group");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

msk_test_topic에 message가 들어오면 출력하는 코드입니다.

결과

Local에서 상기 Java application(consumer, producer)을 실행시켰을때 정상동작하지 않았습니다. 혹시 zookeeper을 사용하여 topic생성이 안되서 동작이 안되는것인지 확인하기 위해 아래와 같이 shell을 날려보았지만 timeout이 발생하였습니다.

$ bin/kafka-topics --create --zookeeper z-2.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:2181,z-1.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:2181,z-3.mskwonyoung.ap69b5.c4.kafka.ap-northeast-2.amazonaws.com:2181 \
--replication-factor 3 --partitions 1 --topic msk_test_topic
[2019-12-26 15:18:27,017] WARN Client session timed out, have not heard from server in 10026ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2019-12-26 15:18:37,128] WARN Client session timed out, have not heard from server in 10004ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2019-12-26 15:18:47,234] WARN Client session timed out, have not heard from server in 10001ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
	at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:230)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
	at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:226)
	at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:95)
	at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1581)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)

알고보니 AWS MSK는 on-promise 혹은 local에서 direct로 접속이 불가하였습니다.

 

AWS MSK는 VPC zone내부에 존재하고 있으며 위 스크린샷에 나온 zookeeper, broker url는 vpc/subnet 내부에서 사용되는 private ip였습니다. 만약 kafka client를 사용하여 접속하고 싶다면 해당 VPC내부의 ec2를 발급받아서 접속해야만 합니다. 

 

위와 같이 AWS MSK를 테스트하기 위한 client machine을 생성하는 방법은 아래 url에 자세히 나와 있습니다. 만약 테스트를 진행하고 싶다면 아래 guide를 따라서 ec2를 발급받아서 사용해야만 합니다.

 

AWS Client Machine test guide : https://docs.aws.amazon.com/msk/latest/developerguide/create-client-machine.html

 

AWS MSK와 on-promise 혹은 local의 접속 불가에 관한 stackoverflow QNA

- You cannot access MSK directly from on-premise or local machine : https://bit.ly/37ffyH0

- You have to access your MSK cluster from a client machine on EC2, and cannot do so from a local machine : https://bit.ly/2MvbyKA

Amazon MSK 소감 및 의견

오늘 포스팅에서는 MSK cluster을 구성하고 local machine에서 통신하는 테스트까지 진행해 보았습니다. 이번 포스팅에서는 Local machine과 통신하지 못했는데 다음번에 다시 네트워크 구성을 시도하여 테스트 해보도록 하겠습니다.😭

 

MSK는 따로 EC2를 발급받고 AZ를 설정, Security group을 설정하거나 추가적인 monitoring셋팅을 할 필요없이 클릭 몇번으로 클러스터를 구성할 수 있기에 간편합니다. 또한 kafka운영시에 필요한 추가적은 고급 monitoring 기능들을 옵션으로 제공하기 때문에 운영자가 모니터링을 위한 아키텍쳐를 구성하는데 들이는 시간을 줄일 수 있습니다. 

 

다만, 비용이 다소 비싸고 정해진 instance를 사용해야만 합니다. 또한 특정 AWS MSK에서 지원하는 특정 MSK만 사용해야 하며 강제적으로 VPC/AZ에 broker들이 할당되기 때문에 이로 인한 제약사항도 많습니다. 만약 EC2에 직접 Apache kafka를 설치하고 클러스터를 구성하였다면 조금더 자유로운(?) 네트워크 형태로 구성하고 적당히 작은 EC2를 사용하여 작은 카프카 클러스터를 구축할 수도 있었을 겁니다.

 

그럼에도 불구하고 Amazon MSK는 강력합니다. 자동 복구 및 패치 기능을 통해 클러스터 상태를 지속적으로 모니터링하여 애플리케이션이 중단되지 않도록 자동으로 비정상 브로커를 교체합니다. 또한 zookeeper의 최신버전 패치도 MSK가 자동으로 수행합니다. 추가적인 AWS의 KMS(Key management Service) 혹은 TLS, ACL를 통한 암호화, 액세스제어 설정도 강력합니다. 만약 직접 운영을 위해 개발을 한다면 매우 까다로운 작업임에 틀림없습니다. MSK를 통해 클러스터의 브로커개수를 몇번의 명령어로 최대 100개까지 확장할 수도 있으며 스토리지도 적절히 조정할 수 있습니다.

 

그러므로 MSK를 사용할 이유는 충분해 보입니다. 비즈니스 요구사항에 따라 그리고 운영하고자 하는 목적에 따라 직접 ec2로 apache kafka를 운영할 것인지, 혹은 MSK를 사용할 것인지 선택하면 될것으로 보입니다.

 

추가정보

가격

Amazon MSK의 비용은 사용한 만큼 발생됩니다. 크게 3개의 요금으로 구성됩니다. 아래 요금표는 서울(region) 기준입니다.

 

- 브로커 인스턴스 요금 : 아래 표 참조

- 브로커 스토리지 요금 : GB-월 요금 = 0.114 USD

- 데이터 전송 요금 : 표준 AWS 데이터 전송요금으로 환산됩니다. 아래 표 참조

 

사전정보

- 19개 kafka.m5.12xlarge 활성

- MSK inbound = 85TB

- MSK outbound = 85TB

- 모든 데이터를 동일 region S3 적재

 

계산

- 6.195 USD per hour x 24시간 x 30일 x 19개 = 84,747 USD(브로커 인스턴스 요금)

- Retention date 3일이라고 가정했을 경우 하루 전송량 약 2.83TB > 3일간 저장용량 약 8.5TB > 1 broker당 10TB 가정

- 10,000GB(10TB) x 30일 x 24시간 = 7,200,000 GB per hour

- 7,200,000 GB per hour / 24 hour / 30 day = 10,000 x 0.114 USD = 1,140 USD(스토리지 요금)

- S3로 적재하므로 MSK outbound 비용 = 0 USD(outbound 전송요금)

- 85GB/month inbound = 표준 AWS 데이터 전송요금을 기준으로 인터넷 → EC2 비용은 무료 = 0 USD(inbound 전송요금)

 

- S3적재하는 Kafka consumer application의 instance = i3.4xlarge(16core, 122Gb) x 5 = 5,358 USD(Consumer 인스턴스 요금)
- 인터넷 → MSK 연동시 사용되는 was application의 instace 비용 고려 = c5.2xlarge(8core, 16GB) x 12 =  4,497 USD(Producer 인스턴스 요금)

 

- 총비용 = 84,747 USD(브로커 인스턴스 요금) + 1,140 USD(스토리지 요금) + 5,358 USD(Consumer 인스턴스 요금) + 4,497 USD(Producer 인스턴스 요금) = 95,742USD

반응형