본문 바로가기

개발이야기/AWS

AWS kinesis Data stream 실습 및 예제 코드(Java), 장단점, 가격

실시간으로 비디오 및 데이터 스트림을 손쉽게 수집, 처리 및 분석 솔루션

Amazon Kinesis를 사용하면 실시간 스트리밍 데이터를 손쉽게 수집, 처리 및 분석할 수 있으므로 적시에 통찰력을 확보하고 새로운 정보에 신속하게 대응할 수 있습니다.

Amazon Kinesis 기능

- Kinesis Video streams : 비디오 스트림을 캡처 처리 및 저장

- Kinesis Data streams : 데이터 스트림을 캡쳐, 처리 및 저장

- Kinesis Data firehose : 데이터스트림을 AWS데이터 스토어로 로드

- Kinesis Data analytics : SQL 또는 Java를 통해 스트림 데이터를 분석

 

이번 포스팅에서 주요하게 볼 서비스는 Data streams 입니다.

Data streams 아키텍처

Amazon Kinesis Data stream 시작

1. Kinesis리소스 선택하기

Kinesis를 사용하기 위해 리소스를 선택할 수 있습니다. 먼저 Data streaming 파이프라인 용도로 사용할 Data stream을 선택하여 스트림을 생성하고 적재해보겠습니다.  대규모 데이터 레코드 스트림을 실시간 수집, 처리 할 수 있습니다. KCL(Kinesis Client Library)를 사용하여 EC2 instance에서 데이터를 주거나 받을 수 있습니다. 또한 다른 AWS 제품에 데이터를 보낼 수도 있습니다. 또한 스트리밍 데이터의 용량에 따른 관리를 AWS에서 자체적으로 수행하기 때문에 추가적인 관리가 필요없다고 합니다.

2. 스트림 생성

Kinesis data stream을 만들기 위해서는 스트림 이름, 샤드개수를 설정해야 합니다. 스트림 이름은 영어 대문자, 소문자, 숫자, 밑줄, 하이픈 및 마침표로 이루어진 글자로 만들 수 있습니다.

 

샤드는 처리용량의 단위입니다. 각 샤드는 초당 최대 1MB의 record를 1,000개까지 수집할 수 있고, 최대 2MB까지 내보낼 수 있습니다. 처리량과 밀접한 영향이 있으며, 샤드 개수는 최대 200개까지 설정 할 수 있습니다. Kafka의 파티션과 비슷한 역할을 하는데, 샤드내에 들어간 record들은 순서를 보장할 수 있습니다.

 

샤드는 kafka 파티션과 비슷한 역할을 한다.

필요한 샤드 수는 해당 파이프라인에 들어오는 데이터의 양과 consumer개수에 따라 다르므로 AWS에서 제공하는 계산기에 따라 샤드 개수를 정하는 것을 추천드립니다. 만에하나 샤드개수를 잘못 기입하셨더라도 추후에 샤드개수를 수정할 수 있습니다.

 

3. 생성완료

생성이 완료되고 나면 아래와 같이 delivery stream이 생성된 것을 확인할 수 있습니다. Firehose 설정보다 훨씬 간단합니다.

4. Data를 Kinesis stream에 넣기

Data를 Kinesis stream의 데이터 수신 및 처리를 위해서는 아래와 같은 방식을 사용할 수 있습니다.

 

- Kinesis Client Library(KCL) / Kinesis Producer Library(KPL) : 데이터 Produce/Consume 또는 분석할 수 있도록 하는 라이브러리

- Kinesis Firehose : Kinesis stream으로 들어온 데이터를 Firehose를 통해 특정 destination(S3, Redshift 등)에 저장

- Kinesis Analytics : Kinesis stream으로 들어온 데이터를 분석

 

이번 시간에는 AWS SDK Java을 통해 json데이터를 Produce, Consume 해보겠습니다. Sample code는 Gradle을 통해 build하였고 Java 8을 기반으로 하였습니다. 이번에 구현할 때 사용한 library는 amazon-kinesis-client입니다.

 

먼저, 공통으로 사용된 dependency는 아래와 같습니다.

plugins {
    id 'java'
}

group 'kinesis'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.695'
    compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.13.0'

}

4-1. Consumer

package com.di;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class Main {

    static private String STREAM_NAME = "data_streaming_test";
    public static void main(String args[]) {

        BasicAWSCredentials awsCreds = new BasicAWSCredentials("accessKey",
                "secretKey");
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        clientBuilder.setRegion("ap-northeast-2"); // 서울 region
        clientBuilder.setCredentials(new AWSStaticCredentialsProvider(awsCreds));
        AmazonKinesis kinesisClient = clientBuilder.build();

        String shardIterator;
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(STREAM_NAME);
        getShardIteratorRequest.setShardId("shardId-000000000000");
        getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
        GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();

        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setLimit(25);
        getRecordsRequest.setShardIterator(shardIterator);
        GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest);
        List<Record> records = result.getRecords();
        for (Record r : records) {
            System.out.println(r.getSequenceNumber());
            System.out.println(r.getPartitionKey());
            byte[] bytes = r.getData().array();
            System.out.println(new String(bytes));
        }
    }
}

4-2. Producer

package com.di;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class Main {

    static private String STREAM_NAME = "data_streaming_test";
    public static void main(String args[]) {

        BasicAWSCredentials awsCreds = new BasicAWSCredentials("accessKey",
                "secretKey");
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        clientBuilder.setRegion("ap-northeast-2"); // 서울 region
        clientBuilder.setCredentials(new AWSStaticCredentialsProvider(awsCreds));
        AmazonKinesis kinesisClient = clientBuilder.build();

        // 100번 반복해서 전송
        for (int i = 0; i < 100; i++) {
            sendData(kinesisClient, i);
        }
    }

    static void sendData(AmazonKinesis kinesisClient, int count) {
        String myData = "{\"no\":" + count + "}\n"; // 보내려는 데이터
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(STREAM_NAME);
        List<PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(myData.getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
        }
        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
    }
}

5. 데이터 확인

4-2 producer를 먼저 실행하고, 4-1 consumer을 실행하여 data stream에 데이터가 정상적으로 in/out 되었는지 확인한다.

 

Producer log

> Task :Main.main()
Put Result{FailedRecordCount: 0,Records: 
[{SequenceNumber: 49602583503902801763159989595819342641782515596118720546,
ShardId: shardId-000000000002,}, {SequenceNumber: 
49602583503880501017961458972682642626788325751311564818,ShardId: shardId-000000000001,}, {SequenceNumber: 49602583503902801763159989595820551567602130225293426722,ShardId: shardId-000000000002,}, {SequenceNumber: 49602583503858200272762928349545942611794135906504409090,ShardId: shardId-000000000000,}, {SequenceNumber: 
...

Consumer log

> Task :Main.main()
49602583503858200272762928349545942611794135906504409090
partitionKey-3
{"no":0}

49602583503858200272762928349547151537613750535679115266
partitionKey-7
{"no":0}

49602583503858200272762928349558031869990282198251470850
partitionKey-3
{"no":1}

정상적으로 json이 produce 되었고 다시 consumer가 consume되었음을 확인할 수 있다.

 

이와 동시에 해당 Kinesis의 Stream 현황을 AWS console에서 모니터링 할 수 있다.

위와 같은 모니터링 페이지에서 확인 할 수 있는 항목은 아래와 같다.

레코드 가져오기(바이트) — 합계
GetRecords.Bytes

레코드 가져오기 반복자 기간(밀리초) — 최대
GetRecords.IteratorAgeMilliseconds

레코드 가져오기 지연 시간(밀리초) — 평균
GetRecords.Latency

레코드 가져오기(개수) — 합계
GetRecords.Records

레코드 가져오기 성공률(백분율) — 평균
GetRecords.Success

수신 데이터(바이트) — 합계
IncomingBytes

수신 데이터(개수) — 합계
IncomingRecords

레코드 넣기(바이트) — 합계
PutRecord.Bytes

레코드 넣기 지연 시간(밀리초) — 평균
PutRecord.Latency

레코드 넣기 성공률(백분율) — 평균
PutRecord.Success

레코드 넣기(바이트) — 합계
PutRecords.Bytes

레코드 넣기 지연 시간(밀리초) — 평균
PutRecords.Latency

레코드 넣기(개수) — 합계
PutRecords.Records

레코드 넣기 성공률(백분율) — 평균
PutRecords.Success

읽기 처리량 초과(개수/요청) — 평균
ReadProvisionedThroughputExceeded

쓰기 처리량 초과(개수/요청) — 평균
WriteProvisionedThroughputExceeded

 

Amazon Kinesis Data stream 소감 및 의견

오늘 포스팅에서는 Kinesis Data stream을 구현하여 Producer와 Consumer application을 만들어 보았습니다. 모든 과정이 순조로웠고 결과까지 확인하는데 몇시간 걸리지 않았습니다. 이렇게 구축한 파이프라인은 AWS에서 Full management하기 때문에 현업에서 운영으로 사용해도 무방할 정도라고 보셔도 좋습니다. 또한 기본적인 모니터링(data in/out)도 제공하고 있기 때문에 개발자가 운영하기 편리한 구조로 되어 있습니다.

 

On-promise에서 이와 같은 데이터 파이프랑인을 만드려면 서버도 다수 발급받아야하며 들어갈 운영 resource도 상당합니다. 당장 생각해보더라도 Kakfa broker(최소 3대 이상), Security이슈를 생각해야하고 partition개수 관리, OS/application upgrade 하는등 지속적으로 비용이 투입되게 됩니다.

 

Kinesis Data stream는 이와 같이 강력한 기능을 가지고 있으면서도 여러 AWS 제품과의 연동을 자유자제로 지원하기 때문에 상당히 사용하기 편리합니다. 

 

추가정보

가격

Kinesis Data stream은 사용량에 따라 지불하는 요금제를 사용합니다. 요금은 샤드시간, PUT 페이로드 단위 2가지를 합친 가격과 팬아웃, 데이터 보존기간 기반을 포함하여 측정됩니다.

 

- 샤드시간 : 샤드 개수에 따라 사용료 부과

- PUT 페이로드 단위(25KB) : Produce되는 레코드 단위당 요금

- 팬아웃 : 팬아웃을 사용하는 경우 Consumer-샤드 당 요금 + 데이터 GB 요금 발생

- 데이터 보존기간 : 24시간(default) 이상 저장할 경우 추가 요금 부담

 

사전정보

- 한달 85TB 사용

- 샤드 74개(Consumer 30개 가정)

- 보존시간 24시간 기준

 

사전정보에 따른 단위 변환

- 레코드수 : 초당 1600개

- 평균 레코드 크기 : 500kb

- Consumer 개수 : 30개

- 향상된 팬아웃 Consumer 개수 : 0개

 

계산

- 0.50MB x 1,600 record/sec = 800MB/sec

- 800MB/sec / 1MB shard capacity = 800 shards needed

- (800MB/sec x 30 consumer) / 2mb per sec per shard capacity = 12,000 shards needed

- 1,600 record/sec / 1000 factor for records = 1.60 shard needed for record

- Max (800 shards needed for ingress, 12000 shards needed for egress, 1.60 shards needed for records) = 12,000 

- 12,000 shard x 730 hour = 8,8760,000 shard hours/month

- 8,760,000 shard hours per month x 0.0185 USD = 162,060 USD (월간 샤드 비용)

 

- 0.60MB / 25 payload Unit factor ≒ 21PUT

- 21PUT x 1,600 per sec x 2628000 seconds in month = 88,300,800,000 PUT

- 88,300,800,000 PUT x 0.0000000204 USD = 1,801.34 USD (월간 PUT 페이로드 비용)

 

- 연장된 데이터 보존비용: 0  USD

- 향상된 팬아웃 소비자 샤드 시간 비용 : 0 USD

 

- 총 Kinesis 데이터 스트림 비용(monthly) : 163,861.34USD (190,576,473원)