본문 바로가기

빅데이터/Kafka

kafka consumer와 seekToBeginning를 활용하여 offset reset하기

카프카 컨슈머 클라이언트는 seekToBeginning 함수가 있습니다. 이 함수를 사용하면 특정 파티션의 오프셋을 최소 레코드로 지정할 수 있습니다.

/**
 * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
 * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
 * If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
 *
 * @throws IllegalArgumentException if {@code partitions} is {@code null}
 * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
 */
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
    if (partitions == null)
        throw new IllegalArgumentException("Partitions collection cannot be null");

    acquireAndEnsureOpen();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
    } finally {
        release();
    }
}

위 함수를 사용하면 offset을 kafka-consumer-groups.sh를 사용하지 않고 리셋할 수 있습니다. 이런 특수사항은 카프카 스트림즈 등을 활용할 때 사용할 수 있는데요. 사용하는 방법은 다음과 같습니다.

public class Main {
    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        consumer.poll(Duration.ofSeconds(1));
        consumer.seekToBeginning(consumer.assignment());
        consumer.poll(Duration.ofSeconds(1));
        consumer.commitSync();
        consumer.close();

    }
}

참고로 2.5.0 버전 이후부터는 kafka admin client를 사용하여 reset을 수행할 수도 있습니다.

KIP-396 : https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484 

KAFAK-7689 : https://issues.apache.org/jira/browse/KAFKA-7689

반응형