본문 바로가기

빅데이터/Kafka

confluent-kafka-go 컨슈머를 구현하는 5가지 방법

confluent-kafka-go는 컨플루언트에서 개발하고 유지보수하는 golang기반 카프카 클라이언트입니다.

github.com/confluentinc/confluent-kafka-go

 

confluentinc/confluent-kafka-go

Confluent's Apache Kafka Golang client. Contribute to confluentinc/confluent-kafka-go development by creating an account on GitHub.

github.com

위 라이브러리를 사용해서 컨슈머 5가지 패턴을 구현해보겠습니다.

 

컨슈머 구현 사전 작업 코드(컨슈머 초기화)

이 작업은 컨슈머를 이용하기 위한 가장 기본적인 작업입니다. 기본 옵션은 enable.auto.commit이 true임을 생각해야합니다. 수동 커밋을 사용할 경우 해당 옵션을 false로 두고 사용해야 합니다.

package main

import (
	"fmt"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"os"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"test"}, nil)
	
    //컨슈머 구현 코드가 들어감

	c.Close()
}

 

1. ReadMessage로 구현

for {
	msg, err := c.ReadMessage(-1)
	if err == nil {
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	} else {
		// The client will automatically try to recover from all errors.
		fmt.Printf("Consumer error: %v (%v)\n", err, msg)
	}
}

ReadMessage는 polling으로 가져온 컨슈머의 데이터를 메시지단위(레코드단위)로 처리하도록 하는 간단한 구문입니다. 내부 구현체는 사실상 poll()메서드를 호출하는 구조로 되어 있는 것을 확인할 수 있습니다.

2. poll()로 구현

run := true
for run == true {
	ev := c.Poll(1000)
	switch e := ev.(type) {
	case *kafka.Message:
		fmt.Printf("%% Message on %s:\n%s\n",
			e.TopicPartition, string(e.Value))
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}

자바 카프카 컨슈머에서 흔히 보던 스타일입니다. 오토커밋일때 사용할 수 있습니다. poll()을 통해 가져온 데이터는 1개 메시지씩 처리하게 됩니다.

3. poll()과 동기커밋

run := true
msg_count := 0
for run == true {
	ev := c.Poll(0)
	switch e := ev.(type) {
	case *kafka.Message:
		msg_count += 1
		c.Commit()
		fmt.Printf("%% Message on %s:\n%s\n",
			e.TopicPartition, string(e.Value))
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}

enable.auto.commit이 false일 경우 commit을 위해 Commit()메서드를 호출합니다.

4. poll()과 비동기 커밋

run := true
msg_count := 0
for run == true {
	ev := c.Poll(0)
	switch e := ev.(type) {
	case *kafka.Message:
		msg_count += 1
		go func() {
			offsets, err := c.Commit()
			if err != nil {
				fmt.Printf("%% Error %v\n", err)
				os.Exit(1)
			}else
				fmt.Printf("%% Record %v\n", offsets)
		}()
		fmt.Printf("%% Message on %s:\n%s\n",
			e.TopicPartition, string(e.Value))
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}

비동기 커밋을 사용할 경우 Commit()에 대한 리턴 값을 go func()으로 받음으로서 비동기 처리를 할 수 있습니다.

 

 

참고 링크

컨플루언트 go kafka : docs.confluent.io/clients-confluent-kafka-go/current/overview.html

반응형