diff --git a/app/pom.xml b/app/pom.xml
index 215a2b04c6..6a80b4d93b 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -258,17 +258,17 @@
         <dependency>
             <groupId>io.apicurio</groupId>
-            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+            <artifactId>apicurio-registry-avro-serde-kafka</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>io.apicurio</groupId>
-            <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+            <artifactId>apicurio-registry-protobuf-serde-kafka</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>io.apicurio</groupId>
-            <artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
+            <artifactId>apicurio-registry-jsonschema-serde-kafka</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/app/src/test/java/io/apicurio/registry/noprofile/JsonSerdeTest.java b/app/src/test/java/io/apicurio/registry/noprofile/JsonSerdeTest.java
index 2f563c0b01..9321d34876 100644
--- a/app/src/test/java/io/apicurio/registry/noprofile/JsonSerdeTest.java
+++ b/app/src/test/java/io/apicurio/registry/noprofile/JsonSerdeTest.java
@@ -1,7 +1,9 @@
package io.apicurio.registry.noprofile;
import io.apicurio.registry.AbstractResourceTestBase;
-import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.resolver.SchemaResolverConfig;
+import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import io.apicurio.registry.support.Person;
@@ -39,12 +41,11 @@ public void testSchema() throws Exception {
Person person = new Person("Ales", "Justin", 23);
-        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(clientV3, true);
-                JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(clientV3,
-                        true)) {
+        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(clientV3);
+                JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(clientV3)) {
-            Map<String, Object> configs = Map.of(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
-                    SerdeConfig.ENABLE_HEADERS, "true");
+            Map<String, Object> configs = Map.of(SchemaResolverConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
+                    KafkaSerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.VALIDATION_ENABLED, "true");
serializer.configure(configs, false);
deserializer.configure(configs, false);
diff --git a/app/src/test/java/io/apicurio/registry/noprofile/serde/AvroSerdeTest.java b/app/src/test/java/io/apicurio/registry/noprofile/serde/AvroSerdeTest.java
index 33e806f890..1ee4bc2587 100644
--- a/app/src/test/java/io/apicurio/registry/noprofile/serde/AvroSerdeTest.java
+++ b/app/src/test/java/io/apicurio/registry/noprofile/serde/AvroSerdeTest.java
@@ -13,15 +13,12 @@
import io.apicurio.registry.AbstractResourceTestBase;
import io.apicurio.registry.client.auth.VertXAuthFactory;
import io.apicurio.registry.model.GroupId;
-import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.models.VersionMetaData;
-import io.apicurio.registry.serde.SerdeConfig;
-import io.apicurio.registry.serde.SerdeHeaders;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
-import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider;
import io.apicurio.registry.serde.avro.ReflectAllowNullAvroDatumProvider;
import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;
@@ -29,6 +26,9 @@
import io.apicurio.registry.serde.avro.strategy.RecordIdStrategy;
import io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy;
import io.apicurio.registry.serde.config.IdOption;
+import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+import io.apicurio.registry.serde.config.SerdeConfig;
+import io.apicurio.registry.serde.headers.KafkaSerdeHeaders;
import io.apicurio.registry.support.Tester;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.types.ContentTypes;
@@ -97,7 +97,7 @@ public void testConfiguration() throws Exception {
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "1");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
Serializer serializer = new AvroKafkaSerializer();
serializer.configure(config, true);
@@ -117,8 +117,8 @@ public void testConfiguration() throws Exception {
Assertions.assertEquals(record, deserializedRecord);
Assertions.assertEquals("somebar", record.get("bar").toString());
- config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class);
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class);
+ config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
serializer.configure(config, true);
bytes = serializer.serialize(topic, record);
@@ -127,12 +127,13 @@ record = deserializer.deserialize(topic, bytes);
Assertions.assertEquals("somebar", record.get("bar").toString());
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, DefaultAvroDatumProvider.class.getName());
serializer.configure(config, true);
bytes = serializer.serialize(topic, record);
deserializer.configure(deserializerConfig, true);
record = deserializer.deserialize(topic, bytes);
Assertions.assertEquals("somebar", record.get("bar").toString());
+
});
serializer.close();
@@ -213,12 +214,12 @@ public void testAvroJSON() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
deserializer.configure(config, false);
GenericData.Record record = new GenericData.Record(schema);
@@ -255,14 +256,13 @@ public void avroJsonWithReferences() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);
AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -330,15 +330,14 @@ public void avroJsonWithReferencesDereferenced() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
- config.put(SchemaResolverConfig.DEREFERENCE_SCHEMA, "true");
+ config.put(SerdeConfig.DEREFERENCE_SCHEMA, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);
AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -407,15 +406,14 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
- config.putIfAbsent(SchemaResolverConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
+ config.putIfAbsent(SerdeConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
deserializer.configure(config, false);
AvroSchemaB avroSchemaB = new AvroSchemaB();
@@ -480,14 +478,13 @@ public void issue4463Test() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
- config.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER,
- ReflectAvroDatumProvider.class.getName());
+ config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
+ config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
deserializer.configure(config, false);
LeadFallErstellen leadFallErstellen = LeadFallErstellen.newBuilder()
@@ -526,12 +523,12 @@ public void testAvroUsingHeaders() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);
config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
deserializer.configure(config, false);
GenericData.Record record = new GenericData.Record(schema);
@@ -541,8 +538,8 @@ public void testAvroUsingHeaders() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);
- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
- headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID));
+ headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);
GenericData.Record ir = deserializer.deserialize(artifactId, headers, bytes);
@@ -564,13 +561,13 @@ public void testReferenceRaw() throws Exception {
Deserializer deserializer = new AvroKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, RecordIdStrategy.class.getName());
serializer.configure(config, false);
config = new HashMap<>();
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
deserializer.configure(config, false);
GenericData.EnumSymbol record = new GenericData.EnumSymbol(eventTypeSchema, "UNDEFINED");
@@ -579,8 +576,8 @@ public void testReferenceRaw() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);
- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
- Header contentId = headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID));
+ Header contentId = headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);
long contentIdKey = ByteBuffer.wrap(contentId.value()).getLong();
waitForSchemaLongId(id -> {
@@ -625,13 +622,12 @@ private void testAvroReflect(Class<?> artifactResolverStrategyClass, Class<?> da
Map config = new HashMap<>();
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
- config.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY,
- artifactResolverStrategyClass.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
+ config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, artifactResolverStrategyClass.getName());
serializer.configure(config, false);
config = new HashMap<>();
- config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
+ config.put(AvroSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
deserializer.configure(config, false);
String artifactId = generateArtifactId();
diff --git a/app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java b/app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java
index 38e1a64697..a39f73a2e1 100644
--- a/app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java
+++ b/app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java
@@ -16,9 +16,10 @@
import io.apicurio.registry.rest.client.models.IfArtifactExists;
import io.apicurio.registry.rest.client.models.VersionMetaData;
import io.apicurio.registry.serde.SchemaResolverConfigurer;
-import io.apicurio.registry.serde.SerdeConfig;
-import io.apicurio.registry.serde.SerdeHeaders;
import io.apicurio.registry.serde.config.IdOption;
+import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+import io.apicurio.registry.serde.config.SerdeConfig;
+import io.apicurio.registry.serde.headers.KafkaSerdeHeaders;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaParser;
@@ -82,22 +83,23 @@ public void testJsonSchemaSerde() throws Exception {
Person person = new Person("Ales", "Justin", 23);
-        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
-                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
+        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient);
+                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
+ config.put(SerdeConfig.VALIDATION_ENABLED, "true");
serializer.configure(config, false);
- deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);
+ deserializer.configure(Collections.singletonMap(KafkaSerdeConfig.ENABLE_HEADERS, "true"), false);
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);
- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
- headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID));
+ headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_CONTENT_ID);
person = deserializer.deserialize(artifactId, headers, bytes);
@@ -132,18 +134,18 @@ public void testJsonSchemaSerdeAutoRegister() throws Exception {
Person person = new Person("Carles", "Arnal", 30);
-        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
-                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
+        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient);
+                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.SCHEMA_LOCATION, "/io/apicurio/registry/util/json-schema.json");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);
- deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);
+ deserializer.configure(Collections.singletonMap(KafkaSerdeConfig.ENABLE_HEADERS, "true"), false);
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);
@@ -188,31 +190,31 @@ public void testJsonSchemaSerdeHeaders() throws Exception {
Person person = new Person("Ales", "Justin", 23);
-        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
-                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
+        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient);
+                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
+ config.put(SerdeConfig.VALIDATION_ENABLED, "true");
config.put(SerdeConfig.USE_ID, IdOption.globalId.name());
serializer.configure(config, false);
- deserializer.configure(
- Map.of(SerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.USE_ID, IdOption.globalId.name()),
- false);
+ deserializer.configure(Map.of(KafkaSerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.USE_ID,
+ IdOption.globalId.name()), false);
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);
- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID));
- Header headerGlobalId = headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_GLOBAL_ID));
+ Header headerGlobalId = headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_GLOBAL_ID);
long id = ByteBuffer.wrap(headerGlobalId.value()).getLong();
assertEquals(globalId.intValue(), Long.valueOf(id).intValue());
- Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_MESSAGE_TYPE));
- Header headerMsgType = headers.lastHeader(SerdeHeaders.HEADER_VALUE_MESSAGE_TYPE);
+ Assertions.assertNotNull(headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_MESSAGE_TYPE));
+ Header headerMsgType = headers.lastHeader(KafkaSerdeHeaders.HEADER_VALUE_MESSAGE_TYPE);
assertEquals(person.getClass().getName(), IoUtil.toString(headerMsgType.value()));
person = deserializer.deserialize(artifactId, headers, bytes);
@@ -239,16 +241,19 @@ public void testJsonSchemaSerdeMagicByte() throws Exception {
Person person = new Person("Ales", "Justin", 23);
-        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
-                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
+        try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient);
+                Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
+ config.put(SerdeConfig.VALIDATION_ENABLED, "true");
serializer.configure(config, false);
- deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);
+ deserializer.configure(
+ Map.of(KafkaSerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.VALIDATION_ENABLED, "true"),
+ false);
byte[] bytes = serializer.serialize(artifactId, person);
@@ -335,17 +340,19 @@ public void testJsonSchemaSerdeWithReferences() throws Exception {
CitizenIdentifier identifier = new CitizenIdentifier(123456789);
Citizen citizen = new Citizen("Carles", "Arnal", 23, city, identifier, Collections.emptyList());
-        try (
-                JsonSchemaKafkaSerializer<Citizen> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
-                Deserializer<Citizen> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
+        try (JsonSchemaKafkaSerializer<Citizen> serializer = new JsonSchemaKafkaSerializer<>(restClient);
+                Deserializer<Citizen> deserializer = new JsonSchemaKafkaDeserializer<>(restClient)) {
Map config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
- config.put(SerdeConfig.ENABLE_HEADERS, "true");
+ config.put(SerdeConfig.VALIDATION_ENABLED, "true");
+ config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);
- deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);
+ deserializer.configure(
+ Map.of(KafkaSerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.VALIDATION_ENABLED, "true"),
+ false);
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, citizen);
diff --git a/app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java b/app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java
index 1b4f940d34..56566a2e73 100644
--- a/app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java
+++ b/app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java
@@ -8,7 +8,7 @@
import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.models.VersionMetaData;
-import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
import io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy;
diff --git a/docs/modules/ROOT/partials/getting-started/proc-migrating-registry-applications.adoc b/docs/modules/ROOT/partials/getting-started/proc-migrating-registry-applications.adoc
index bf4f5ebe47..e502804d3e 100644
--- a/docs/modules/ROOT/partials/getting-started/proc-migrating-registry-applications.adoc
+++ b/docs/modules/ROOT/partials/getting-started/proc-migrating-registry-applications.adoc
@@ -5,36 +5,35 @@
= Migrating {registry} client applications
[role="_abstract"]
-You must review your existing {registry} client applications to ensure that the Maven dependencies and Java client configuration meet the new requirements for version 2.x. For example, this includes new Maven dependencies for the {registry} Java REST client libraries or Kafka client serializer/deserializer (Serdes) libraries. You must also update your Java application configuration with the new registry v2 API path.
+You must review your existing {registry} client applications to ensure that the Maven dependencies and Java client configuration meet the new requirements for version 3.x. For example, this includes new Maven dependencies for the {registry} Java REST client libraries or Kafka client serializer/deserializer (Serdes) libraries. You must also update your Java application configuration with the new registry v3 API path.
.Prerequisites
-* Existing {registry} {registry-v1} Java client application or Kafka client producer and consumer Java applications with SerDes
+* Existing {registry} {registry-v2} Java client application or Kafka client producer and consumer Java applications with SerDes
.Procedure
-. If you are using the {registry} Java REST client, you must change the Maven dependencies for the {registry} Java client libraries, which have been repackaged in version 2.x:
+. If you are using the {registry} Java REST client, you must change the Maven dependencies for the {registry} Java client libraries, which have been repackaged in version 3.x:
+
[source, xml, subs="attributes+"]
----
 <dependency>
     <groupId>io.apicurio</groupId>
-    <artifactId>apicurio-registry-client</artifactId>
+    <artifactId>apicurio-registry-java-sdk</artifactId>
     <version>{registry-release}</version>
 </dependency>
 ----
-. In your Java client application, you must change your registry URL configuration, from pointing to the existing v1 API path to the new v2 path. For example:
+. In your Java client application, you must change your registry URL configuration from pointing to the existing v2 API path to the new v3 path. Starting with v3, the REST client is built on Vert.x, so a `VertXRequestAdapter` must be supplied when the client is created. For example:
+
[source,java, subs="attributes+"]
----
public class ClientExample {
- private static final RegistryRestClient client;
public static void main(String[] args) throws Exception {
- // Create a registry client
- String registryUrl = "https://new-registry.my-company.com/apis/registry/v2";
- RegistryClient client = RegistryClientFactory.create(registryUrl);
+ VertXRequestAdapter vertXRequestAdapter = new VertXRequestAdapter(vertx);
+ vertXRequestAdapter.setBaseUrl("https://new-registry.my-company.com/apis/registry/v3");
+ RegistryClient client = new RegistryClient(vertXRequestAdapter);
}
}
----
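++
+A fuller, self-contained sketch of the same setup is shown below. This is illustrative only: the `io.kiota.http.vertx` package used for the `VertXRequestAdapter` import is an assumption, and the application is responsible for creating and closing the `Vertx` instance.
++
+[source,java]
+----
+import io.apicurio.registry.rest.client.RegistryClient;
+import io.kiota.http.vertx.VertXRequestAdapter; // assumed package for the Vert.x request adapter
+import io.vertx.core.Vertx;
+
+public class ClientExample {
+
+    public static void main(String[] args) throws Exception {
+        // The application owns the Vert.x instance and should close it when finished
+        Vertx vertx = Vertx.vertx();
+        try {
+            VertXRequestAdapter adapter = new VertXRequestAdapter(vertx);
+            adapter.setBaseUrl("https://new-registry.my-company.com/apis/registry/v3");
+            RegistryClient client = new RegistryClient(adapter);
+            // ... use the client against the v3 API ...
+        } finally {
+            vertx.close();
+        }
+    }
+}
+----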
@@ -47,52 +46,44 @@ ifdef::rh-service-registry[]
the link:{LinkServiceRegistryUser}#using-the-registry-client[{NameServiceRegistryUser}].
endif::[]
-. If you are using the {registry} SerDes libraries, you must change the Maven dependencies, which have been repackaged in version 2.x. In {registry} {registry-v1}, the SerDes libraries were all provided with only one Maven dependency:
+. If you are using the {registry} SerDes libraries, you must change the Maven dependencies, which have been repackaged in version 3.x. In {registry} {registry-v2}, the SerDes libraries were provided as three separate Maven modules, one per data format. This is still the case in 3.x, but the module names have changed:
+
[source, xml, subs="attributes+"]
----
 <dependency>
     <groupId>io.apicurio</groupId>
-    <artifactId>apicurio-registry-utils-serde</artifactId>
-    <version>{registry-v1-release}</version>
-</dependency>
-----
-+
-In {registry} 2.x, the SerDes libraries have been split into three Maven dependencies, one for each supported data format: `avro`, `protobuf`, and `json schema`, depending on your use cases:
-+
-[source, xml, subs="attributes+"]
-----
-<dependency>
-    <groupId>io.apicurio</groupId>
-    <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+    <artifactId>apicurio-registry-avro-serde-kafka</artifactId>
     <version>{registry-release}</version>
 </dependency>
 <dependency>
     <groupId>io.apicurio</groupId>
-    <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+    <artifactId>apicurio-registry-jsonschema-serde-kafka</artifactId>
     <version>{registry-release}</version>
 </dependency>
 <dependency>
     <groupId>io.apicurio</groupId>
-    <artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
+    <artifactId>apicurio-registry-protobuf-serde-kafka</artifactId>
     <version>{registry-release}</version>
 </dependency>
 ----
++
+In {registry} 3.x, the SerDes libraries have been significantly refactored so that they can be reused with other messaging platforms, such as Apache Pulsar. This is why the Apache Kafka-specific modules have been renamed.
++
-. In your Kafka producer and consumer Java applications, you must change your registry URL configuration from pointing to the existing v1 API path to the new v2 path. For example:
+. In your Kafka producer and consumer Java applications, you must change your registry URL configuration from pointing to the existing v2 API path to the new v3 path. For example:
+
-_Existing registry v1 API path_:
+_Existing registry v2 API path_:
+
[source,java]
----
-props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, "http://old-registry.my-company.com/api");
+props.putIfAbsent(SerdeConfig.REGISTRY_URL, "http://new-registry.my-company.com/apis/registry/v2");
----
+
-_New registry v2 API path_:
+_New registry v3 API path_:
+
[source,java, subs="attributes+"]
----
-props.putIfAbsent(SerdeConfig.REGISTRY_URL, "http://new-registry.my-company.com/apis/registry/v2");
+props.putIfAbsent(SerdeConfig.REGISTRY_URL, "http://new-registry.my-company.com/apis/registry/v3");
----
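++
+For reference, the following sketch pulls these settings together for an Avro producer using the 3.x configuration classes. It is illustrative only: the class name, broker address, and property values are placeholders, while the configuration constants come from the `io.apicurio.registry.serde.config` and `io.apicurio.registry.serde.avro` packages.
++
+[source,java]
+----
+import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.apicurio.registry.serde.config.KafkaSerdeConfig;
+import io.apicurio.registry.serde.config.SerdeConfig;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class SerdeMigrationExample {
+
+    static Properties producerProperties() {
+        Properties props = new Properties();
+        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
+        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
+        // New v3 API path, as shown above
+        props.putIfAbsent(SerdeConfig.REGISTRY_URL, "http://new-registry.my-company.com/apis/registry/v3");
+        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
+        // Kafka-specific option: moved from SerdeConfig to KafkaSerdeConfig in 3.x
+        props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, "true");
+        return props;
+    }
+}
+----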
+
The refactored SerDes libraries also include other important changes to configuration properties. For more details on SerDes configuration, see
diff --git a/docs/modules/ROOT/partials/shared/attributes.adoc b/docs/modules/ROOT/partials/shared/attributes.adoc
index ce7fc12ec0..cfb35c01e0 100644
--- a/docs/modules/ROOT/partials/shared/attributes.adoc
+++ b/docs/modules/ROOT/partials/shared/attributes.adoc
@@ -25,7 +25,8 @@ ifndef::service-registry-downstream[]
:registry-release: 3.0.0
:registry-docker-version: latest-release
:registry-v1: 1.3
-:registry-v1-release: 1.3.2.Final
+:registry-v1-release: 1.3.2.Final
+:registry-v2: 2.6.3
:operator-version: 1.1.0-v2.4.12.final
:kafka-streams: Strimzi
:registry-kafka-version: 3.5
diff --git a/examples/avro-bean/pom.xml b/examples/avro-bean/pom.xml
index 606aac34f0..800d39df62 100644
--- a/examples/avro-bean/pom.xml
+++ b/examples/avro-bean/pom.xml
@@ -18,7 +18,7 @@
     <dependency>
       <groupId>io.apicurio</groupId>
-      <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+      <artifactId>apicurio-registry-avro-serde-kafka</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/examples/avro-bean/src/main/java/io/apicurio/registry/examples/avro/bean/AvroBeanExample.java b/examples/avro-bean/src/main/java/io/apicurio/registry/examples/avro/bean/AvroBeanExample.java
index ad5abb2a7c..2537a1861b 100644
--- a/examples/avro-bean/src/main/java/io/apicurio/registry/examples/avro/bean/AvroBeanExample.java
+++ b/examples/avro-bean/src/main/java/io/apicurio/registry/examples/avro/bean/AvroBeanExample.java
@@ -16,11 +16,11 @@
package io.apicurio.registry.examples.avro.bean;
-import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
-import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;
+import io.apicurio.registry.serde.config.SerdeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -57,7 +57,7 @@
*/
public class AvroBeanExample {
- private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
+ private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v3";
private static final String SERVERS = "localhost:9092";
private static final String TOPIC_NAME = AvroBeanExample.class.getSimpleName();
private static final String SUBJECT_NAME = "Greeting";
@@ -143,7 +143,7 @@ private static Producer