
Big Data

Stream Processing with Faust - Kafka consumer/producer

Faust is a Python-based stream processing library. The code below consumes data from Kafka and produces data back to it.

Producer

from dataclasses import asdict, dataclass
import json

import faust  # import the Faust module

@dataclass  # declare a dataclass so Faust can (de)serialize the record as JSON
class ClickEventSanitized(faust.Record):
    timestamp: str
    uri: str
    number: int


app = faust.App("exercise3", broker="kafka://localhost:9092") # Kafka broker address
clickevents_topic = app.topic("com.udacity.streams.clickevents", # source topic to consume from
  value_type=ClickEventSanitized)

sanitized_topic = app.topic(
    "com.udacity.streams.clickevents.sanitized",
    key_type=str,
    value_type=ClickEventSanitized,
)

@app.agent(clickevents_topic)
async def clickevent(clickevents):
    async for clickevent in clickevents:
        sanitized = ClickEventSanitized(
            timestamp=clickevent.timestamp,
            uri=clickevent.uri,
            number=clickevent.number
        )
        await sanitized_topic.send(key=sanitized.uri, value=sanitized) # produce to the sanitized topic

if __name__ == "__main__":
    app.main()

The producer consumes click events and sends JSON data to the 'com.udacity.streams.clickevents.sanitized' topic.
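Faust serializes Record values to JSON before sending them to Kafka. As a rough sketch of what the payload's essential fields look like, the same dataclass can be serialized with only the standard library (Faust's actual codec may add its own metadata, so this is an illustration, not the exact wire format):

```python
from dataclasses import asdict, dataclass
import json

# Plain stdlib stand-in for the faust.Record shown above (illustration only)
@dataclass
class ClickEventSanitized:
    timestamp: str
    uri: str
    number: int

event = ClickEventSanitized(timestamp="2024-01-01T00:00:00", uri="/home", number=5)

# Serialize the dataclass fields to a JSON byte payload, as a JSON codec would
payload = json.dumps(asdict(event)).encode("utf-8")
print(payload)
```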

 

Consumer

from dataclasses import asdict, dataclass
import json

import faust

@dataclass
class ClickEvent(faust.Record):
    email: str
    timestamp: str
    uri: str
    number: int

app = faust.App("sample2", broker="kafka://localhost:9092")

clickevents_topic = app.topic(
    "com.udacity.streams.clickevents", # topic to consume from
    key_type=str,
    value_type=ClickEvent,
)

@app.agent(clickevents_topic) # receive messages, deserialized into the ClickEvent dataclass
async def clickevent(clickevents):
    async for ce in clickevents:
        print(json.dumps(asdict(ce), indent=2))

if __name__ == "__main__":
    app.main()

The consumer reads JSON data from the 'com.udacity.streams.clickevents' topic and prints each record.
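Because `value_type=ClickEvent` is set on the topic, Faust deserializes each incoming JSON payload into a `ClickEvent` instance before the agent sees it. Conceptually, that step looks like the following stdlib-only sketch (the real codec is handled inside Faust):

```python
from dataclasses import dataclass
import json

# Plain stdlib stand-in for the faust.Record shown above (illustration only)
@dataclass
class ClickEvent:
    email: str
    timestamp: str
    uri: str
    number: int

# A JSON byte payload as it might arrive from Kafka (hypothetical sample data)
raw = b'{"email": "a@b.com", "timestamp": "2024-01-01", "uri": "/home", "number": 3}'

# Deserialize the payload into a typed record, as Faust does for value_type
event = ClickEvent(**json.loads(raw))
print(event.uri)  # /home
```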

 

Filtering with Consumer & Producer

from dataclasses import asdict, dataclass
import json

import faust


@dataclass
class ClickEvent(faust.Record):
    email: str
    timestamp: str
    uri: str
    number: int


app = faust.App("exercise4", broker="kafka://localhost:9092")
clickevents_topic = app.topic("com.udacity.streams.clickevents", value_type=ClickEvent)
popular_uris_topic = app.topic(
    "com.udacity.streams.clickevents.popular",
    key_type=str,
    value_type=ClickEvent,
)

@app.agent(clickevents_topic)
async def clickevent(clickevents):
    # forward click events whose number is greater than or equal to 100
    # to the com.udacity.streams.clickevents.popular topic
    async for clickevent in clickevents.filter(lambda x: x.number >= 100):
        await popular_uris_topic.send(key=clickevent.uri, value=clickevent)

if __name__ == "__main__":
    app.main()

This agent forwards messages from the 'com.udacity.streams.clickevents' topic whose `number` is greater than or equal to 100 to 'com.udacity.streams.clickevents.popular'. In other words, it consumes, filters a subset of the data, and produces the result to a new topic.
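The `.filter()` call on the stream is equivalent to keeping only the events that satisfy the predicate. As a minimal stdlib sketch with hypothetical sample events, the same selection logic looks like this:

```python
from dataclasses import dataclass

# Plain stdlib stand-in for the faust.Record shown above (illustration only)
@dataclass
class ClickEvent:
    email: str
    timestamp: str
    uri: str
    number: int

# Hypothetical sample events
events = [
    ClickEvent("a@b.com", "t1", "/a", 50),
    ClickEvent("c@d.com", "t2", "/b", 150),
    ClickEvent("e@f.com", "t3", "/c", 100),
]

# Same predicate as stream.filter(lambda x: x.number >= 100)
popular = [e for e in events if e.number >= 100]
print([e.uri for e in popular])  # ['/b', '/c']
```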