Generic serdes implementation (#5171)
* Move kafka serdes to a separate directory and remove deprecated classes

* Create generic serdes and prepare avro kafka serdes delegating them

* Prepare generic jsonschema serdes

* Create kafka jsonschema serde

* Prepare protobuf kafka serdes

* Adapt converters to new serdes structure

* Fix headers handler configuration and improve default configuration management

* Improve generic serdes structure

* Fix schema resolver initialisation process

* Fix headers handling in jsonschema serde

* Fix protobuf serde configuration

* Adjust integration tests to new structure

* Rename kafka serdes common module directory

* Adapt examples to the generic serdes structure

* Update migrating client applications documentation
carlesarnal authored Sep 18, 2024
1 parent: f39bdea, commit: 2c6897a
Showing 160 changed files with 2,521 additions and 2,286 deletions.
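
The commit message above describes splitting serde configuration into generic and Kafka-specific pieces, and the diffs below show the resulting renames (AvroKafkaSerdeConfig becomes AvroSerdeConfig, SerdeConfig.ENABLE_HEADERS moves to KafkaSerdeConfig, SerdeHeaders becomes KafkaSerdeHeaders). The following is a minimal, non-authoritative sketch of a producer-side Avro serializer configured against those renamed classes; the RegistryClient-taking constructor, the topic, and the record are assumptions mirroring the tests, while the config keys themselves are taken from this diff.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericData;

import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.apicurio.registry.serde.config.KafkaSerdeConfig;
import io.apicurio.registry.serde.config.SerdeConfig;

public class AvroSerdeConfigSketch {

    // Sketch only: "client" is assumed to be an already-built RegistryClient; constructing it is out of scope here.
    static byte[] serialize(RegistryClient client, String topic, GenericData.Record record) {
        try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<>(client)) {
            Map<String, String> config = new HashMap<>();
            // Format-specific key: lived in AvroKafkaSerdeConfig before this commit.
            config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
            // Generic key shared by every serde.
            config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
            // Kafka-specific key: was SerdeConfig.ENABLE_HEADERS before this commit.
            config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
            serializer.configure(config, false); // false = value serializer
            return serializer.serialize(topic, record);
        }
    }
}

Format-specific keys stay on the per-format config class, generic keys on SerdeConfig, and Kafka transport concerns on KafkaSerdeConfig, which is the split the tests below exercise.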
app/pom.xml (6 changes: 3 additions & 3 deletions)
@@ -258,17 +258,17 @@
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
- <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+ <artifactId>apicurio-registry-avro-serde-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
- <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+ <artifactId>apicurio-registry-protobuf-serde-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
- <artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
+ <artifactId>apicurio-registry-jsonschema-serde-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
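Judging from the three dependency renames above, the Kafka serde modules appear to follow a new coordinate pattern: apicurio-registry-serdes-<format>-serde becomes apicurio-registry-<format>-serde-kafka, matching the "Rename kafka serdes common module directory" item in the commit message. Downstream poms would need the same rename; the version that first ships this change is not stated in this diff.
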
@@ -1,7 +1,9 @@
package io.apicurio.registry.noprofile;

import io.apicurio.registry.AbstractResourceTestBase;
- import io.apicurio.registry.serde.SerdeConfig;
+ import io.apicurio.registry.resolver.SchemaResolverConfig;
+ import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+ import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import io.apicurio.registry.support.Person;
@@ -39,12 +41,11 @@ public void testSchema() throws Exception {

Person person = new Person("Ales", "Justin", 23);

- try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(clientV3, true);
- JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(clientV3,
- true)) {
+ try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(clientV3);
+ JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(clientV3)) {

- Map<String, String> configs = Map.of(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
- SerdeConfig.ENABLE_HEADERS, "true");
+ Map<String, String> configs = Map.of(SchemaResolverConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
+ KafkaSerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.VALIDATION_ENABLED, "true");
serializer.configure(configs, false);

deserializer.configure(configs, false);
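The JSON Schema test above shows the new constructor and configuration surface: validation is no longer a constructor flag but a config entry, the explicit group id moves to SchemaResolverConfig, and header usage moves to KafkaSerdeConfig. A minimal sketch of the same round trip outside the test harness follows; the RegistryClient, group id, topic, and the assumption that the Person schema is already registered are illustrative, while the class names and keys come from the diff.

import java.util.Map;

import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.config.KafkaSerdeConfig;
import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import io.apicurio.registry.support.Person; // test support type reused from the diff

public class JsonSchemaSerdeSketch {

    // Round-trips a Person through the JSON Schema serde.
    // Assumes the Person JSON schema is already registered under groupId (the test registers it beforehand).
    static Person roundTrip(RegistryClient client, String groupId, String topic, Person person) {
        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(client);
                JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(client)) {

            Map<String, String> configs = Map.of(
                    SchemaResolverConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
                    // Validation is now a config flag instead of a constructor argument.
                    SerdeConfig.VALIDATION_ENABLED, "true",
                    KafkaSerdeConfig.ENABLE_HEADERS, "true");
            serializer.configure(configs, false);
            deserializer.configure(configs, false);

            byte[] bytes = serializer.serialize(topic, person);
            return deserializer.deserialize(topic, bytes);
        }
    }
}
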
@@ -13,22 +13,22 @@
import io.apicurio.registry.AbstractResourceTestBase;
import io.apicurio.registry.client.auth.VertXAuthFactory;
import io.apicurio.registry.model.GroupId;
- import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.models.VersionMetaData;
- import io.apicurio.registry.serde.SerdeConfig;
- import io.apicurio.registry.serde.SerdeHeaders;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
- import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+ import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider;
import io.apicurio.registry.serde.avro.ReflectAllowNullAvroDatumProvider;
import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;
import io.apicurio.registry.serde.avro.strategy.QualifiedRecordIdStrategy;
import io.apicurio.registry.serde.avro.strategy.RecordIdStrategy;
import io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy;
import io.apicurio.registry.serde.config.IdOption;
+ import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+ import io.apicurio.registry.serde.config.SerdeConfig;
+ import io.apicurio.registry.serde.headers.KafkaSerdeHeaders;
import io.apicurio.registry.support.Tester;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.types.ContentTypes;
@@ -97,7 +97,7 @@ public void testConfiguration() throws Exception {
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "1");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
Serializer<GenericData.Record> serializer = new AvroKafkaSerializer<GenericData.Record>();
serializer.configure(config, true);

@@ -117,8 +117,8 @@ public void testConfiguration() throws Exception {
Assertions.assertEquals(record, deserializedRecord);
Assertions.assertEquals("somebar", record.get("bar").toString());

- config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class);
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class);
+ config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
serializer.configure(config, true);
bytes = serializer.serialize(topic, record);

@@ -127,12 +127,13 @@ record = deserializer.deserialize(topic, bytes);
Assertions.assertEquals("somebar", record.get("bar").toString());

config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
serializer.configure(config, true);
bytes = serializer.serialize(topic, record);
deserializer.configure(deserializerConfig, true);
record = deserializer.deserialize(topic, bytes);
Assertions.assertEquals("somebar", record.get("bar").toString());

});

serializer.close();
@@ -213,12 +214,12 @@ public void testAvroJSON() throws Exception {
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
deserializer.configure(config, false);

GenericData.Record record = new GenericData.Record(schema);
@@ -255,14 +256,13 @@ public void avroJsonWithReferences() throws Exception {
Deserializer<AvroSchemaB> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);

AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -330,15 +330,14 @@ public void avroJsonWithReferencesDereferenced() throws Exception {
Deserializer<AvroSchemaB> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
- config.put(SchemaResolverConfig.DEREFERENCE_SCHEMA, "true");
+ config.put(SerdeConfig.DEREFERENCE_SCHEMA, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);

AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -407,15 +406,14 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {
Deserializer<AvroSchemaB> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
- config.putIfAbsent(SchemaResolverConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
+ config.putIfAbsent(SerdeConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
deserializer.configure(config, false);

AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -480,14 +478,13 @@ public void issue4463Test() throws Exception {
Deserializer<LeadFallErstellen> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);

LeadFallErstellen leadFallErstellen = LeadFallErstellen.newBuilder()
@@ -526,12 +523,12 @@ public void testAvroUsingHeaders() throws Exception {
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
deserializer.configure(config, false);

GenericData.Record record = new GenericData.Record(schema);
@@ -541,8 +538,8 @@ public void testAvroUsingHeaders() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);

- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
- headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID));
+ headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);

GenericData.Record ir = deserializer.deserialize(artifactId, headers, bytes);

@@ -564,13 +561,13 @@ public void testReferenceRaw() throws Exception {
Deserializer<GenericData.EnumSymbol> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Map<String, String> config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, RecordIdStrategy.class.getName());
serializer.configure(config, false);

config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
deserializer.configure(config, false);

GenericData.EnumSymbol record = new GenericData.EnumSymbol(eventTypeSchema, "UNDEFINED");
@@ -579,8 +576,8 @@ public void testReferenceRaw() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);

- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
- Header contentId = headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID));
+ Header contentId = headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);
long contentIdKey = ByteBuffer.wrap(contentId.value()).getLong();

waitForSchemaLongId(id -> {
@@ -625,13 +622,12 @@ private void testAvroReflect(Class<?> artifactResolverStrategyClass, Class<?> da

Map<String, String> config = new HashMap<>();
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
- config.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY,
- artifactResolverStrategyClass.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
+ config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, artifactResolverStrategyClass.getName());
serializer.configure(config, false);

config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
deserializer.configure(config, false);

String artifactId = generateArtifactId();
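When headers are enabled, the serializers in the tests above record the registry content id in a Kafka header, and the tests read it back through KafkaSerdeHeaders as a long value. A small consumer-side sketch of that lookup follows; the header constant, the lastHeader call, and the ByteBuffer decoding are taken from the test code, while treating a missing header as an error is an added assumption.

import java.nio.ByteBuffer;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import io.apicurio.registry.serde.headers.KafkaSerdeHeaders;

public class ContentIdHeaderSketch {

    // Reads the content id that the serializer wrote into the record headers
    // (the constant was SerdeHeaders.HEADER_VALUE_CONTENT_ID before this commit).
    static long contentId(Headers headers) {
        Header contentId = headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);
        if (contentId == null) {
            throw new IllegalStateException("Content id header not present; was ENABLE_HEADERS set?");
        }
        return ByteBuffer.wrap(contentId.value()).getLong();
    }
}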