빅데이터/Kafka
kafka consumer와 seekToBeginning를 활용하여 offset reset하기
AndersonChoi
2022. 1. 17. 15:37
카프카 컨슈머 클라이언트는 seekToBeginning 함수가 있습니다. 이 함수를 사용하면 특정 파티션의 오프셋을 최소 레코드로 지정할 수 있습니다.
/**
* Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
* first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
* If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
*
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");
acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
} finally {
release();
}
}
위 함수를 사용하면 offset을 kafka-consumer-groups.sh를 사용하지 않고 리셋할 수 있습니다. 이런 특수사항은 카프카 스트림즈 등을 활용할 때 사용할 수 있는데요. 사용하는 방법은 다음과 같습니다.
public class Main {
private final static Logger logger = LoggerFactory.getLogger(Main.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(consumer.assignment());
consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();
consumer.close();
}
}
참고로 2.5.0 버전 이후부터는 kafka admin client를 사용하여 reset을 수행할 수도 있습니다.
KIP-396 : https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
KAFAK-7689 : https://issues.apache.org/jira/browse/KAFKA-7689
반응형