Testing the Schema Registry Java clients (producer and consumer)
1. Configure and start Schema Registry
Configuration file: confluent-7.0.0/etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
Start Schema Registry
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
[2021-12-16 21:36:27,121] INFO SchemaRegistryConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
access.control.skip.options = true
authentication.method = NONE
authentication.realm =
authentication.roles = [*]
...
[2021-12-16 21:36:30,410] INFO Started NetworkTrafficServerConnector@7a4ccb53{HTTP/1.1, (http/1.1)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector:331)
[2021-12-16 21:36:30,410] INFO Started @5668ms (org.eclipse.jetty.server.Server:415)
[2021-12-16 21:36:30,411] INFO Schema Registry version: 7.0.0 commitId: b16f82f4c22bd32ed24a8e935bf90ac30ee69c5c (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
[2021-12-16 21:36:30,411] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:47)
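The default value of schema.compatibility.level is backward. Backward compatibility means that a consumer using schema version n can process records that a producer wrote with either version n-1 or version n.
Connect to Schema Registry and verify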
$ curl localhost:8081
{}
$ curl localhost:8081/subjects
[]
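The same checks can be made from Java, which is handy when a test needs to confirm the registry is up before producing. The sketch below uses only the JDK 11+ HTTP client; RegistryProbe and its methods are hypothetical names introduced here, not part of any Confluent library. It assumes the registry from the configuration above is listening on http://localhost:8081.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration; not part of the Confluent client libraries.
final class RegistryProbe {
    // Builds a GET request for a Schema Registry REST path such as "/subjects" or "/config".
    static HttpRequest get(String baseUrl, String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body. Requires a running registry.
    static String fetch(String baseUrl, String path) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(get(baseUrl, path), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling RegistryProbe.fetch("http://localhost:8081", "/subjects") corresponds to the second curl command above. Fetching "/config" instead should report the global compatibility level, which is expected to be BACKWARD on a default installation.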
2. Create the topic
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --list
test
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
Topic: test PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
3. Send data with the producer
build.gradle
plugins {
id 'java'
}
group 'org.com.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
implementation 'io.confluent:kafka-avro-serializer:7.0.1'
}
test {
useJUnitPlatform()
}
The Confluent Maven repository must be added to build.gradle; otherwise the io.confluent libraries cannot be resolved.
Main.java
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class Main {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        configs.setProperty("schema.registry.url", "http://localhost:8081");

        String schema = "{"
                + "\"namespace\": \"myrecord\","
                + " \"name\": \"orders\","
                + " \"type\": \"record\","
                + " \"fields\": ["
                + " {\"name\": \"orderTime\", \"type\": \"long\"},"
                + " {\"name\": \"orderId\", \"type\": \"long\"},"
                + " {\"name\": \"itemId\", \"type\": \"string\"}"
                + " ]"
                + "}";
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema1 = parser.parse(schema);

        // Generate an Avro generic record
        GenericRecord avroRecord = new GenericData.Record(avroSchema1);
        avroRecord.put("orderTime", System.nanoTime());
        avroRecord.put("orderId", new Random().nextLong());
        avroRecord.put("itemId", UUID.randomUUID().toString());

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
4. Check what was registered in Schema Registry
$ curl localhost:8081/subjects
["test-value"]
$ curl localhost:8081/subjects/test-value/versions
[1]
$ curl localhost:8081/subjects/test-value/versions/1
{
"subject": "test-value",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"myrecord\",\"fields\":[{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemId\",\"type\":\"string\"}]}"
}
The subject name was generated automatically as {topic name}-value. Because this is a newly registered schema, both its version and its ID are 1.
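The naming rule behind that subject can be sketched in a few lines. The consumer log later in this post shows value.subject.name.strategy set to io.confluent.kafka.serializers.subject.TopicNameStrategy; the class below is a hypothetical re-implementation of that rule for illustration, not the library's code.

```java
// Hypothetical re-implementation for illustration; the real logic lives in
// io.confluent.kafka.serializers.subject.TopicNameStrategy.
final class SubjectNames {
    // Derives the subject from the topic name and whether the schema is for the key or the value.
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("test", false)); // prints "test-value"
        System.out.println(subjectFor("test", true));  // prints "test-key"
    }
}
```

Only test-value appears in the registry here because the producer serializes the key with StringSerializer, so no key schema is registered.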
5. Check the data in the consumer
Main.java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Main {
    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                logger.info("=====");
                logger.info("record:{}", record);
                logger.info("value:{}", record.value().toString());
            }
        }
    }
}
Execution log
9:44:11 PM: Executing task ':Main.main()'...
> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :Main.main()
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values:
auto.register.schemas = true
avro.reflection.allow.null = false
avro.use.logical.type.converters = false
basic.auth.credentials.source = URL
basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
bearer.auth.token = [hidden]
context.name.strategy = class io.confluent.kafka.serializers.context.NullContextNameStrategy
id.compatibility.strict = true
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
latest.compatibility.strict = true
max.schemas.per.subject = 1000
normalize.schemas = false
proxy.host =
proxy.port = -1
schema.reflection = false
schema.registry.basic.auth.user.info = [hidden]
schema.registry.ssl.cipher.suites = null
schema.registry.ssl.enabled.protocols = [TLSv1.2]
schema.registry.ssl.endpoint.identification.algorithm = https
schema.registry.ssl.engine.factory.class = null
schema.registry.ssl.key.password = null
schema.registry.ssl.keymanager.algorithm = SunX509
schema.registry.ssl.keystore.certificate.chain = null
schema.registry.ssl.keystore.key = null
schema.registry.ssl.keystore.location = null
schema.registry.ssl.keystore.password = null
schema.registry.ssl.keystore.type = JKS
schema.registry.ssl.protocol = TLSv1.2
schema.registry.ssl.provider = null
schema.registry.ssl.secure.random.implementation = null
schema.registry.ssl.trustmanager.algorithm = PKIX
schema.registry.ssl.truststore.certificates = null
schema.registry.ssl.truststore.location = null
schema.registry.ssl.truststore.password = null
schema.registry.ssl.truststore.type = JKS
schema.registry.url = [http://localhost:8081]
specific.avro.reader = false
use.latest.version = false
use.schema.id = -1
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 7.0.1-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: b7e52413e7cb3e8b
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1639658652510
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: k41kHM4_RP6i3Lx7MYeeXA
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Finished assignment for group at generation 1: {consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6=Assignment(partitions=[test-0, test-1, test-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Notifying assignor about the new Assignment(partitions=[test-0, test-1, test-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Adding newly assigned partitions: test-1, test-0, test-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-2
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1639658381544, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"})
[main] INFO com.example.Main - value:{"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"}
6. Check the data with kafka-console-consumer.sh
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
��탉����提À��H33ce3a10-9277-4240-a7f0-db11bc4d11e3
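As expected, the data was serialized with Avro, so it cannot be printed as a plain String. In other words, the consumer has to work either with Schema Registry or with an Avro schema it already knows.
7. Apply a new schema to Schema Registry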
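The bytes are still structured even though they are not printable. KafkaAvroSerializer frames each value as a magic byte 0, a 4-byte big-endian schema ID, and then the Avro binary payload, in which long values and string lengths are written as zig-zag varints. The sketch below parses that framing with the JDK only; WireFormat and its methods are hypothetical names introduced for illustration.

```java
import java.nio.ByteBuffer;

// Illustrative only: parses the framing that KafkaAvroSerializer puts in front of the Avro payload.
final class WireFormat {
    // Returns the schema ID stored big-endian in bytes 1..4, after the magic byte 0.
    static int schemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        if (buf.get() != 0) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    // Reads one Avro long (zig-zag encoded varint) from the buffer's current position.
    static long readLong(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means more bytes follow
        return (raw >>> 1) ^ -(raw & 1);       // undo zig-zag encoding
    }

    public static void main(String[] args) {
        // Hand-made sample: magic byte, schema ID 1, then a single varint byte 0x48.
        byte[] sample = {0, 0, 0, 0, 1, 0x48};
        System.out.println(schemaId(sample));                         // prints 1
        System.out.println(readLong(ByteBuffer.wrap(sample, 5, 1)));  // prints 36
    }
}
```

This also accounts for the H printed right before the UUID in the console output: the itemId string is 36 characters long, 36 zig-zag encodes to 72, and byte 72 (0x48) is the ASCII letter H.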
$ echo '{
"type": "record",
"name": "orders",
"fields": [
{
"name": "orderTime",
"type": "long"
},
{
"name": "orderId",
"type": "long"
},
{
"name": "itemNo",
"type": "int",
"default": 0
}
]
}' | \
jq '. | {schema: tojson}' | \
curl -X POST http://localhost:8081/subjects/test-value/versions \
-H "Content-Type:application/json" \
-d @-
{"id":2}
$ curl http://localhost:8081/subjects/test-value/versions
[1,2]
$ curl http://localhost:8081/subjects/test-value/versions/2
{
"subject": "test-value",
"version": 2,
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"orders\",\"fields\": [{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemNo\",\"type\":\"int\",\"default\":0}]}"
}
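The jq step in the command above exists because the REST API expects the Avro schema as an escaped JSON string inside a {"schema": "..."} envelope. A minimal Java sketch of the same request, using only the JDK 11+ HTTP client, might look like the following. SchemaRegistration and its methods are hypothetical names, and the escaping covers only the characters that typically occur in schema text rather than every JSON control character.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for illustration; not part of the Confluent client libraries.
final class SchemaRegistration {
    // Wraps an Avro schema (itself JSON) as the string value of a {"schema": "..."} payload.
    static String toPayload(String avroSchemaJson) {
        StringBuilder sb = new StringBuilder("{\"schema\":\"");
        for (char c : avroSchemaJson.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:   sb.append(c);
            }
        }
        return sb.append("\"}").toString();
    }

    private static HttpRequest post(String url, String avroSchemaJson) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(toPayload(avroSchemaJson)))
                .build();
    }

    // Registers the schema as a new version under the subject.
    static HttpRequest register(String baseUrl, String subject, String avroSchemaJson) {
        return post(baseUrl + "/subjects/" + subject + "/versions", avroSchemaJson);
    }

    // Only tests the schema against the latest registered version; nothing is stored.
    static HttpRequest checkCompatibility(String baseUrl, String subject, String avroSchemaJson) {
        return post(baseUrl + "/compatibility/subjects/" + subject + "/versions/latest", avroSchemaJson);
    }
}
```

Sending the checkCompatibility request first (for example with HttpClient.newHttpClient().send(...)) is a way to find out whether a schema would be accepted before registering it; the response is expected to carry an is_compatible flag.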
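For a Schema Registry REST API cheat sheet, see: https://rmoff.net/2019/01/17/confluent-schema-registry-rest-api-cheatsheet/
Backward compatibility means that, given version 1 and version 2, a consumer that uses version 2 can read data written with both version 1 and version 2. In other words, being able to read older versions is what backward compatibility is. The changes allowed here are deleting a field or adding a field that has a default value. In the example above, itemId was deleted and a new field, itemNo, was added with a default value of 0.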
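The two allowed changes can be expressed as a small rule: a reader on the new schema can decode old records as long as every field it adds declares a default. The sketch below is a deliberately simplified model of that rule in plain Java. BackwardCheck is a hypothetical name, and the model ignores type promotion, aliases, and nested records, all of which the real Avro compatibility check also considers.

```java
import java.util.Map;
import java.util.Set;

// Simplified model for illustration; not the check Schema Registry actually runs.
final class BackwardCheck {
    // newFields maps each field name of the new (reader) schema to whether it declares a default.
    static boolean canReadOldData(Set<String> oldFields, Map<String, Boolean> newFields) {
        for (Map.Entry<String, Boolean> field : newFields.entrySet()) {
            boolean missingInOldData = !oldFields.contains(field.getKey());
            if (missingInOldData && !field.getValue()) {
                return false; // nothing to fill the new field with when reading old records
            }
        }
        return true; // fields that exist only in the old schema are simply skipped by the reader
    }

    public static void main(String[] args) {
        Set<String> v1 = Set.of("orderTime", "orderId", "itemId");
        // Version 2 from this post: itemId removed, itemNo added with a default.
        System.out.println(canReadOldData(v1,
                Map.of("orderTime", false, "orderId", false, "itemNo", true)));  // prints true
        // Same change without a default on itemNo.
        System.out.println(canReadOldData(v1,
                Map.of("orderTime", false, "orderId", false, "itemNo", false))); // prints false
    }
}
```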
To check that records are delivered correctly, send one record for each of the two schemas. To do that, the producer's Main.java was modified as follows.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
configs.setProperty("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);
// Send a record with the version 1 schema
String schema = "{"
+ "\"namespace\": \"myrecord\","
+ " \"name\": \"orders\","
+ " \"type\": \"record\","
+ " \"fields\": ["
+ " {\"name\": \"orderTime\", \"type\": \"long\"},"
+ " {\"name\": \"orderId\", \"type\": \"long\"},"
+ " {\"name\": \"itemId\", \"type\": \"string\"}"
+ " ]"
+ "}";
Schema.Parser parser = new Schema.Parser();
Schema avroSchema1 = parser.parse(schema);
GenericRecord avroRecord = new GenericData.Record(avroSchema1);
avroRecord.put("orderTime", System.nanoTime());
avroRecord.put("orderId", new Random().nextLong());
avroRecord.put("itemId", UUID.randomUUID().toString());
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
producer.send(record);
// Send a record with the version 2 schema
String schema2 = "{"
+ "\"namespace\": \"myrecord\","
+ " \"name\": \"orders\","
+ " \"type\": \"record\","
+ " \"fields\": ["
+ " {\"name\": \"orderTime\", \"type\": \"long\"},"
+ " {\"name\": \"orderId\", \"type\": \"long\"},"
+ " {\"name\": \"itemNo\", \"type\": \"int\"}"
+ " ]"
+ "}";
Schema.Parser parser2 = new Schema.Parser();
Schema avroSchema2 = parser2.parse(schema2);
GenericRecord avroRecord2 = new GenericData.Record(avroSchema2);
avroRecord2.put("orderTime", System.nanoTime());
avroRecord2.put("orderId", new Random().nextLong());
avroRecord2.put("itemNo", 123);
ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>(TOPIC_NAME, avroRecord2);
producer.send(record2);
producer.flush();
producer.close();
The consumer prints the following data.
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1639659750486, serialized key size = -1, serialized value size = 59, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"})
[main] INFO com.example.Main - value:{"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"}
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1639659750587, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123})
[main] INFO com.example.Main - value:{"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123}
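With backward compatibility, deployment order matters. Consumers that use the new, compatible version n+1 schema must be deployed first; only then, with compatibility preserved, should the producers be moved from n to n+1.
However, as the test above shows, when records are read as GenericRecord the consumer does not check them strictly during deserialization. To fix this, the following option has to be added to the consumer configuration.
configs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);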
By default, each record is deserialized into an Avro GenericRecord, but in this tutorial the record should be deserialized using the application’s code-generated Payment class. Therefore, configure the deserializer to use Avro SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true. For example:
...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Payment> record : records) {
String key = record.key();
Payment value = record.value();
}
}
...
Running the consumer again so that it reads from earliest produces the following error.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group3-1, groupId=test-group3] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
Exception in thread "main" org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition test-1 at offset 0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at com.example.Main.main(Main.java:39)
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class myrecord.orders specified in writer's schema whilst finding reader's schema for a SpecificRecord.
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:281)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:252)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:196)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:391)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420)
... 9 more
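The "Could not find class myrecord.orders" message suggests that, with specific.avro.reader enabled, the deserializer looks for a generated SpecificRecord class whose name is the writer schema's full name (namespace plus record name), and this project contains no such class. The sketch below imitates that lookup with Class.forName to show the failure mode. SpecificClassLookup is a hypothetical name, and this is an illustration, not the deserializer's actual implementation.

```java
// Illustration of the failure mode only; not the code KafkaAvroDeserializer runs.
final class SpecificClassLookup {
    // Joins namespace and record name the way an Avro full name is formed.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // Reports whether a class with that name can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String wanted = fullName("myrecord", "orders");
        System.out.println(wanted);                // prints "myrecord.orders"
        System.out.println(isOnClasspath(wanted)); // prints false unless such a class was generated
    }
}
```

Under that reading, there are two ways forward: generate the class from the schema with Avro's code generation and add it to the consumer's classpath, or leave specific.avro.reader at its default of false and keep working with GenericRecord.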