본문 바로가기

빅데이터

Tranquility를 사용한 아파치 드루이드 실시간 데이터 적재

Tranquility는 아파치 드루이드에 실시간으로 데이터를 적재하기 위한 용도로 사용됩니다. 아파치 드루이드에 대한 정보는 아래 링크를 사용해주세요.

https://voidmainvoid.tistory.com/440

 

아파치 드루이드 소개 및 아키텍처

Apache Druid is a high performance real-time analytics database. 아파치 드루이드 소개 아파치 드루이드는 기존의 데이터 처리 및 쿼리에 대한 관념을 180도 바꿔주는 OLAP 데이터베이스 입니다. OLA..

blog.voidmainvoid.net

현재 버전(0.20.0)의 드루이드는 카프카와 같은 데이터 소스로 부터 데이터를 직접 적재(ingestion)할 수 있지만 이전 버전(0.8.0)에는 그렇지 않았습니다. 데이터를 적재하기 위해서는 Tranquility라고 불리는 프로젝트를 사용하여 드루이드에 적재해야 했습니다.

 

0.9.2 이후 버전에서는 카프카 또는 키네시스에 대해 드루이드 자체적으로 적재 지원을 하므로 Tranquility를 사용하지 않는 것이 좋습니다.

 

github : https://github.com/druid-io/tranquility

 

druid-io/tranquility

Tranquility helps you send real-time event streams to Druid and handles partitioning, replication, service discovery, and schema rollover, seamlessly and without downtime. - druid-io/tranquility

github.com

Tranquility는 다음과 같은 7가지 모듈을 제공하고 사용자는 필요에 따라 모듈을 선택하여 개발하면 됩니다.

 

- Core : 가장 간단한 데이터 전송 API, 자바 라이브러리를 사용해서 데이터를 전송할 수 있음

- Server : 드루이드로 데이터를 전송하는 HTTP 서버. 이 서버로 POST 메시지를 날리면 드루이드에 적재됨

- Samza : Samza SystemProducer에 Tranquility가 포함됨

- Spark : RDD와 DStream을 Tranquility로 사용할 수 있음

- Storm : Strom Bolt와 Trident State에 Tranquility가 포함됨

- Kafka : 카프카 토픽의 메시지를 Tranquility를 통해 드루이드에 적재 가능

- Flink : Flink Sink에 Tranquility가 포함됨

 

이 중 Core와 Kafka모듈에 대해 자세히 알아보자

Tranquility-core

코어 모듈은 JVM기반 애플리케이션에서 드루이드로 데이터를 전송할 때 유용합니다. 라이브러리로 드루이드로 데이터를 전송하는 API를 만들 수 있습니다. 라이브러리를 추가하고 구현할 수 있는 방법은 다음과 같습니다.

 

- Tranquilizer API : 고레벨 API로서 1개 메시지 단위로 데이터를 전송할 수 있으며 내부에서 배치형태로 드루이드에 전송

- Beam API : 배치형태로 데이터를 보내고 싶을때 사용. 직접 배치 조절 가능

 

Tranquilizer를 사용한 예제 코드

public class JavaExample
{
  private static final Logger log = new Logger(JavaExample.class);

  public static void main(String[] args)
  {
    // Read config from "example.json" on the classpath.
    final InputStream configStream = JavaExample.class.getClassLoader().getResourceAsStream("example.json");
    final TranquilityConfig<PropertiesBasedConfig> config = TranquilityConfig.read(configStream);
    final DataSourceConfig<PropertiesBasedConfig> wikipediaConfig = config.getDataSource("wikipedia");
    final Tranquilizer<Map<String, Object>> sender = DruidBeams.fromConfig(wikipediaConfig)
                                                               .buildTranquilizer(wikipediaConfig.tranquilizerBuilder());

    sender.start();

    try {
      // Send 10000 objects

      for (int i = 0; i < 10000; i++) {
        // Build a sample event to send; make sure we use a current date
        final Map<String, Object> obj = ImmutableMap.<String, Object>of(
            "timestamp", new DateTime().toString(),
            "page", "foo",
            "added", i
        );

        // Asynchronously send event to Druid:
        sender.send(obj).addEventListener(
            new FutureEventListener<BoxedUnit>()
            {
              @Override
              public void onSuccess(BoxedUnit value)
              {
                log.info("Sent message: %s", obj);
              }

              @Override
              public void onFailure(Throwable e)
              {
                if (e instanceof MessageDroppedException) {
                  log.warn(e, "Dropped message: %s", obj);
                } else {
                  log.error(e, "Failed to send message: %s", obj);
                }
              }
            }
        );
      }
    }
    finally {
      sender.flush();
      sender.stop();
    }
  }
}

 

Tranquility-kafka

카프카 모듈은 카프카의 데이터를 드루이드로 넣을 수 있는 툴입니다. 카프카의 파티션, 컨슈머 그룹을 사용해서 고 가용성을 지닌 애플리케이션으로 드루이드로 데이터를 넣을 수 있습니다.

 

카프카 모듈을 사용하기 위해서는 다음과 같이 Tranquility 설정 파일을 입력해야합니다.

 

다음은 반드시 설정해야하는 Tranquility 설정입니다.

- kafka.zookeeper.connect : 카프카와 연동되는 주키퍼 입력

- kafka.group.id : 컨슈머 그룹 아이디 

- consumer.numThreads : 컨슈머 스레드 개수 

- commit.periodMillis : 오프셋 오토 커밋 간격

- kafka.* : 카프카 컨슈머와 관련된 모든 옵션들(heartbeat, session timeout 등을 설정할 수 있음)

 

다음은 데이터 소스레벨 설정입니다.

- topicPattern : 정규식으로 지정한 카프카 토픽 이름

- topicPattern.priority : 만약 동일한 토픽이름이 여러개 있을 때 더 높은 우선순위를 정하기 위함

- useTopicAsDataSource : 토픽이름을 데이터 소스 이름으로 사용할 때 어떤 이름을 사용할 것인지 선택 

- reportDropAsExceptioons : 데이터 유실이 발생했을 경우 애플리케이션 종료 또는 에러 로그 출력 선택 

 

실행하는 방법은 다음과 같습니다.

$ bin/tranquility kafka -configFile conf/tranquility-kafka.json
{
   "dataSources" : [
      {
         "spec" : {
            "dataSchema" : {
               "parser" : {
                  "type" : "string",
                  "parseSpec" : {
                     "timestampSpec" : {
                        "format" : "auto",
                        "column" : "timestamp"
                     },
                     "dimensionsSpec" : {
                        "spatialDimensions" : [
                           {
                              "dims" : [
                                 "lat",
                                 "lon"
                              ],
                              "dimName" : "geo"
                           }
                        ],
                        "dimensions" : [
                           "text",
                           "hashtags",
                           "lat",
                           "lon",
                           "source",
                           "retweet",
                           "lang",
                           "utc_offset",
                           "screen_name",
                           "verified"
                        ]
                     },
                     "format" : "json"
                  }
               },
               "dataSource" : "twitter",
               "granularitySpec" : {
                  "segmentGranularity" : "hour",
                  "type" : "uniform",
                  "queryGranularity" : "none"
               },
               "metricsSpec" : [
                  {
                     "type" : "count",
                     "name" : "tweets"
                  },
                  {
                     "fieldName" : "followers",
                     "type" : "longSum",
                     "name" : "followers"
                  },
                  {
                     "name" : "retweets",
                     "type" : "longSum",
                     "fieldName" : "retweets"
                  },
                  {
                     "fieldName" : "friends",
                     "type" : "longSum",
                     "name" : "friends"
                  },
                  {
                     "name" : "statuses",
                     "type" : "longSum",
                     "fieldName" : "statuses"
                  }
               ]
            },
            "tuningConfig" : {
               "maxRowsInMemory" : "100000",
               "type" : "realtime",
               "windowPeriod" : "PT10M",
               "intermediatePersistPeriod" : "PT10M"
            }
         },
         "properties" : {
            "topicPattern.priority" : "1",
            "topicPattern" : "twitter"
         }
      },
      {
         "spec" : {
            "dataSchema" : {
               "granularitySpec" : {
                  "queryGranularity" : "none",
                  "type" : "uniform",
                  "segmentGranularity" : "hour"
               },
               "dataSource" : "wikipedia",
               "parser" : {
                  "type" : "string",
                  "parseSpec" : {
                     "timestampSpec" : {
                        "format" : "auto",
                        "column" : "timestamp"
                     },
                     "format" : "json",
                     "dimensionsSpec" : {
                        "dimensions" : [
                           "page",
                           "language",
                           "user",
                           "unpatrolled",
                           "newPage",
                           "robot",
                           "anonymous",
                           "namespace",
                           "continent",
                           "country",
                           "region",
                           "city"
                        ]
                     }
                  }
               },
               "metricsSpec" : [
                  {
                     "type" : "count",
                     "name" : "count"
                  },
                  {
                     "type" : "doubleSum",
                     "name" : "added",
                     "fieldName" : "added"
                  },
                  {
                     "name" : "deleted",
                     "type" : "doubleSum",
                     "fieldName" : "deleted"
                  },
                  {
                     "name" : "delta",
                     "type" : "doubleSum",
                     "fieldName" : "delta"
                  }
               ]
            },
            "tuningConfig" : {
               "type" : "realtime",
               "intermediatePersistPeriod" : "PT10M",
               "windowPeriod" : "PT10M",
               "maxRowsInMemory" : 75000
            }
         },
         "properties" : {
            "task.partitions" : "2",
            "task.replicants" : "2",
            "topicPattern" : "wikipedia.*",
            "topicPattern.priority" : "1"
         }
      }
   ],
   "properties" : {
       "zookeeper.connect" : "localhost:2181",
       "zookeeper.timeout" : "PT20S",
       "druid.selectors.indexing.serviceName" : "druid/overlord",
       "druid.discovery.curator.path" : "/druid/discovery",
       "kafka.zookeeper.connect" : "localhost:2181",
       "kafka.group.id" : "tranquility-kafka",
       "consumer.numThreads" : "2",
       "commit.periodMillis" : "15000",
       "reportDropsAsExceptions" : "false"
    }
}

 

카프카 모듈 배포 방법

카프카 모듈은 카프카 컨슈머가 동작하는 방식과 동일합니다. 그렇기 때문에 토픽개수만큼 여러개의 Tranquility 카프카 모듈을 실행함으로서 확장성을 가지고 고 가용성의 드루이드 입수 파이프라인을 운영할 수 있습니다. 만약 설정의 업데이트가 필요하다면 한번에 하나의 인스턴스씩 업그레이드해서 다운타임없이 데이터를 지속적으로 처리할 수 있습니다.(리밸런싱이 발생할때 일부 지연이 발생할수는 있음)

반응형