본문 바로가기

빅데이터/Kafka

kafka exactly-once delivery를 지원하기 위한 transaction

파이프라인에서 exactly-once처리는 매우 중요합니다. 데이터 파이프라인, 마이크로서비스 파이프라인 구분할것 없이 모든 부분에서 필요한데, 카프카는 0.11.0.0 이후 버젼에 대해서 exactly-once transaction 처리를 지원합니다. 기존에는 적어도 한번 이상 처리할 수 있는 at-least-once를 지원했었는 것에 반해 엄청난 발전이라고 볼 수 있습니다.

 

카프카 트랜잭션을 사용하기 위한 조건은 브로커가 0.11.0.0 이후 버젼이여야하고, 클라이언트도 0.11.0.0 이후 버젼이여야 사용할 수 있습니다. 만약 클라이언트 또는 브로커 중 한개라도 0.11.0.0 보다 낮은 버젼이면 사용할 수 없습니다.

 

여기서 말하는 exactly-once delivery는 프로듀서부터 컨슈머까지 연결되는 파이프라인의 처리를 뜻합니다. 기존 프로듀서의 경우 트랜잭션처리를 하지 않으면 카프카 클러스터에 두번이상 데이터가 저장될 수 있습니다. 데이터가 클러스터에 저장되었으나 ack가 유실되어 프로듀서가 재처리하는 경우가 대표적입니다. 결과적으로 카프카 트랜잭션 처리를 하더라도 컨슈머가 중복해서 데이터 처리하는 것에 대해 보장하지 않으므로, 컨슈머의 중복처리는 따로 로직을 작성해야합니다.

 

카프카 트랜잭션 적용

카프카 트랜잭션은 프로듀서와 컨슈머단에서 옵션을 설정해야 합니다. 상세 코드는 아래와 같습니다.

프로듀서 코드

public class ProducerWithTransaction {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "kafka:9092";
    private final static String TRANSACTION_ID_CONFIG = UUID.randomUUID().toString(); # 트랜잭션 처리하는 프로듀서 구분자

    public static void main(String[] args) throws Exception{

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID_CONFIG); # 트랜잭션 구분자 설정
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); # 트랜잭션 처리 설정

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "data");

        producer.initTransactions(); # 트랜잭션 준비
        producer.beginTransaction(); # 트랜잭션 시작

        producer.send(record);
        
        producer.flush();

        producer.commitTransaction(); # 트랜잭션 커밋
        producer.close();
    }
}

 

프로듀서는 브로커로 데이터를 전송할 때 트랜잭션을 커밋하는 주체입니다. 프로듀서는 트랜잭션 처리를 하기 위해 반드시 TRANSACTIONAL_ID_CONFIG와 ENABLE_IDEMPOTENCE_CONFIG 설정을 해야합니다.

- TRANSACTIONAL_ID_CONFIG : 프로듀서 트랜잭션을 구분하기 위한 ID(중복방지를 위해 UUID 사용)
- ENABLE_IDEMPOTENCE_CONFIG : 트랜잭션 처리를 한다고 명시적으로 설정

최초에 initTransactions() 메서드로 트랜젝션처리를 위한 준비를 하고 send() 메서드 앞뒤로 beginTransaction(), commitTransaction() 메서드를 선언함으로서 트랜젝션을 마칩니다. 만약 트랜잭션을 commit하지 않고 프로듀서를 close하면 close구문에서 자동으로 commit하고 종료하게 됩니다. 만약 프로듀서가 close하지 않고 애플리케이션이 종료되면 일정 시간(transaction.timeout.ms) 이후에 abort 처리됩니다. abort처리된 데이터는 트랜젝션옵션이 활성화된 컨슈머가 가져가지 못합니다.

프로듀서가 데이터를 보내면 아래와 같이 데이터가 브로커에 적재되는 것을 스크립트를 통해 확인할 수 있습니다.

$ echo "exclude.internal.topics=false" > consumer.config
$ ./kafka-console-consumer.sh --consumer.config consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --bootstrap-server localhost:9092 --topic __transaction_state --from-beginning
13c2df10-1a3c-4024-b28d-d155e24b941a::TransactionMetadata(transactionalId=13c2df10-1a3c-4024-b28d-d155e24b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(test-1), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312702)
13c2df10-1a3c-4024-b28d-d155e24b941a::TransactionMetadata(transactionalId=13c2df10-1a3c-4024-b28d-d155e24b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(test-1), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312712)
13c2df10-1a3c-4024-b28d-d155e24b941a::TransactionMetadata(transactionalId=13c2df10-1a3c-4024-b28d-d155e24b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312713)

__transaction_state는 프로듀서가 보내는 데이터의 트랜젝션을 기록하는데 사용합니다. 이 데이터는 트랜젝션이 활성화된 컨슈머가 데이터를 가져가는데 사용합니다. 위 데이터를 보면 Ongoing -> PrepareCommit -> CompleteCommit 순서대로 트랜잭션이 순차적으로 진행된 모습을 확인할 수 있습니다. 이렇게 트랜잭션이 완료된 데이터만 컨슈머가 가져갈 수 있습니다.

트랜젝션을 활성화할 경우 프로듀서는 기존에 사용하던 토픽에 추가적으로 데이터를 넣습니다. 상세 내용을 살펴보겠습니다.

$ ./kafka-dump-log.sh --files /home/ec2-user/kafka_2.12-2.5.0/data/test-1/00000000000000000000.log --deep-iteration
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: 0 lastSequence: 0 producerId: 1000 producerEpoch: 21 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 1680 CreateTime: 1594965813907 size: 122 magic: 2 compresscodec: NONE crc: 3909138376 isvalid: true
| offset: 16 CreateTime: 1594965813907 keysize: -1 valuesize: 54 sequence: 0 headerKeys: []
baseOffset: 17 lastOffset: 17 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1000 producerEpoch: 21 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 1802 CreateTime: 1594965813942 size: 78 magic: 2 compresscodec: NONE crc: 3102183917 isvalid: true
| offset: 17 CreateTime: 1594965813942 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

위 스크립트를 통해 카프카 브로커에 적재된 세그먼트 파일을 확인할 수 있습니다. test토픽의 1번 파티션에 저장된 데이터를 확인했는데, 16번 오프셋과 17번 오프셋이 작성된 것을 알 수 있습니다. 

16번 오프셋 - isTransactional: true, isControl: false
17번 오프셋 - isTransactional: true, isControl: true,  endTxnMarker: COMMIT

즉, 프로듀서가 트랜젝션을 끝내면 해당 토픽에 COMMIT메시지를 명시적으로 전달합니다. 이로 인해 추가 오프셋이 소모된 것을 확인할 수 있습니다. 컨슈머는 이 토픽에서 데이터를 가져가더라도 16번 오프셋만 가져가고 17번 오프셋은 무시합니다. 17번 오프셋은 COMMIT을 명시하는 데이터이고 실질적인 프로듀서가 전송하는 값은 16번에만 존재하기 때문입니다.

컨슈머코드

public class ConsumerWithSyncCommit {
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWithSyncCommit.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "kafka:9092";
    private final static String GROUP_ID = "test-group";
    private final static String TRANSACTION_CONFIG = "read_committed"; # 트랜잭션 설정 커밋된 데이터만 읽는다.

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); # 명시적 오프셋 커밋
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, TRANSACTION_CONFIG); # 트랜잭션 설정

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) 
                logger.info("{}", record.toString());
            }
            consumer.commitSync();
        }
    }
}

컨슈머에서는 오토 커밋을 false로 설정하고 ISOLATION_LEVEL_CONFIG를 read_committed로 설정해야만 합니다. 이를 통해 프로듀서가 브로커로 보낸 데이터 중 트랜잭션이 완벽하게 완료된 데이터에 대해서만 읽을 수 있는 것입니다. 위 컨슈머 실행결과는 아래와 같습니다.

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [ec2-kafka:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
...
[main] INFO com.example.ConsumerWithSyncCommit - ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 16, CreateTime = 1594970489649, serialized key size = -1, serialized value size = 54, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value =data)

16번 오프셋의 데이터만 읽은것을 확인할 수 있습니다. 

결론

카프카 트랜잭션은 프로듀서부터 시작하여 컨슈머까지 데이터가 전달될 때 exactly-once를 지원하기 위한 옵션입니다. 이 옵션은 만능이 아닙니다. 결과적으로 컨슈머단에서 중복이 생길 수 있기 때문입니다. 다만 프로듀서에서 시작하여 컨슈머까지 가는 파이프라인에 대해서는 이 옵션을 적용하여 exactly-once 전달을 보장할 수 있게 되었습니다. 이 옵션은 transaction관련 내부 브로커처리를 하기 때문에 옵션을 사용하지 않는 것보다 성능이 다소 떨어질 수 있다는 점을 주의하여야 합니다.

만약 컨슈머가 적재하는 저장소가 유니크키를 지원하지 않는 저장소라면 이 옵션을 프로듀서와 컨슈머에 적용하는 것을 추천드립니다. 만약 적재하는 저장소가 유니크키를 지원하는 파이프라인이라면 at-least-once를 통해 컨슈머가 중복해서 데이터를 처리하더라도 최종 적재되는 데이터가 중복되지 않으므로 굳이 위 옵션을 적용할 필요는 없어보입니다.

데이터 처리에 있어 마이크로 서비스 파이프라인과 같이 프로듀서가 중복해서 데이터를 보내는 것을 방지할때 이 옵션을 반드시 사용해야할것입니다.

 

관련 링크

https://www.confluent.io/blog/transactions-apache-kafka/

 

Transactions in Apache Kafka | Confluent

Learn the main concepts needed to use the transaction API in Apache Kafka effectively.

www.confluent.io

https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/

 

Exactly-once Semantics is Possible: Here's How Apache Kafka Does it

Exactly-once is a hard problem to solve, but we've done it. Available now in Apache Kafka 0.11, exactly-once semantics.

www.confluent.io

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

 

Transactional Messaging in Kafka - Apache Kafka - Apache Software Foundation

Kafka provides at-least-once messaging guarantees. Duplicates can arise due to either producer retries or consumer restarts after failure. One way to provide exactly-once messaging semantics is to implement an idempotent producer. This has been covered at

cwiki.apache.org

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

 

KIP-98 - Exactly Once Delivery and Transactional Messaging - Apache Kafka - Apache Software Foundation

[This KIP proposal is a joint work between Jason Gustafson, Flavio Paiva Junqueira,  Apurva Mehta, Sriram, and Guozhang Wang] Status Current state: Adopted Discussion thread: http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+DISCUSS+KIP+98+Exact

cwiki.apache.org

 

  • 좋은 글이네요ㅎㅎ
    제가 알기로 producer에서 트랜잭션 처리 없이 exactly-once만 지원하려면 transactional.id 없이 enable.idempotence=ture만 설정하면 됩니다.

    • 좋은 정보 감사합니다!
      --
      If no TransactionalId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.

  • 잘 읽었습니다.

    궁금한 것이 2가지가 있는데요 우선 네트워크 오류로 인해 한 broker가 down이 되어서 ack을 브로커에서 프로듀서로 보내지 못해서 프로듀서가 retry를 실행 할 때, produce request ID를 확인해서 동일한 메세지를 commit하지 못하도록 방지한다고 배웠는데 이를 실행하는 주체는 Producer인가요 아니면 카프카 브로커 인가요?

    또한 프로듀서는 브로커로 데이터를 전송할 때 트랜잭션을 커밋하는 주체라고 하셨는데, 제가 알기로는 메세지를 제대로 브로커가 받고 leader 파티션과 follower 파티션이 해당 메세지를 commit하는 것으로 알고 있는데 트랜잭션 commit과 메세지 commit이 차이가 있는 것인지 아니면 제가 제대로 알고 있지 못한 것인지 궁금합니다.

    ++ 전 최근에 tistory 블로그를 시작하게 되었는데 혹시 콘솔 스크립트에 뜨는 log하고 쉘 스크립트의 명령어들을 원영님처럼 예쁘게 블로그에 업로드 할 수 있는 방법이 있을까요??? 코드 블럭을 쓰면 된다고는 했는데 어떻게 사용해야 할 지 감이 안옵니다. OracleDB 쿼리하고 Elasticsearch 쿼리도 업로드 하고 싶은데 조언을 구하고자 합니다.

    • 안녕하세요 쁘룸쁘룸 님, 문의주신내용 답변드리겠습니다.

      Q: produce request ID를 확인해서 동일한 메세지를 commit하지 못하도록 방지한다고 배웠는데 이를 실행하는 주체는 Producer인가요 아니면 카프카 브로커 인가요?
      A: ack를 보내는 주체는 브로커이므로 브로커가 PID와 SEQ를 확인해서 ACK(중복)이라고 명시적으로 프로듀서에 알립니다.

      Q:메세지를 제대로 브로커가 받고 leader 파티션과 follower 파티션이 해당 메세지를 commit하는 것으로 알고 있는데 트랜잭션 commit과 메세지 commit이 차이가 있는 것인지?
      A: 말씀하신 부분은 ACK를 뜻하는것 같습니다. 멀티 파티션으로 레코드를 전송하고 동시에 처리됨을 원할 때 Transactional producer를 사용하는 것입니다. ACK와 transaction은 별개입니다. 또한 transactional producer에서 ack는 all이 기본 옵션으로 사용해야합니다. MSA환경에서 유용하게 사용할 수 있는데, 다수의 토픽에 데이터가 동시에 들어감을 명시적으로 선언할 때 사용할 수 있습니다.

      Q: log하고 쉘 스크립트의 명령어들을 원영님처럼 예쁘게 블로그에 업로드 할 수 있는 방법이 있을까요???
      A: 저는 기본 티스토리 코드를 사용하고 css를 커스텀해서 꾸며서 사용하고 있습니다. 아래 블로그 글에서 상세 내용을 확인하실 수 있습니다.
      https://blog.voidmainvoid.net/166

      댓글달아주셔서 감사합니다.

  • 언젠가 Kafka exactly-once 알아봐야지 하고 있었는데 정리 감사합니다.
    트랜잭션과 멱등성을 어떻게 처리하는지 궁금해졌네요. ;)