Tranquility는 아파치 드루이드에 실시간으로 데이터를 적재하기 위한 용도로 사용됩니다. 아파치 드루이드에 대한 정보는 아래 링크를 사용해주세요.
https://voidmainvoid.tistory.com/440
현재 버전(0.20.0)의 드루이드는 카프카와 같은 데이터 소스로 부터 데이터를 직접 적재(ingestion)할 수 있지만 이전 버전(0.8.0)에는 그렇지 않았습니다. 데이터를 적재하기 위해서는 Tranquility라고 불리는 프로젝트를 사용하여 드루이드에 적재해야 했습니다.
0.9.2 이후 버전에서는 카프카 또는 키네시스에 대해 드루이드 자체적으로 적재 지원을 하므로 Tranquility를 사용하지 않는 것이 좋습니다.
github : https://github.com/druid-io/tranquility
Tranquility는 다음과 같은 7가지 모듈을 제공하고 사용자는 필요에 따라 모듈을 선택하여 개발하면 됩니다.
- Core : 가장 간단한 데이터 전송 API, 자바 라이브러리를 사용해서 데이터를 전송할 수 있음
- Server : 드루이드로 데이터를 전송하는 HTTP 서버. 이 서버로 POST 메시지를 날리면 드루이드에 적재됨
- Samza : Samza SystemProducer에 Tranquility가 포함됨
- Spark : RDD와 DStream을 Tranquility로 사용할 수 있음
- Storm : Strom Bolt와 Trident State에 Tranquility가 포함됨
- Kafka : 카프카 토픽의 메시지를 Tranquility를 통해 드루이드에 적재 가능
- Flink : Flink Sink에 Tranquility가 포함됨
이 중 Core와 Kafka모듈에 대해 자세히 알아보자
Tranquility-core
코어 모듈은 JVM기반 애플리케이션에서 드루이드로 데이터를 전송할 때 유용합니다. 라이브러리로 드루이드로 데이터를 전송하는 API를 만들 수 있습니다. 라이브러리를 추가하고 구현할 수 있는 방법은 다음과 같습니다.
- Tranquilizer API : 고레벨 API로서 1개 메시지 단위로 데이터를 전송할 수 있으며 내부에서 배치형태로 드루이드에 전송
- Beam API : 배치형태로 데이터를 보내고 싶을때 사용. 직접 배치 조절 가능
Tranquilizer를 사용한 예제 코드
public class JavaExample
{
private static final Logger log = new Logger(JavaExample.class);
public static void main(String[] args)
{
// Read config from "example.json" on the classpath.
final InputStream configStream = JavaExample.class.getClassLoader().getResourceAsStream("example.json");
final TranquilityConfig<PropertiesBasedConfig> config = TranquilityConfig.read(configStream);
final DataSourceConfig<PropertiesBasedConfig> wikipediaConfig = config.getDataSource("wikipedia");
final Tranquilizer<Map<String, Object>> sender = DruidBeams.fromConfig(wikipediaConfig)
.buildTranquilizer(wikipediaConfig.tranquilizerBuilder());
sender.start();
try {
// Send 10000 objects
for (int i = 0; i < 10000; i++) {
// Build a sample event to send; make sure we use a current date
final Map<String, Object> obj = ImmutableMap.<String, Object>of(
"timestamp", new DateTime().toString(),
"page", "foo",
"added", i
);
// Asynchronously send event to Druid:
sender.send(obj).addEventListener(
new FutureEventListener<BoxedUnit>()
{
@Override
public void onSuccess(BoxedUnit value)
{
log.info("Sent message: %s", obj);
}
@Override
public void onFailure(Throwable e)
{
if (e instanceof MessageDroppedException) {
log.warn(e, "Dropped message: %s", obj);
} else {
log.error(e, "Failed to send message: %s", obj);
}
}
}
);
}
}
finally {
sender.flush();
sender.stop();
}
}
}
Tranquility-kafka
카프카 모듈은 카프카의 데이터를 드루이드로 넣을 수 있는 툴입니다. 카프카의 파티션, 컨슈머 그룹을 사용해서 고 가용성을 지닌 애플리케이션으로 드루이드로 데이터를 넣을 수 있습니다.
카프카 모듈을 사용하기 위해서는 다음과 같이 Tranquility 설정 파일을 입력해야합니다.
다음은 반드시 설정해야하는 Tranquility 설정입니다.
- kafka.zookeeper.connect : 카프카와 연동되는 주키퍼 입력
- kafka.group.id : 컨슈머 그룹 아이디
- consumer.numThreads : 컨슈머 스레드 개수
- commit.periodMillis : 오프셋 오토 커밋 간격
- kafka.* : 카프카 컨슈머와 관련된 모든 옵션들(heartbeat, session timeout 등을 설정할 수 있음)
다음은 데이터 소스레벨 설정입니다.
- topicPattern : 정규식으로 지정한 카프카 토픽 이름
- topicPattern.priority : 만약 동일한 토픽이름이 여러개 있을 때 더 높은 우선순위를 정하기 위함
- useTopicAsDataSource : 토픽이름을 데이터 소스 이름으로 사용할 때 어떤 이름을 사용할 것인지 선택
- reportDropAsExceptioons : 데이터 유실이 발생했을 경우 애플리케이션 종료 또는 에러 로그 출력 선택
실행하는 방법은 다음과 같습니다.
$ bin/tranquility kafka -configFile conf/tranquility-kafka.json
{
"dataSources" : [
{
"spec" : {
"dataSchema" : {
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"format" : "auto",
"column" : "timestamp"
},
"dimensionsSpec" : {
"spatialDimensions" : [
{
"dims" : [
"lat",
"lon"
],
"dimName" : "geo"
}
],
"dimensions" : [
"text",
"hashtags",
"lat",
"lon",
"source",
"retweet",
"lang",
"utc_offset",
"screen_name",
"verified"
]
},
"format" : "json"
}
},
"dataSource" : "twitter",
"granularitySpec" : {
"segmentGranularity" : "hour",
"type" : "uniform",
"queryGranularity" : "none"
},
"metricsSpec" : [
{
"type" : "count",
"name" : "tweets"
},
{
"fieldName" : "followers",
"type" : "longSum",
"name" : "followers"
},
{
"name" : "retweets",
"type" : "longSum",
"fieldName" : "retweets"
},
{
"fieldName" : "friends",
"type" : "longSum",
"name" : "friends"
},
{
"name" : "statuses",
"type" : "longSum",
"fieldName" : "statuses"
}
]
},
"tuningConfig" : {
"maxRowsInMemory" : "100000",
"type" : "realtime",
"windowPeriod" : "PT10M",
"intermediatePersistPeriod" : "PT10M"
}
},
"properties" : {
"topicPattern.priority" : "1",
"topicPattern" : "twitter"
}
},
{
"spec" : {
"dataSchema" : {
"granularitySpec" : {
"queryGranularity" : "none",
"type" : "uniform",
"segmentGranularity" : "hour"
},
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"format" : "auto",
"column" : "timestamp"
},
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
}
}
},
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"name" : "deleted",
"type" : "doubleSum",
"fieldName" : "deleted"
},
{
"name" : "delta",
"type" : "doubleSum",
"fieldName" : "delta"
}
]
},
"tuningConfig" : {
"type" : "realtime",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M",
"maxRowsInMemory" : 75000
}
},
"properties" : {
"task.partitions" : "2",
"task.replicants" : "2",
"topicPattern" : "wikipedia.*",
"topicPattern.priority" : "1"
}
}
],
"properties" : {
"zookeeper.connect" : "localhost:2181",
"zookeeper.timeout" : "PT20S",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"druid.discovery.curator.path" : "/druid/discovery",
"kafka.zookeeper.connect" : "localhost:2181",
"kafka.group.id" : "tranquility-kafka",
"consumer.numThreads" : "2",
"commit.periodMillis" : "15000",
"reportDropsAsExceptions" : "false"
}
}
카프카 모듈 배포 방법
카프카 모듈은 카프카 컨슈머가 동작하는 방식과 동일합니다. 그렇기 때문에 토픽개수만큼 여러개의 Tranquility 카프카 모듈을 실행함으로서 확장성을 가지고 고 가용성의 드루이드 입수 파이프라인을 운영할 수 있습니다. 만약 설정의 업데이트가 필요하다면 한번에 하나의 인스턴스씩 업그레이드해서 다운타임없이 데이터를 지속적으로 처리할 수 있습니다.(리밸런싱이 발생할때 일부 지연이 발생할수는 있음)
'빅데이터' 카테고리의 다른 글
프로메테우스 promQL에서 without 또는 by 사용시 주의사항 (0) | 2021.07.01 |
---|---|
prometheus 자바 클라이언트로 지표 수집하기 (2) | 2021.06.29 |
아파치 드루이드 tranquility로 데이터 추가시 MessageDroppedException 이슈 (0) | 2021.06.24 |
아파치 드루이드 소개 및 아키텍처 (0) | 2021.06.10 |
프로메테우스, 그라파나 사용시 레이블 값 추출, Legend 선택, 여러 variable을 포함하는 쿼리 작성. (0) | 2021.05.04 |
alpine telegraf 도커 생성 (0) | 2021.04.22 |