본문 바로가기

빅데이터/cassandra

Datastax의 Cassandra Sink Connector(JSON field) 적재 설정

https://docs.datastax.com/en/kafka/doc/kafka/kafkaStringJson.html

 

Mapping a message that contains both basic and JSON fields

When the data format for the Kafka key or value is JSON, individual fields of that JSON structure can be specified in the connector mapping. When the data format for the message key or value is JSON, the connector mapping can include individual fields in t

docs.datastax.com

다음과 같은 레코드의 메시지 키/값이 있다고 가정하자.

key value
APPLE {"symbol":"APPL",
"value":208,
"exchange":"NASDAQ",
"industry":"TECH",
"ts":"2018-11-26T19:26:27.483"}
EXXON MOBIL {"symbol":"M",
"value":80,
"exchange":"NYSE",
"industry":"ENERGY",
"ts":"2018-11-26T19:26:27.483"}
GENERAL MOTORS {"symbol":"GM",
"value":38,
"exchange":"NYSE",
"industry":"AUTO",
"ts":"2018-11-26T19:26:27.483"}
AT&T {"symbol":"AT&T",
"value":33,
"exchange":"NYSE",
"industry":"TELECOM",
"ts":"2018-11-26T19:26:27.483"}
FORD MOTOR {"symbol":"F",
"value":10,
"exchange":"NYSE",
"industry":"AUTO",
"ts":"2018-11-26T19:26:27.483"}

상기 데이터를 테이블에 저장할 때 필요한 것

데이터 적재시 아래 2개의 컬럼을 만족하도록 한다.

- 각 카프카에서 사용하는 JSON의 필드는 테이블 컬럼의 데이터 타입과 동일하게 적용

- 각 카프카에서 사용하는 JSON의 필드 중 하나는 PK 컬럼에 매핑됨. null은 허용되지 않음.

 

Distributed connect에 적용 방법

1. connect-distributed.properties에 key.converter과 value.converter정상 적용 확인.

2-a. 키스페이스 생성

$ cqlsh -e "CREATE KEYSPACE stocks_keyspace \
WITH replication = {'class': 'NetworkTopologyStrategy',\
'Cassandra': 1};"

토폴로지 설정과 데이터센터 이름같은 것은 각 환경에 따라 다르게 설정한다.

 

2-b. 테이블 생성

stocks_table 이라는 이름의 테이블 을 생성한다.

cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
  symbol text, \
  ts timestamp, \
  exchange text, \
  industry text, \
  name text, \
  value double, \
  PRIMARY KEY (symbol, ts));"

symbol, ts, exchange, industry, name, value를 컬럼으로 설정. symbol과 ts를 pk로 묶어서 생성

 

3-a. 토픽 생성

파이프라인으로 사용할 토픽을 생성합니다. 여기서는 stocks_topic 이라는 이름의 토픽을 생성하여 활용합니다.

 

3-b. prefix를 포함한 토픽 테이블 매핑 확인

다음 신텍스가 기본 설정으로 매핑됩니다.

topic.topic_name.keyspace_name.table_name

 

3-c. field column 매핑 확인

레코드의 메시지 값을 기반으로 매핑하기 위해 value.symbol을 호출하면 레코드의 메시지 값에서 json 데이터 중 symbol의 값을 파싱하여 가져온다.

 

만약 레코드의 헤더의 값을 가져오고 싶다면 header.f4와 같이 수행하여 파싱 할 수 있다.

https://docs.datastax.com/en/kafka/doc/kafka/kafkaRecordHeaderToTable.html

 

Extract Kafka record header values

Extract values from Kafka record header and write to the database table. In the Kafka topic mapping, you can extract values from the record's header by using header.header-field-name, and write the values to a supported database table: DataStax Astra cloud

docs.datastax.com

 

 

4. cassandra sinnk connector 설정

분산 커넥트에 설정할 경우 json으로 설정

{
  "name": "stocks-sink",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "stocks_topic",
    "topic.stocks_topic.stocks_keyspace.stocks_table.mapping": 
    "symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value"
  }
}

단일 커넥트에 설정할 경우 key/value로 설정

name=stocks-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
topics=stocks_topic
topic.stocks_topic.stocks_keyspace.stocks_table.mapping = symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value

 

5. cassandra 데이터 전송 및 적재된 데이터 확인 

$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stocks_topic \
 --property "parse.key=true" --property "key.separator=:"
 >APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}
>EXXON MOBIL:{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}
>GENERAL MOTORS:{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}
>AT&T:{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}
>FORD MOTOR:{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}

Table 데이터 확인

cqlsh> select * from stocks_table;

 symbol | ts                              | exchange | industry | name           | value
--------+---------------------------------+----------+----------+----------------+-------
      M | 2018-11-26 19:26:27.483000+0000 |     NYSE |   ENERGY |    EXXON MOBIL |    80
   APPL | 2018-11-26 19:26:27.483000+0000 |   NASDAQ |     TECH |          APPLE |   208
      F | 2018-11-26 19:26:27.483000+0000 |     NYSE |     AUTO |     FORD MOTOR |    10
   AT&T | 2018-11-26 19:26:27.483000+0000 |     NYSE |  TELECOM |           AT&T |    33
     GM | 2018-11-26 19:26:27.483000+0000 |     NYSE |     AUTO | GENERAL MOTORS |    38

(5 rows)
cqlsh> select * from stocks_table where symbol='APPL';

 symbol | ts                              | exchange | industry | name  | value
--------+---------------------------------+----------+----------+-------+-------
   APPL | 2018-11-26 19:26:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208

(1 rows)
cqlsh> select * from stocks_table where exchange='NASDAQ';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

당연하게도 PK가 걸리지 않은 컬럼에 대해서는 where  조건이 걸리지 않는다.

 

데이터 추가 및 확인

$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stocks_topic \
 --property "parse.key=true" --property "key.separator=:"
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:27:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:28:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:29:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:30:27.483"}
cqlsh> select * from stocks_table where symbol='APPL' order by ts desc;

 symbol | ts                              | exchange | industry | name  | value
--------+---------------------------------+----------+----------+-------+-------
   APPL | 2018-11-26 19:30:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:29:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:28:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:27:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:26:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208

(5 rows)
cqlsh> select * from stocks_table where symbol='APPL' and ts = '2018-11-26 19:27:27.483000+0000';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Unable to coerce '2018-11-26 19:27:27.483000+0000' to a formatted date (long)"
cqlsh:cory> select * from stocks_table where symbol='APPL' and ts = '2018-11-26 19:27:27.483';

 symbol | ts                              | exchange | industry | name  | value
--------+---------------------------------+----------+----------+-------+-------
   APPL | 2018-11-26 19:27:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208

(1 rows)
cqlsh> select * from stocks_table where symbol='APPL' and ts > '2018-11-26 19:27:27.483';

 symbol | ts                              | exchange | industry | name  | value
--------+---------------------------------+----------+----------+-------+-------
   APPL | 2018-11-26 19:28:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:29:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208
   APPL | 2018-11-26 19:30:27.483000+0000 |   NASDAQ |     TECH | APPLE |   208

(3 rows)

2018-11-26 19:27:27.483000+0000로 조회가 되지 않는 이유 > Cassandra timestamp types only support milliseconds

https://stackoverflow.com/a/36454277/9634545

 

 

 

 

반응형