Skip to content

Commit

Permalink
Change serdes and converters defaults to use contentId and 4 byte ide…
Browse files Browse the repository at this point in the history
…ntifiers (#5149)
  • Loading branch information
carlesarnal authored Sep 17, 2024
1 parent d7b0834 commit 4be82c8
Show file tree
Hide file tree
Showing 28 changed files with 233 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void testSchema() throws Exception {
JsonSchemaKafkaDeserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(clientV3,
true)) {

Map<String, String> configs = Map.of(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
Map<String, String> configs = Map.of(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId,
SerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(configs, false);

deserializer.configure(configs, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.function.Supplier;

import static io.apicurio.registry.utils.tests.TestUtils.waitForSchema;
import static io.apicurio.registry.utils.tests.TestUtils.waitForSchemaLongId;
import static org.junit.jupiter.api.Assertions.assertEquals;

@QuarkusTest
Expand Down Expand Up @@ -168,7 +169,6 @@ private void testAvroAutoRegisterIdInBody(
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, strategy);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
serializer.configure(config, false);

config = new HashMap<>();
Expand All @@ -182,11 +182,12 @@ private void testAvroAutoRegisterIdInBody(
byte[] bytes = serializer.serialize(topic, record);

// some impl details ...
waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
if (restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0) {
if (restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0) {
VersionMetaData artifactMetadata = artifactFinder.get();
assertEquals(globalId, artifactMetadata.getGlobalId());
assertEquals(contentId.longValue(), artifactMetadata.getContentId());
return true;
}
} catch (IOException e) {
Expand Down Expand Up @@ -214,7 +215,6 @@ public void testAvroJSON() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
serializer.configure(config, false);

config = new HashMap<>();
Expand All @@ -228,14 +228,15 @@ public void testAvroJSON() throws Exception {

byte[] bytes = serializer.serialize(artifactId, record);

// Test msg is stored as json, take 1st 9 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 9, bytes.length)));
// Test msg is stored as json, take 1st 5 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 5, bytes.length)));
Assertions.assertEquals("somebar", msgAsJson.getString("bar"));

// some impl details ...
waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
return restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -256,7 +257,6 @@ public void avroJsonWithReferences() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
serializer.configure(config, false);

config = new HashMap<>();
Expand Down Expand Up @@ -298,14 +298,15 @@ public void avroJsonWithReferences() throws Exception {

byte[] bytes = serializer.serialize(artifactId, avroSchemaB);

// Test msg is stored as json, take 1st 9 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 9, bytes.length)));
// Test msg is stored as json, take 1st 5 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 5, bytes.length)));
Assertions.assertEquals("CSymbol", msgAsJson.getJSONObject("schemaC").getString("symbol"));

// some impl details ...
waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
return restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -331,7 +332,6 @@ public void avroJsonWithReferencesDereferenced() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
config.put(SchemaResolverConfig.DEREFERENCE_SCHEMA, "true");
serializer.configure(config, false);

Expand Down Expand Up @@ -375,14 +375,15 @@ public void avroJsonWithReferencesDereferenced() throws Exception {

byte[] bytes = serializer.serialize(artifactId, avroSchemaB);

// Test msg is stored as json, take 1st 9 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 9, bytes.length)));
// Test msg is stored as json, take 1st 5 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 5, bytes.length)));
Assertions.assertEquals("CSymbol", msgAsJson.getJSONObject("schemaC").getString("symbol"));

// some impl details ...
waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
return restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -408,7 +409,6 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
serializer.configure(config, false);

config = new HashMap<>();
Expand Down Expand Up @@ -452,13 +452,14 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {

byte[] bytes = serializer.serialize(artifactId, avroSchemaB);

// Test msg is stored as json, take 1st 9 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 9, bytes.length)));
// Test msg is stored as json, take 1st 5 bytes off (magic byte and long)
JSONObject msgAsJson = new JSONObject(new String(Arrays.copyOfRange(bytes, 5, bytes.length)));
Assertions.assertEquals("CSymbol", msgAsJson.getJSONObject("schemaC").getString("symbol"));

waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
return restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -481,7 +482,6 @@ public void issue4463Test() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroKafkaSerdeConfig.AVRO_ENCODING, AvroKafkaSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
serializer.configure(config, false);

config = new HashMap<>();
Expand All @@ -503,7 +503,8 @@ public void issue4463Test() throws Exception {

waitForSchema(id -> {
try {
return restClient.ids().globalIds().byGlobalId(id).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(id.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -540,8 +541,8 @@ public void testAvroUsingHeaders() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);

Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID));
headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID);
Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);

GenericData.Record ir = deserializer.deserialize(artifactId, headers, bytes);

Expand Down Expand Up @@ -578,17 +579,18 @@ public void testReferenceRaw() throws Exception {
Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, record);

Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID));
Header globalId = headers.lastHeader(SerdeHeaders.HEADER_VALUE_GLOBAL_ID);
long globalIdkey = ByteBuffer.wrap(globalId.value()).getLong();
Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
Header contentId = headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);
long contentIdKey = ByteBuffer.wrap(contentId.value()).getLong();

waitForSchema(id -> {
waitForSchemaLongId(id -> {
try {
return restClient.ids().globalIds().byGlobalId(id).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentIdKey).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
}, bytes, byteBuffer -> globalIdkey);
}, bytes, byteBuffer -> contentIdKey);

GenericData.EnumSymbol ir = deserializer.deserialize(artifactId, headers, bytes);

Expand Down Expand Up @@ -623,7 +625,6 @@ private void testAvroReflect(Class<?> artifactResolverStrategyClass, Class<?> da

Map<String, String> config = new HashMap<>();
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.ENABLE_HEADERS, "false");
config.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, datumProvider.getName());
config.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY,
artifactResolverStrategyClass.getName());
Expand All @@ -638,9 +639,10 @@ private void testAvroReflect(Class<?> artifactResolverStrategyClass, Class<?> da
Tester tester = testerFactory.get();
byte[] bytes = serializer.serialize(artifactId, tester);

waitForSchema(globalId -> {
waitForSchema(contentId -> {
try {
return restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0;
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -677,14 +679,14 @@ public void testSerdeMix() throws Exception {

TestUtils.retry(() -> TestUtils.waitForSchema(contentId -> {
try {
return restClient.ids().contentIds().byContentId(contentId).get()
return restClient.ids().contentIds().byContentId(contentId.longValue()).get()
.readAllBytes().length > 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
}, bytes, bb -> (long) bb.getInt()));
}, bytes, ByteBuffer::getInt));

deserializer1.asLegacyId();
deserializer1.as4ByteId();
Map<String, String> config = new HashMap<>();
config.put(SerdeConfig.USE_ID, IdOption.contentId.name());
deserializer1.configure(config, false);
Expand All @@ -699,7 +701,7 @@ public void testSerdeMix() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(SerdeConfig.USE_ID, IdOption.contentId.name());

serializer2.asLegacyId();
serializer2.as4ByteId();
serializer2.configure(config, false);
byte[] bytes = serializer2.serialize(subject, record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.apicurio.registry.serde.SchemaResolverConfigurer;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.SerdeHeaders;
import io.apicurio.registry.serde.config.IdOption;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaParser;
Expand Down Expand Up @@ -87,13 +88,17 @@ public void testJsonSchemaSerde() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);
deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);

Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);

Assertions.assertNotNull(headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID));
headers.lastHeader(SerdeHeaders.HEADER_VALUE_CONTENT_ID);

person = deserializer.deserialize(artifactId, headers, bytes);

Assertions.assertEquals("Ales", person.getFirstName());
Expand Down Expand Up @@ -135,9 +140,10 @@ public void testJsonSchemaSerdeAutoRegister() throws Exception {
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.SCHEMA_LOCATION, "/io/apicurio/registry/util/json-schema.json");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
config.put(SerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);
deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);

Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);
Expand Down Expand Up @@ -188,9 +194,14 @@ public void testJsonSchemaSerdeHeaders() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.USE_ID, IdOption.globalId.name());

serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);
deserializer.configure(
Map.of(SerdeConfig.ENABLE_HEADERS, "true", SerdeConfig.USE_ID, IdOption.globalId.name()),
false);

Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);
Expand Down Expand Up @@ -223,8 +234,8 @@ public void testJsonSchemaSerdeMagicByte() throws Exception {
String groupId = TestUtils.generateGroupId();
String artifactId = generateArtifactId();

Long globalId = createArtifact(groupId, artifactId, ArtifactType.JSON, IoUtil.toString(jsonSchema),
ContentTypes.APPLICATION_JSON).getVersion().getGlobalId();
Long contentId = createArtifact(groupId, artifactId, ArtifactType.JSON, IoUtil.toString(jsonSchema),
ContentTypes.APPLICATION_JSON).getVersion().getContentId();

Person person = new Person("Ales", "Justin", 23);

Expand All @@ -234,14 +245,15 @@ public void testJsonSchemaSerdeMagicByte() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);
deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);

byte[] bytes = serializer.serialize(artifactId, person);

TestUtils.waitForSchema(schemaGlobalId -> {
assertEquals(globalId.intValue(), schemaGlobalId.intValue());
TestUtils.waitForSchema(schemaContentId -> {
assertEquals(contentId.intValue(), schemaContentId.intValue());
return true;
}, bytes);

Expand Down Expand Up @@ -330,9 +342,10 @@ public void testJsonSchemaSerdeWithReferences() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.ENABLE_HEADERS, "true");
serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);
deserializer.configure(Collections.singletonMap(SerdeConfig.ENABLE_HEADERS, "true"), false);

Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, citizen);
Expand Down
Loading

0 comments on commit 4be82c8

Please sign in to comment.