Testing Schema Registry Java Clients (Producer, Consumer)

1. Configure and Run the Schema Registry

The configuration file, confluent-7.0.0/etc/schema-registry/schema-registry.properties:

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false

Start the Schema Registry:

$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
[2021-12-16 21:36:27,121] INFO SchemaRegistryConfig values:
	access.control.allow.headers =
	access.control.allow.methods =
	access.control.allow.origin =
	access.control.skip.options = true
	authentication.method = NONE
	authentication.realm =
	authentication.roles = [*]
    ...
[2021-12-16 21:36:30,410] INFO Started NetworkTrafficServerConnector@7a4ccb53{HTTP/1.1, (http/1.1)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector:331)
[2021-12-16 21:36:30,410] INFO Started @5668ms (org.eclipse.jetty.server.Server:415)
[2021-12-16 21:36:30,411] INFO Schema Registry version: 7.0.0 commitId: b16f82f4c22bd32ed24a8e935bf90ac30ee69c5c (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
[2021-12-16 21:36:30,411] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:47)

The default schema.compatibility.level is BACKWARD. Backward compatibility means that when a producer sends records written with schema version n-1 or version n, the consumer reads them using schema version n.
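The configured level can be verified over the REST API: GET /config returns the global level, and PUT /config changes it (the PUT example is shown only for reference; this test keeps the default):

$ curl localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

$ curl -X PUT -H "Content-Type: application/json" \
       -d '{"compatibility": "FORWARD"}' localhost:8081/config
{"compatibility":"FORWARD"}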

 

Connect to the Schema Registry and verify it responds:

$ curl localhost:8081
{}
$ curl localhost:8081/subjects
[]

 

2. Create a Topic

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --list
test

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
Topic: test	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
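Only list and describe are shown here; the test topic itself could have been created with a matching layout like this:

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic test --partitions 3 --replication-factor 1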

 

3. Send Data with the Producer

build.gradle

plugins {
    id 'java'
}

group 'org.com.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
    implementation 'io.confluent:kafka-avro-serializer:7.0.1'
}

test {
    useJUnitPlatform()
}

When configuring build.gradle, Confluent's Maven repository must be added; otherwise the kafka-avro-serializer library cannot be resolved.

 

Main.java

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;

public class Main {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // KafkaAvroSerializer serializes values as Avro and registers the schema with the registry
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        configs.put("schema.registry.url", "http://localhost:8081");

        String schema = "{"
                + "\"namespace\": \"myrecord\","
                + " \"name\": \"orders\","
                + " \"type\": \"record\","
                + " \"fields\": ["
                + "     {\"name\": \"orderTime\", \"type\": \"long\"},"
                + "     {\"name\": \"orderId\",  \"type\": \"long\"},"
                + "     {\"name\": \"itemId\", \"type\": \"string\"}"
                + " ]"
                + "}";

        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema1 = parser.parse(schema);

        // generate avro generic record
        GenericRecord avroRecord = new GenericData.Record(avroSchema1);
        avroRecord.put("orderTime", System.nanoTime());
        avroRecord.put("orderId", new Random().nextLong());
        avroRecord.put("itemId", UUID.randomUUID().toString());

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
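Note that send() is asynchronous and the code above fires and forgets. To confirm delivery per record, a Callback can be passed to send(), as in this minimal sketch:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // broker-side failures surface here
        exception.printStackTrace();
    } else {
        // metadata tells us where the record landed
        System.out.printf("delivered to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});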

 

4. Check What Was Registered in the Schema Registry

$ curl localhost:8081/subjects
["test-value"]

$ curl localhost:8081/subjects/test-value/versions
[1]

$ curl localhost:8081/subjects/test-value/versions/1
{
    "subject": "test-value",
    "version": 1,
    "id": 1,
    "schema": "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"myrecord\",\"fields\":[{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemId\",\"type\":\"string\"}]}"
}

The subject name was generated automatically as {topic name}-value (the default TopicNameStrategy). Since the subject was newly created, both the version and the ID are 1.
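Note that id is the schema's globally unique ID, distinct from the per-subject version number. On the wire, every Avro record is prefixed with a magic byte and this 4-byte schema ID, which is how the deserializer later finds the writer's schema. The ID can be resolved directly:

$ curl localhost:8081/schemas/ids/1

This returns the same escaped schema string shown above.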

 

5. Check the Data from a Consumer

Main.java

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Main {
    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                logger.info("=====");
                logger.info("record:{}", record);
                logger.info("value:{}", record.value().toString());
            }
        }
    }
}
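One caveat about the loop above: it never exits, so consumer.close() is never called. A common shutdown pattern (a sketch, not part of the original code) triggers wakeup() from a JVM shutdown hook so that the blocked poll() throws and the consumer can leave the group cleanly:

// register before entering the poll loop
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
        // ... process records as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected: thrown by poll() after wakeup()
} finally {
    consumer.close(); // commits offsets (auto-commit is on) and leaves the group
}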

 

Execution log:

9:44:11 PM: Executing task ':Main.main()'...

> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes

> Task :Main.main()
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-test-group-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test-group
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: 
	auto.register.schemas = true
	avro.reflection.allow.null = false
	avro.use.logical.type.converters = false
	basic.auth.credentials.source = URL
	basic.auth.user.info = [hidden]
	bearer.auth.credentials.source = STATIC_TOKEN
	bearer.auth.token = [hidden]
	context.name.strategy = class io.confluent.kafka.serializers.context.NullContextNameStrategy
	id.compatibility.strict = true
	key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
	latest.compatibility.strict = true
	max.schemas.per.subject = 1000
	normalize.schemas = false
	proxy.host = 
	proxy.port = -1
	schema.reflection = false
	schema.registry.basic.auth.user.info = [hidden]
	schema.registry.ssl.cipher.suites = null
	schema.registry.ssl.enabled.protocols = [TLSv1.2]
	schema.registry.ssl.endpoint.identification.algorithm = https
	schema.registry.ssl.engine.factory.class = null
	schema.registry.ssl.key.password = null
	schema.registry.ssl.keymanager.algorithm = SunX509
	schema.registry.ssl.keystore.certificate.chain = null
	schema.registry.ssl.keystore.key = null
	schema.registry.ssl.keystore.location = null
	schema.registry.ssl.keystore.password = null
	schema.registry.ssl.keystore.type = JKS
	schema.registry.ssl.protocol = TLSv1.2
	schema.registry.ssl.provider = null
	schema.registry.ssl.secure.random.implementation = null
	schema.registry.ssl.trustmanager.algorithm = PKIX
	schema.registry.ssl.truststore.certificates = null
	schema.registry.ssl.truststore.location = null
	schema.registry.ssl.truststore.password = null
	schema.registry.ssl.truststore.type = JKS
	schema.registry.url = [http://localhost:8081]
	specific.avro.reader = false
	use.latest.version = false
	use.schema.id = -1
	value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 7.0.1-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: b7e52413e7cb3e8b
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1639658652510
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: k41kHM4_RP6i3Lx7MYeeXA
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Finished assignment for group at generation 1: {consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6=Assignment(partitions=[test-0, test-1, test-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Notifying assignor about the new Assignment(partitions=[test-0, test-1, test-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Adding newly assigned partitions: test-1, test-0, test-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-2
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1639658381544, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"})
[main] INFO com.example.Main - value:{"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"}

 

6. Check the Data with kafka-console-consumer.sh

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
��탉����提À��H33ce3a10-9277-4240-a7f0-db11bc4d11e3

As expected, the data is Avro-serialized, so it cannot be printed as a plain String. In other words, consumers of this topic have to be run against the Schema Registry, or with the Avro schema already known to them.
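Confluent ships kafka-avro-console-consumer for exactly this case: it fetches the schema from the registry and prints each record as JSON.

$ ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test \
    --from-beginning --property schema.registry.url=http://localhost:8081
{"orderTime":1473982448382288,"orderId":-7246011478677708355,"itemId":"33ce3a10-9277-4240-a7f0-db11bc4d11e3"}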

 

7. Register a New Schema with the Schema Registry

$ echo '{
    "type": "record",
    "name": "orders",
    "fields": [
        {
            "name": "orderTime",
            "type": "long"
        },
        {
            "name": "orderId",
            "type": "long"
        },
        {
            "name": "itemNo",
            "type": "int",
            "default": 0
        }
    ]
}' | \
    jq '. | {schema: tojson}' | \
    curl -X POST http://localhost:8081/subjects/test-value/versions \
         -H "Content-Type:application/json" \
         -d @-
         
{"id":2}

$ curl http://localhost:8081/subjects/test-value/versions
[1,2]

$ curl http://localhost:8081/subjects/test-value/versions/2
{
    "subject": "test-value",
    "version": 2,
    "id": 2,
    "schema": "{\"type\":\"record\",\"name\":\"orders\",\"fields\": [{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemNo\",\"type\":\"int\",\"default\":0}]}"
}

For a Schema Registry REST API cheat sheet, see https://rmoff.net/2019/01/17/confluent-schema-registry-rest-api-cheatsheet/

 

Backward compatibility means that, given versions 1 and 2, a consumer reading with version 2 can read records written with either version 1 or version 2. In other words, backward compatibility is the ability to read data written with older schemas. The changes it allows are deleting fields and adding fields that have default values. In the example above, itemId was removed and a new field itemNo with a default value of 0 was added.
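A candidate schema can also be tested against a subject before registering it, using the registry's compatibility endpoint (same jq pipeline as above; the placeholder stands in for the schema JSON):

$ echo '<candidate schema JSON>' | \
    jq '. | {schema: tojson}' | \
    curl -X POST http://localhost:8081/compatibility/subjects/test-value/versions/latest \
         -H "Content-Type:application/json" \
         -d @-
{"is_compatible":true}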

 

To verify that records are delivered correctly under both schemas, let's send one record per schema. The producer's Main.java was modified as follows:

Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
configs.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);

// send a record with the version-1 schema
String schema = "{"
        + "\"namespace\": \"myrecord\","
        + " \"name\": \"orders\","
        + " \"type\": \"record\","
        + " \"fields\": ["
        + "     {\"name\": \"orderTime\", \"type\": \"long\"},"
        + "     {\"name\": \"orderId\",  \"type\": \"long\"},"
        + "     {\"name\": \"itemId\", \"type\": \"string\"}"
        + " ]"
        + "}";

Schema.Parser parser = new Schema.Parser();
Schema avroSchema1 = parser.parse(schema);

GenericRecord avroRecord = new GenericData.Record(avroSchema1);
avroRecord.put("orderTime", System.nanoTime());
avroRecord.put("orderId", new Random().nextLong());
avroRecord.put("itemId", UUID.randomUUID().toString());

ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
producer.send(record);


// send a record with the version-2 schema
String schema2 = "{"
        + "\"namespace\": \"myrecord\","
        + " \"name\": \"orders\","
        + " \"type\": \"record\","
        + " \"fields\": ["
        + "     {\"name\": \"orderTime\", \"type\": \"long\"},"
        + "     {\"name\": \"orderId\",  \"type\": \"long\"},"
        + "     {\"name\": \"itemNo\", \"type\": \"int\"}"
        + " ]"
        + "}";

Schema.Parser parser2 = new Schema.Parser();
Schema avroSchema2 = parser2.parse(schema2);

GenericRecord avroRecord2 = new GenericData.Record(avroSchema2);
avroRecord2.put("orderTime", System.nanoTime());
avroRecord2.put("orderId", new Random().nextLong());
avroRecord2.put("itemNo", 123);

ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>(TOPIC_NAME, avroRecord2);
producer.send(record2);


producer.flush();
producer.close();

The consumer prints the following:

[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1639659750486, serialized key size = -1, serialized value size = 59, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"})
[main] INFO com.example.Main - value:{"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"}
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1639659750587, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123})
[main] INFO com.example.Main - value:{"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123}

With backward compatibility, deployment order matters: deploy the consumers that read with the new, compatible version n+1 schema first, and only then roll the producers from version n to n+1, so that compatibility is maintained throughout.

 

However, as the test above shows, the consumer is not strict when deserializing into a GenericRecord. To enforce strict, typed deserialization, add the following option:

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#example-consumer-code

 


By default, each record is deserialized into an Avro GenericRecord, but in this tutorial the record should be deserialized using the application’s code-generated Payment class. Therefore, configure the deserializer to use Avro SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true. For example:

...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  ConsumerRecords<String, Payment> records = consumer.poll(100);
  for (ConsumerRecord<String, Payment> record : records) {
    String key = record.key();
    Payment value = record.value();
  }
}
...

Running the consumer again from earliest with this option set produces the following error:

[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group3-1, groupId=test-group3] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
Exception in thread "main" org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition test-1 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at com.example.Main.main(Main.java:39)
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class myrecord.orders specified in writer's schema whilst finding reader's schema for a SpecificRecord.
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:281)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:252)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:196)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:391)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420)
	... 9 more
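The root cause: with SPECIFIC_AVRO_READER_CONFIG enabled, the deserializer looks for an Avro-generated SpecificRecord class named myrecord.orders on the classpath, and this project only ever worked with GenericRecord, so no such class exists. One way to generate it is Avro's code generator (a sketch, assuming the schema is saved as orders.avsc and the avro-tools version matches the project's Avro dependency):

$ java -jar avro-tools-1.11.0.jar compile schema orders.avsc src/main/java

With the generated orders class on the classpath, the consumer can be declared as KafkaConsumer<String, orders> and record.value() comes back strongly typed, like the Payment example in the Confluent tutorial quoted above.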

 
