confluent-kafka-go는 컨플루언트에서 개발하고 유지보수하는 golang기반 카프카 클라이언트입니다.
github.com/confluentinc/confluent-kafka-go
위 라이브러리를 사용해서 컨슈머 5가지 패턴을 구현해보겠습니다.
컨슈머 구현 사전 작업 코드(컨슈머 초기화)
이 작업은 컨슈머를 이용하기 위한 가장 기본적인 작업입니다. 기본 옵션은 enable.auto.commit이 true임을 생각해야합니다. 수동 커밋을 사용할 경우 해당 옵션을 false로 두고 사용해야 합니다.
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"os"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "test-group",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"test"}, nil)
//컨슈머 구현 코드가 들어감
c.Close()
}
1. ReadMessage로 구현
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
ReadMessage는 polling으로 가져온 컨슈머의 데이터를 메시지단위(레코드단위)로 처리하도록 하는 간단한 구문입니다. 내부 구현체는 사실상 poll()메서드를 호출하는 구조로 되어 있는 것을 확인할 수 있습니다.
2. poll()로 구현
run := true
for run == true {
ev := c.Poll(1000)
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Printf("Ignored %v\n", e)
}
}
자바 카프카 컨슈머에서 흔히 보던 스타일입니다. 오토커밋일때 사용할 수 있습니다. poll()을 통해 가져온 데이터는 1개 메시지씩 처리하게 됩니다.
3. poll()과 동기커밋
run := true
msg_count := 0
for run == true {
ev := c.Poll(0)
switch e := ev.(type) {
case *kafka.Message:
msg_count += 1
c.Commit()
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Printf("Ignored %v\n", e)
}
}
enable.auto.commit이 false일 경우 commit을 위해 Commit()메서드를 호출합니다.
4. poll()과 비동기 커밋
run := true
msg_count := 0
for run == true {
ev := c.Poll(0)
switch e := ev.(type) {
case *kafka.Message:
msg_count += 1
go func() {
offsets, err := c.Commit()
if err != nil {
fmt.Printf("%% Error %v\n", err)
os.Exit(1)
}else
fmt.Printf("%% Record %v\n", offsets)
}()
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Printf("Ignored %v\n", e)
}
}
비동기 커밋을 사용할 경우 Commit()에 대한 리턴 값을 go func()으로 받음으로서 비동기 처리를 할 수 있습니다.
참고 링크
컨플루언트 go kafka : docs.confluent.io/clients-confluent-kafka-go/current/overview.html
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 스트림즈 All stream threads have died. 오류 해결 방안 (0) | 2021.05.27 |
---|---|
ProducerRecord에 파티션 번호를 지정하면 어떻게 동작할까? (0) | 2021.05.05 |
카프카를 이벤트 소싱, CQRS로 사용할 수 있을까? (2) | 2021.05.03 |
레디슈 큐(queue), 레디스 스트림(streams), 레디스 펍섭(pub/sub) 그리고 카프카와 비교 (0) | 2021.04.25 |
카프카 스트림즈의 commit.interval.ms옵션 (2) | 2021.04.15 |
초~중급자를 위한 [아파치 카프카 애플리케이션 개발] 서적을 출간하였습니다. (12) | 2021.04.11 |