빅데이터/Kafka

카프카를 활용한레이싱카 센서 실시간 수집 데이터 파이프라인 구축

AndersonChoi 2022. 5. 2. 22:35

오늘은 카레이싱에서 자동차에서 출력되는 여러 지표들을 수집하는 데이터 파이프라인을 만들어 보겠습니다. F1, WRC를 비롯한 다양한 카레이싱 팀에서는 데이터의 수집과 활용이 우승으로 가는 지름길인 것은 이미 널리 알려져 있습니다. 2021 F1 의 월드 컨스트럭터 챔피언인 메르세데스-AMG 페트로나스 포뮬러 원팀은 데이터를 잘 활용하는 팀 중 하나입니다.

페트로나스 포뮬러팀에서 수집하는 F1 자동차의 센서는 5만개에 달하며 한경기당 300기가바이트, 모든 경기 동안에는 테라바이트에 달하는 데이터가 쌓이게 된다고 합니다. F1 경기는 총 22번 경기를 뛰게 되는데 자체 프랙티스 세션까지 합치면 몇백 테라바이트 이상의 데이터가 모이게 됩니다. 이렇게 모아진 데이터는 다음 경기를 위해 분석하고 경주용 F1 머신을 최적화하는데 사용합니다.

 

그렇다면 이 데이터는 어떻게 모을까요? 여러 방법이 있겠지만 대규모 실시간 데이터를 수집하는 방법으로 카프카를 활용하는 방법이 있습니다. 카프카는 대규모 데이터를 안정적으로 처리하고 운영하는데 적합합니다. 카프카를 활용한 데이터 파이프라인 아키텍처의 개략적인 모습은 아래와 같을 것입니다.

실제 센서가 달린 자동차가 없으므로 가상의 자동차로 센서 데이터를 모으기로 결정하였습니다. 저는 Xbox One S 기반 레이싱 휠과 페달을 가지고 있습니다. 그리고 제가 가장 좋아하는 게임 중 하나인 Forza Motorsports 7을 사용하면 레이싱카의 센서 데이터를 수집할 수 있습니다. 

보유 장비

- Xbox one S

- Fanatec CSL Elite

- Fanatec WRC Wheel

- Fanatec Elite Pedal

 

Forza Motorsports 7은 실제 카레이싱이 이루어지는 레이싱 장소들을 제현하고 상용 자동차들과 동일한 셋팅을 직접 몰아볼 수 있는 시뮬레이션 게임입니다. ABS, TCS(트랙션 컨트롤), STM(스태빌리티 컨트롤), 수동변속, 연료 소모, 타이어 마모 등 실제 자동차에서 사용되는 각종 장치들과 상황이 그대로 적용되는 특징이 있습니다. Forza Motorsports 7에서는자동차의 실시간 센서 데이터를 UDP로 특정 서버로 전송할 수 있습니다. UDP로 전송하는 데이터에는 Steer, accel, brake, yaw, turbo 등 차량의 상태를 확인할 수 있는 지표가 포함되어 있습니다. 이 데이터를 추출하고 카프카로 데이터를 보내면 데이터를 실시간 분석, 시각화, 저장할 수 있습니다. 여기서는 레이싱카의 실시간 데이터를 카프카에 적재하고 분석 및 시각화 용도로 엘라스틱서치에 저장하는 아키텍처로 파이프라인을 구성하였습니다. 마지막으로 시각화를 위해 그라파나를 엘라스틱서치와 연동하였습니다.

우선 UDP로 전달되는 데이터를 받기 위해 자바로 UDP 리시버를 만들고 카프카 프로듀서를 붙여 특정 토픽에 데이터를 지속적으로 전달하도록 하였습니다.

기본적으로 Forza Motorsports 7에서는 초당 30개의 데이터를 전송합니다. 분당 1800개의 데이터이고 기본적으로 20분~30분 가량 진행되는 게임 특성을 생각해보면 꽤 많은 양의 데이터가 전송되는 것을 알 수 있습니다. 여기서 프로듀서의 acks옵션은 0으로 설정하였습니다. acks를 0으로 설정할 경우 데이터 전달 신뢰성이 떨어지지만 대규모 데이터 전송시 유리합니다. 우리가 수집하고자 하는 레이싱카 센서 데이터에서 일부가 유실되더라도 대세에는 영향이 없습니다. 왜냐면 지속적으로 추가되는 데이터이고 초당 30개의 데이터중 1~2개가 빠지더라도 전체를 분석하는데는 이슈가 없기 때문입니다.

public class Main {
    private final static int UDP_PORT = 4843;
    private final static Gson gson = new Gson();
    private final static String TOPIC_NAME = "forza-motorsport-raw";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws Exception {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.ACKS_CONFIG, 0);
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        DatagramSocket ds = new DatagramSocket(UDP_PORT);
        while (true) {
            byte[] receive = new byte[323];
            DatagramPacket dp = new DatagramPacket(receive, receive.length);
            ds.receive(dp);
            ForzaData forzaData = new ForzaData(dp.getData());
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, gson.toJson(forzaData));
            producer.send(record);
        }
    }
}

4843포트로 UDP를 열고 데이터를 수집합니다. 수집된 데이터는 ForzaData 객체로 설정되고 해당 데이터는 Gson을 통해 JSON으로 만들어 forza-motorsport-raw 토픽으로 전송합니다. 토픽으로 전송된 데이터는 kafka-console-consumer로 데이터를 확인할 수 있습니다.

$./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forza-motorsport-raw --max-messages 1
{
    "readOK": false,
    "isRaceOn": false,
    "timeStampMS": 13228671,
    "engineMaxRpm": 0.0,
    "engineIdleRpm": 0.0,
    "currentEngineRpm": 1550.0,
    ...
    "accel": 30,
    "brake": 0,
    "clutch": 0,
    "handbrake": 0,
    "gear": 0,
    "steer": 0,
    "normalizedDrivingLine": 0,
    "normalizedAIBrakeDifference": 0
}

수집된 데이터는 시각화, 분석을 위해 엘라스틱서치로 저장합니다. 카프카에 저장된 데이터를 특정 데이터베이스에 저장하는 방법은 크게 3가지가 있습니다.

 

1) 카프카 컨슈머

2) 카프카 커넥트

3) 에이전트 애플리케이션(로그스태시, 텔레그래프, fluentd 등)

 

여기서는 로그스태시를 사용하여 카프카의 데이터를 엘라스틱서치에 저장하였습니다. 단발성 데이터 파이프라인을 만들기 때문에 가장 적용하기 쉬운 로그스태시를 썼지만 반복적으로 만들어야할 경우에는 카프카 커넥트를 사용하는 편이 더 낫습니다. 다음은 로그스태시 컨피그 파일입니다.

input {
    kafka {
        bootstrap_servers =>  "localhost:9092"
        topics => ["forza-motorsport-raw"]
        group_id => "logstash"
        codec => json
    }
}
output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "forza-kafka"
        document_type => "_doc"
    }
}

이후 그라파나를 통해 엘라스틱서치에 forza-kafka라는 이름으로 인덱스를 조회하면 레이싱 데이터를 실시간으로 그라파나에서 조회할 수 있습니다. 

그라파나 대시보드에서 자동차 센서 데이터를 다음과 같이 실시간으로 조회할 수 있습니다.

그라파나로 조회할 수 있는 지표는 다음과 같습니다.

 

- Accel

- Brake

- Speed

- Acceleration XYZ

- Gear

- RPM

- Torque

 

이렇게 수집, 시각화한 데이터는 레이스를 완료하고 난 뒤 레이싱카, 레이서, 트랙에 대한 분석을 수행하는데 활용할 수 있습니다. 제가 테스트로 달린 자동차와 트랙은 다음과 같습니다.

 

- 2011 Porsche 911 GT3 RSR

- 스파 그랑코샹 서킷(벨기에 스파)

 

위 자동차로 트랙을 2바퀴 돈 결과는 다음과 같습니다.

(클릭하면 커짐)

엑셀, 브레이크에 대한 지점 그리고 속도를 시간대별로 확인할 수 있습니다. 각 섹션별로 어느 정도 시간이 걸렷는지 확인하고 브레이킹 포인트와 코너 탈출 지점을 파악하기에 좋아 보입니다.


여기까지 카프카를 활용하여레이싱카 센서 실시간 수집 데이터 파이프라인 구축을 진행했습니다. 간단한 테스트 목적으로 실시간 파이프라인을 구축하고 1개 토픽만 사용했지만 실전 환경에서는 분명 다를 것입니다. 다양한 토픽이 필요할 것이고 메시지 키를 사용하여 데이터를 구분하여 처리해야할 경우도 있을 것입니다. 또한 파티션 개수도 최소 50개 이상으로 운영해야만 하는 상황이 올 것이죠. 

 

아파치 카프카는 이런 상용 환경으로 전환하기 수월하게 설계되어 있습니다. 당장 다음주에! 제가 레이싱 팀을 이끌어야하고 레이싱카의 센서 데이터를 수집, 분석이 필요하다면 새로 데이터 플랫폼을 구축할 필요 없이 지금 만들어 놓은 파이프라인을 그대로 사용하면 됩니다. 데이터 양에 따라 파티션 개수를 늘려 처리량을 늘리고 파이프라인 개수가 늘어나면 커넥트를 도입하면 향후 몇년간은 이 파이프라인을 뜯어 고칠일은 거의 없을 것입니다.

 

제가 즐겨하는 취미 활동을 카프카와 접목시켜 발전시켜본 것은 처음인데요. 정말 즐겁고 재밌는 프로젝트였습니다. 여러분들도 취미와 개발을 통합시켜 발전해보는 시간을 가져보는 건 어떨까요?


이 프로젝트는 아래 링크들에서 영감을 받았습니다.

- https://www.confluent.io/en-gb/blog/taking-ksql-spin-using-real-time-device-data/

- https://github.com/Josua019/forza-dashboard/blob/main/Telemetry%20Server/FM7_packetformat.dat

- https://www.youtube.com/watch?v=XpLNK34tp2o 

- https://www.youtube.com/watch?v=dyhiqqOepLs 

 

이 프로젝트에 사용한 코드

- https://github.com/AndersonChoi/forza-telemetry-kafka-producer

 

GitHub - AndersonChoi/forza-telemetry-kafka-producer: forza-telemetry-kafka-producer

forza-telemetry-kafka-producer. Contribute to AndersonChoi/forza-telemetry-kafka-producer development by creating an account on GitHub.

github.com

 

반응형