From 5d756a9f399e9b6bb7093b023d919c927b634b2d Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Thu, 10 Oct 2024 08:44:15 +0200 Subject: [PATCH] Add artifact created event based on the outbox pattern (#5274) * Add artifact created event based on the outbox pattern * Fire artifact create event inside the db transaction * Add artifact metadata update event and artifact delete event * Initial implementation for kafkasql events * Improve test assertions * Add group events * Only create the event in the table if the database supports events * Implement artifact version events * Add rules events * Update events test with rules tests for modifications and deletions * Add documentation to properties * Remove property that enables events * Ensure that any event is sent only once --- .../registry/events/ArtifactCreated.java | 41 + .../registry/events/ArtifactDeleted.java | 37 + .../events/ArtifactMetadataUpdated.java | 41 + .../events/ArtifactRuleConfigured.java | 40 + .../events/ArtifactVersionCreated.java | 41 + .../events/ArtifactVersionDeleted.java | 37 + .../ArtifactVersionMetadataUpdated.java | 41 + .../registry/events/GlobalRuleConfigured.java | 38 + .../registry/events/GroupCreated.java | 37 + .../registry/events/GroupDeleted.java | 35 + .../registry/events/GroupMetadataUpdated.java | 37 + .../registry/events/GroupRuleConfigured.java | 38 + .../registry/rest/v3/GroupsResourceImpl.java | 26 +- .../registry/storage/RegistryStorage.java | 38 +- .../registry/storage/StorageEventType.java | 4 +- .../ReadOnlyRegistryStorageDecorator.java | 6 + .../RegistryStorageDecoratorBase.java | 18 +- .../RegistryStorageDecoratorReadOnlyBase.java | 5 + .../registry/storage/dto/AggregateType.java | 6 + .../registry/storage/dto/OutboxEvent.java | 26 + .../impl/gitops/GitOpsRegistryStorage.java | 29 +- .../impl/kafkasql/KafkaSqlConfiguration.java | 2 + .../kafkasql/KafkaSqlEventsProcessor.java | 30 + .../impl/kafkasql/KafkaSqlFactory.java | 41 +- .../impl/kafkasql/KafkaSqlOutboxEvent.java | 20 + .../kafkasql/KafkaSqlRegistryStorage.java | 85 +- .../impl/sql/AbstractSqlRegistryStorage.java | 163 +++- .../storage/impl/sql/CommonSqlStatements.java | 16 + .../storage/impl/sql/SqlEventsProcessor.java | 20 + .../storage/impl/sql/SqlOutboxEvent.java | 20 + .../storage/impl/sql/SqlStatements.java | 3 + app/src/main/resources/application.properties | 6 +- .../registry/storage/impl/sql/db-version | 2 +- .../apicurio/registry/storage/impl/sql/h2.ddl | 2 +- .../registry/storage/impl/sql/mssql.ddl | 5 +- .../registry/storage/impl/sql/postgresql.ddl | 5 +- .../impl/sql/upgrades/101/h2.upgrade.ddl | 4 + .../impl/sql/upgrades/101/mssql.upgrade.ddl | 7 + .../sql/upgrades/101/postgresql.upgrade.ddl | 7 + .../registry/AbstractResourceTestBase.java | 59 ++ .../event/kafkasql/KafkaSqlEventsTest.java | 25 + .../kafkasql/KafkaSqlEventsTestProfile.java | 22 + .../registry/event/sql/EventsTestProfile.java | 22 + .../event/sql/RegistryEventsTest.java | 724 ++++++++++++++++++ .../readonly/ReadOnlyRegistryStorageTest.java | 2 + .../events/dto/RegistryEventType.java | 29 - .../ref-registry-all-configs.adoc | 11 +- .../ref-registry-config-migration.adoc | 4 +- .../src/test/resources/infra/kafka/kafka.yml | 31 +- .../resources/infra/kafka/registry-kafka.yml | 2 +- pom.xml | 6 + utils/tests/pom.xml | 15 + .../tests/DebeziumContainerResource.java | 78 ++ .../tests/KafkaTestContainerManager.java | 1 - 54 files changed, 1920 insertions(+), 170 deletions(-) create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactCreated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactDeleted.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactMetadataUpdated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactRuleConfigured.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactVersionCreated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactVersionDeleted.java create mode 100644 app/src/main/java/io/apicurio/registry/events/ArtifactVersionMetadataUpdated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/GlobalRuleConfigured.java create mode 100644 app/src/main/java/io/apicurio/registry/events/GroupCreated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/GroupDeleted.java create mode 100644 app/src/main/java/io/apicurio/registry/events/GroupMetadataUpdated.java create mode 100644 app/src/main/java/io/apicurio/registry/events/GroupRuleConfigured.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/dto/AggregateType.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/dto/OutboxEvent.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlEventsProcessor.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlOutboxEvent.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlEventsProcessor.java create mode 100644 app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlOutboxEvent.java create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/h2.upgrade.ddl create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/mssql.upgrade.ddl create mode 100644 app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/postgresql.upgrade.ddl create mode 100644 app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTest.java create mode 100644 app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTestProfile.java create mode 100644 app/src/test/java/io/apicurio/registry/event/sql/EventsTestProfile.java create mode 100644 app/src/test/java/io/apicurio/registry/event/sql/RegistryEventsTest.java delete mode 100644 common/src/main/java/io/apicurio/registry/events/dto/RegistryEventType.java create mode 100644 utils/tests/src/main/java/io/apicurio/registry/utils/tests/DebeziumContainerResource.java diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactCreated.java b/app/src/main/java/io/apicurio/registry/events/ArtifactCreated.java new file mode 100644 index 0000000000..c0132035e4 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactCreated.java @@ -0,0 +1,41 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_CREATED; + +public class ArtifactCreated extends OutboxEvent { + private final JSONObject eventPayload; + + private ArtifactCreated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactCreated of(ArtifactMetaDataDto artifactMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", artifactMetaDataDto.getGroupId()) + .put("artifactId", artifactMetaDataDto.getArtifactId()) + .put("name", artifactMetaDataDto.getName()) + .put("description", artifactMetaDataDto.getDescription()) + .put("eventType", ARTIFACT_CREATED.name()); + + return new ArtifactCreated(id, + artifactMetaDataDto.getGroupId() + "-" + artifactMetaDataDto.getArtifactId(), jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_CREATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactDeleted.java b/app/src/main/java/io/apicurio/registry/events/ArtifactDeleted.java new file mode 100644 index 0000000000..6f3931ed69 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactDeleted.java @@ -0,0 +1,37 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_DELETED; + +public class ArtifactDeleted extends OutboxEvent { + + private final JSONObject eventPayload; + + private ArtifactDeleted(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactDeleted of(String groupId, String artifactId) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("artifactId", artifactId).put("eventType", + ARTIFACT_DELETED.name()); + + return new ArtifactDeleted(id, groupId + "-" + artifactId, jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_DELETED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactMetadataUpdated.java b/app/src/main/java/io/apicurio/registry/events/ArtifactMetadataUpdated.java new file mode 100644 index 0000000000..93e68d925d --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactMetadataUpdated.java @@ -0,0 +1,41 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_METADATA_UPDATED; + +public class ArtifactMetadataUpdated extends OutboxEvent { + + private final JSONObject eventPayload; + + private ArtifactMetadataUpdated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactMetadataUpdated of(String groupId, String artifactId, + EditableArtifactMetaDataDto artifactMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("artifactId", artifactId) + .put("name", artifactMetaDataDto.getName()).put("owner", artifactMetaDataDto.getOwner()) + .put("description", artifactMetaDataDto.getDescription()) + .put("eventType", ARTIFACT_METADATA_UPDATED.name()); + + return new ArtifactMetadataUpdated(id, groupId + "-" + artifactId, jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_METADATA_UPDATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactRuleConfigured.java b/app/src/main/java/io/apicurio/registry/events/ArtifactRuleConfigured.java new file mode 100644 index 0000000000..63443a6a6b --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactRuleConfigured.java @@ -0,0 +1,40 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import io.apicurio.registry.storage.dto.RuleConfigurationDto; +import io.apicurio.registry.types.RuleType; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_RULE_CONFIGURED; + +public class ArtifactRuleConfigured extends OutboxEvent { + private final JSONObject eventPayload; + + private ArtifactRuleConfigured(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactRuleConfigured of(String groupId, String artifactId, RuleType ruleType, + RuleConfigurationDto rule) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("artifactId", artifactId) + .put("ruleType", ruleType.value()).put("rule", rule.getConfiguration()) + .put("eventType", ARTIFACT_RULE_CONFIGURED.name()); + + return new ArtifactRuleConfigured(id, groupId + "-" + artifactId, jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_RULE_CONFIGURED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactVersionCreated.java b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionCreated.java new file mode 100644 index 0000000000..b861e1f044 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionCreated.java @@ -0,0 +1,41 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_VERSION_CREATED; + +public class ArtifactVersionCreated extends OutboxEvent { + private final JSONObject eventPayload; + + private ArtifactVersionCreated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactVersionCreated of(ArtifactVersionMetaDataDto versionMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", versionMetaDataDto.getGroupId()) + .put("artifactId", versionMetaDataDto.getArtifactId()) + .put("version", versionMetaDataDto.getVersion()).put("name", versionMetaDataDto.getName()) + .put("description", versionMetaDataDto.getDescription()) + .put("eventType", ARTIFACT_VERSION_CREATED.name()); + + return new ArtifactVersionCreated(id, versionMetaDataDto.getGroupId() + "-" + + versionMetaDataDto.getArtifactId() + "-" + versionMetaDataDto.getVersion(), jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_VERSION_CREATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactVersionDeleted.java b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionDeleted.java new file mode 100644 index 0000000000..304476e82c --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionDeleted.java @@ -0,0 +1,37 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_VERSION_DELETED; + +public class ArtifactVersionDeleted extends OutboxEvent { + + private final JSONObject eventPayload; + + private ArtifactVersionDeleted(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactVersionDeleted of(String groupId, String artifactId, String version) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("artifactId", artifactId).put("version", version) + .put("eventType", ARTIFACT_VERSION_DELETED.name()); + + return new ArtifactVersionDeleted(id, groupId + "-" + artifactId + "-" + version, jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_VERSION_DELETED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} diff --git a/app/src/main/java/io/apicurio/registry/events/ArtifactVersionMetadataUpdated.java b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionMetadataUpdated.java new file mode 100644 index 0000000000..ee27ef3d0c --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/ArtifactVersionMetadataUpdated.java @@ -0,0 +1,41 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.ARTIFACT_VERSION_METADATA_UPDATED; + +public class ArtifactVersionMetadataUpdated extends OutboxEvent { + + private final JSONObject eventPayload; + + private ArtifactVersionMetadataUpdated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static ArtifactVersionMetadataUpdated of(String groupId, String artifactId, String version, + EditableVersionMetaDataDto editableVersionMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("artifactId", artifactId).put("version", version) + .put("name", editableVersionMetaDataDto.getName()) + .put("description", editableVersionMetaDataDto.getDescription()) + .put("eventType", ARTIFACT_VERSION_METADATA_UPDATED.name()); + + return new ArtifactVersionMetadataUpdated(id, groupId + "-" + artifactId + "-" + version, jsonObject); + } + + @Override + public String getType() { + return ARTIFACT_VERSION_METADATA_UPDATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/GlobalRuleConfigured.java b/app/src/main/java/io/apicurio/registry/events/GlobalRuleConfigured.java new file mode 100644 index 0000000000..e5a59a7150 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/GlobalRuleConfigured.java @@ -0,0 +1,38 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import io.apicurio.registry.storage.dto.RuleConfigurationDto; +import io.apicurio.registry.types.RuleType; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.GLOBAL_RULE_CONFIGURED; + +public class GlobalRuleConfigured extends OutboxEvent { + private final JSONObject eventPayload; + + private GlobalRuleConfigured(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static GlobalRuleConfigured of(RuleType ruleType, RuleConfigurationDto rule) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("ruleType", ruleType.value()).put("rule", rule.getConfiguration()) + .put("eventType", GLOBAL_RULE_CONFIGURED.name()); + + return new GlobalRuleConfigured(id, ruleType.value(), jsonObject); + } + + @Override + public String getType() { + return GLOBAL_RULE_CONFIGURED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/GroupCreated.java b/app/src/main/java/io/apicurio/registry/events/GroupCreated.java new file mode 100644 index 0000000000..d4992236d5 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/GroupCreated.java @@ -0,0 +1,37 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.GroupMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.GROUP_CREATED; + +public class GroupCreated extends OutboxEvent { + private final JSONObject eventPayload; + + private GroupCreated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static GroupCreated of(GroupMetaDataDto groupMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupMetaDataDto.getGroupId()).put("eventType", + GROUP_CREATED.name()); + + return new GroupCreated(id, groupMetaDataDto.getGroupId(), jsonObject); + } + + @Override + public String getType() { + return GROUP_CREATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/GroupDeleted.java b/app/src/main/java/io/apicurio/registry/events/GroupDeleted.java new file mode 100644 index 0000000000..31c41a0967 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/GroupDeleted.java @@ -0,0 +1,35 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.GROUP_DELETED; + +public class GroupDeleted extends OutboxEvent { + private final JSONObject eventPayload; + + private GroupDeleted(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static GroupDeleted of(String groupId) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("eventType", GROUP_DELETED.name()); + + return new GroupDeleted(id, groupId, jsonObject); + } + + @Override + public String getType() { + return GROUP_DELETED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/GroupMetadataUpdated.java b/app/src/main/java/io/apicurio/registry/events/GroupMetadataUpdated.java new file mode 100644 index 0000000000..75aefbb316 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/GroupMetadataUpdated.java @@ -0,0 +1,37 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto; +import io.apicurio.registry.storage.dto.OutboxEvent; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.GROUP_METADATA_UPDATED; + +public class GroupMetadataUpdated extends OutboxEvent { + private final JSONObject eventPayload; + + private GroupMetadataUpdated(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static GroupMetadataUpdated of(String groupId, EditableGroupMetaDataDto groupMetaDataDto) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("description", groupMetaDataDto.getDescription()) + .put("eventType", GROUP_METADATA_UPDATED.name()); + + return new GroupMetadataUpdated(id, groupId, jsonObject); + } + + @Override + public String getType() { + return GROUP_METADATA_UPDATED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/events/GroupRuleConfigured.java b/app/src/main/java/io/apicurio/registry/events/GroupRuleConfigured.java new file mode 100644 index 0000000000..d26b0466e2 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/events/GroupRuleConfigured.java @@ -0,0 +1,38 @@ +package io.apicurio.registry.events; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import io.apicurio.registry.storage.dto.RuleConfigurationDto; +import io.apicurio.registry.types.RuleType; +import org.json.JSONObject; + +import java.util.UUID; + +import static io.apicurio.registry.storage.StorageEventType.GROUP_RULE_CONFIGURED; + +public class GroupRuleConfigured extends OutboxEvent { + private final JSONObject eventPayload; + + private GroupRuleConfigured(String id, String aggregateId, JSONObject eventPayload) { + super(id, aggregateId); + this.eventPayload = eventPayload; + } + + public static GroupRuleConfigured of(String groupId, RuleType ruleType, RuleConfigurationDto rule) { + String id = UUID.randomUUID().toString(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id", id).put("groupId", groupId).put("ruleType", ruleType.value()) + .put("rule", rule.getConfiguration()).put("eventType", GROUP_RULE_CONFIGURED.name()); + + return new GroupRuleConfigured(id, groupId, jsonObject); + } + + @Override + public String getType() { + return GROUP_RULE_CONFIGURED.name(); + } + + @Override + public JSONObject getPayload() { + return eventPayload; + } +} \ No newline at end of file diff --git a/app/src/main/java/io/apicurio/registry/rest/v3/GroupsResourceImpl.java b/app/src/main/java/io/apicurio/registry/rest/v3/GroupsResourceImpl.java index fbacea92be..203728a9cf 100644 --- a/app/src/main/java/io/apicurio/registry/rest/v3/GroupsResourceImpl.java +++ b/app/src/main/java/io/apicurio/registry/rest/v3/GroupsResourceImpl.java @@ -47,30 +47,10 @@ import io.apicurio.registry.rest.v3.beans.VersionMetaData; import io.apicurio.registry.rest.v3.beans.VersionSearchResults; import io.apicurio.registry.rest.v3.beans.VersionSortBy; -import io.apicurio.registry.rest.v3.shared.CommonResourceOperations; import io.apicurio.registry.rules.RuleApplicationType; import io.apicurio.registry.rules.RulesService; import io.apicurio.registry.storage.RegistryStorage.RetrievalBehavior; -import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.ArtifactReferenceDto; -import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto; -import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; -import io.apicurio.registry.storage.dto.BranchMetaDataDto; -import io.apicurio.registry.storage.dto.BranchSearchResultsDto; -import io.apicurio.registry.storage.dto.CommentDto; -import io.apicurio.registry.storage.dto.ContentWrapperDto; -import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.EditableBranchMetaDataDto; -import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto; -import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto; -import io.apicurio.registry.storage.dto.GroupMetaDataDto; -import io.apicurio.registry.storage.dto.GroupSearchResultsDto; -import io.apicurio.registry.storage.dto.OrderBy; -import io.apicurio.registry.storage.dto.OrderDirection; -import io.apicurio.registry.storage.dto.RuleConfigurationDto; -import io.apicurio.registry.storage.dto.SearchFilter; -import io.apicurio.registry.storage.dto.StoredArtifactVersionDto; -import io.apicurio.registry.storage.dto.VersionSearchResultsDto; +import io.apicurio.registry.storage.dto.*; import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException; import io.apicurio.registry.storage.error.ArtifactNotFoundException; import io.apicurio.registry.storage.error.GroupNotFoundException; @@ -145,9 +125,6 @@ public class GroupsResourceImpl extends AbstractResourceImpl implements GroupsRe @Inject SecurityIdentity securityIdentity; - @Inject - CommonResourceOperations common; - public enum RegistryHashAlgorithm { SHA256, MD5 } @@ -836,6 +813,7 @@ public CreateArtifactResponse createArtifact(String groupId, IfArtifactExists if if (storageResult.getRight() != null) { rval.setVersion(V3ApiUtil.dtoToVersionMetaData(storageResult.getRight())); } + return rval; } catch (ArtifactAlreadyExistsException ex) { return handleIfExists(groupId, artifactId, ifExists, data.getFirstVersion(), fcanonical); diff --git a/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java index 1a50d5f334..80b6b72499 100644 --- a/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java @@ -7,29 +7,7 @@ import io.apicurio.registry.model.GA; import io.apicurio.registry.model.GAV; import io.apicurio.registry.model.VersionId; -import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.ArtifactReferenceDto; -import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto; -import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; -import io.apicurio.registry.storage.dto.BranchMetaDataDto; -import io.apicurio.registry.storage.dto.BranchSearchResultsDto; -import io.apicurio.registry.storage.dto.CommentDto; -import io.apicurio.registry.storage.dto.ContentWrapperDto; -import io.apicurio.registry.storage.dto.DownloadContextDto; -import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.EditableBranchMetaDataDto; -import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto; -import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto; -import io.apicurio.registry.storage.dto.GroupMetaDataDto; -import io.apicurio.registry.storage.dto.GroupSearchResultsDto; -import io.apicurio.registry.storage.dto.OrderBy; -import io.apicurio.registry.storage.dto.OrderDirection; -import io.apicurio.registry.storage.dto.RoleMappingDto; -import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto; -import io.apicurio.registry.storage.dto.RuleConfigurationDto; -import io.apicurio.registry.storage.dto.SearchFilter; -import io.apicurio.registry.storage.dto.StoredArtifactVersionDto; -import io.apicurio.registry.storage.dto.VersionSearchResultsDto; +import io.apicurio.registry.storage.dto.*; import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException; import io.apicurio.registry.storage.error.ArtifactNotFoundException; import io.apicurio.registry.storage.error.ContentNotFoundException; @@ -962,6 +940,20 @@ boolean isArtifactRuleExists(String groupId, String artifactId, RuleType rule) */ String createSnapshot(String snapshotLocation) throws RegistryStorageException; + /** + * Creates a new event row in the outbox table. + * + * @throws RegistryStorageException + */ + String createEvent(OutboxEvent event); + + /** + * true if the underlying Registry storage supports emitting events to the database. + * + * @throws RegistryStorageException + */ + boolean supportsDatabaseEvents(); + enum RetrievalBehavior { DEFAULT, /** diff --git a/app/src/main/java/io/apicurio/registry/storage/StorageEventType.java b/app/src/main/java/io/apicurio/registry/storage/StorageEventType.java index 49b7486a54..7e58c09315 100644 --- a/app/src/main/java/io/apicurio/registry/storage/StorageEventType.java +++ b/app/src/main/java/io/apicurio/registry/storage/StorageEventType.java @@ -3,7 +3,7 @@ public enum StorageEventType { /** - * This event type MUST be fired only once. + * The READY event type MUST be fired only once. */ - READY, + READY, ARTIFACT_CREATED, ARTIFACT_DELETED, ARTIFACT_METADATA_UPDATED, GROUP_CREATED, GROUP_DELETED, GROUP_METADATA_UPDATED, ARTIFACT_VERSION_CREATED, ARTIFACT_VERSION_METADATA_UPDATED, ARTIFACT_VERSION_DELETED, GLOBAL_RULE_CONFIGURED, GROUP_RULE_CONFIGURED, ARTIFACT_RULE_CONFIGURED } diff --git a/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java b/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java index 4040f0d6ba..e944ffa8e2 100644 --- a/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java +++ b/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java @@ -459,6 +459,12 @@ public String createSnapshot(String snapshotLocation) throws RegistryStorageExce return delegate.createSnapshot(snapshotLocation); } + @Override + public String createEvent(OutboxEvent event) { + checkReadOnly(); + return delegate.createEvent(event); + } + @Override public ContentWrapperDto getContentByReference(ArtifactReferenceDto reference) { checkReadOnly(); diff --git a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java index 5762b4c809..dad4f4bf3b 100644 --- a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java +++ b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java @@ -4,18 +4,7 @@ import io.apicurio.registry.model.BranchId; import io.apicurio.registry.model.GA; import io.apicurio.registry.model.VersionId; -import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; -import io.apicurio.registry.storage.dto.BranchMetaDataDto; -import io.apicurio.registry.storage.dto.CommentDto; -import io.apicurio.registry.storage.dto.ContentWrapperDto; -import io.apicurio.registry.storage.dto.DownloadContextDto; -import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.EditableBranchMetaDataDto; -import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto; -import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto; -import io.apicurio.registry.storage.dto.GroupMetaDataDto; -import io.apicurio.registry.storage.dto.RuleConfigurationDto; +import io.apicurio.registry.storage.dto.*; import io.apicurio.registry.storage.error.ArtifactNotFoundException; import io.apicurio.registry.storage.error.GroupAlreadyExistsException; import io.apicurio.registry.storage.error.GroupNotFoundException; @@ -372,4 +361,9 @@ public String triggerSnapshotCreation() throws RegistryStorageException { public String createSnapshot(String snapshotLocation) throws RegistryStorageException { return delegate.createSnapshot(snapshotLocation); } + + @Override + public String createEvent(OutboxEvent event) { + return delegate.createEvent(event); + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorReadOnlyBase.java b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorReadOnlyBase.java index c981d522a2..c28bf6d58c 100644 --- a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorReadOnlyBase.java +++ b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorReadOnlyBase.java @@ -382,4 +382,9 @@ public BranchSearchResultsDto getBranches(GA ga, int offset, int limit) { public BranchMetaDataDto getBranchMetaData(GA ga, BranchId branchId) { return delegate.getBranchMetaData(ga, branchId); } + + @Override + public boolean supportsDatabaseEvents() { + return delegate.supportsDatabaseEvents(); + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/dto/AggregateType.java b/app/src/main/java/io/apicurio/registry/storage/dto/AggregateType.java new file mode 100644 index 0000000000..4ee0d61d49 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/dto/AggregateType.java @@ -0,0 +1,6 @@ +package io.apicurio.registry.storage.dto; + +public enum AggregateType { + + GROUP, ARTIFACT, VERSION, GROUP_RULE, ARTIFACT_RULE, GLOBAL_RULE +} diff --git a/app/src/main/java/io/apicurio/registry/storage/dto/OutboxEvent.java b/app/src/main/java/io/apicurio/registry/storage/dto/OutboxEvent.java new file mode 100644 index 0000000000..00d87f4f36 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/dto/OutboxEvent.java @@ -0,0 +1,26 @@ +package io.apicurio.registry.storage.dto; + +import org.json.JSONObject; + +public abstract class OutboxEvent { + + private final String id; + private final String aggregateId; + + protected OutboxEvent(String id, String aggregateId) { + this.id = id; + this.aggregateId = aggregateId; + } + + public String getId() { + return this.id; + } + + public String getAggregateId() { + return aggregateId; + } + + public abstract JSONObject getPayload(); + + public abstract String getType(); +} diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java index ccdd964f8a..97408b01de 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java @@ -10,24 +10,7 @@ import io.apicurio.registry.model.GA; import io.apicurio.registry.model.GAV; import io.apicurio.registry.storage.RegistryStorage; -import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.ArtifactReferenceDto; -import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto; -import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; -import io.apicurio.registry.storage.dto.BranchMetaDataDto; -import io.apicurio.registry.storage.dto.BranchSearchResultsDto; -import io.apicurio.registry.storage.dto.CommentDto; -import io.apicurio.registry.storage.dto.ContentWrapperDto; -import io.apicurio.registry.storage.dto.GroupMetaDataDto; -import io.apicurio.registry.storage.dto.GroupSearchResultsDto; -import io.apicurio.registry.storage.dto.OrderBy; -import io.apicurio.registry.storage.dto.OrderDirection; -import io.apicurio.registry.storage.dto.RoleMappingDto; -import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto; -import io.apicurio.registry.storage.dto.RuleConfigurationDto; -import io.apicurio.registry.storage.dto.SearchFilter; -import io.apicurio.registry.storage.dto.StoredArtifactVersionDto; -import io.apicurio.registry.storage.dto.VersionSearchResultsDto; +import io.apicurio.registry.storage.dto.*; import io.apicurio.registry.storage.error.RegistryStorageException; import io.apicurio.registry.storage.error.VersionNotFoundException; import io.apicurio.registry.storage.impl.gitops.sql.BlueSqlStorage; @@ -509,4 +492,14 @@ public String triggerSnapshotCreation() throws RegistryStorageException { public String createSnapshot(String snapshotLocation) throws RegistryStorageException { return proxy((storage -> storage.createSnapshot(snapshotLocation))); } + + @Override + public String createEvent(OutboxEvent event) { + return proxy((storage -> storage.createEvent(event))); + } + + @Override + public boolean supportsDatabaseEvents() { + return proxy((RegistryStorage::supportsDatabaseEvents)); + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java index f31d3261af..b6d9828d38 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java @@ -13,6 +13,8 @@ public interface KafkaSqlConfiguration { String snapshotLocation(); + String eventsTopic(); + Properties topicProperties(); boolean isTopicAutoCreate(); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlEventsProcessor.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlEventsProcessor.java new file mode 100644 index 0000000000..c73d006b69 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlEventsProcessor.java @@ -0,0 +1,30 @@ +package io.apicurio.registry.storage.impl.kafkasql; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import io.apicurio.registry.utils.ConcurrentUtil; +import io.apicurio.registry.utils.kafka.ProducerActions; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Collections; + +@ApplicationScoped +public class KafkaSqlEventsProcessor { + + @Inject + KafkaSqlConfiguration configuration; + + @Inject + @Named("KafkaSqlEventsProducer") + ProducerActions eventsProducer; + + public void processEvent(@Observes KafkaSqlOutboxEvent event) { + OutboxEvent outboxEvent = event.getOutboxEvent(); + ProducerRecord record = new ProducerRecord<>(configuration.eventsTopic(), 0, + outboxEvent.getAggregateId(), outboxEvent.getPayload().toString(), Collections.emptyList()); + ConcurrentUtil.get(eventsProducer.apply(record)); + } +} diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java index 5f83017e7e..ee668db677 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java @@ -41,19 +41,24 @@ public class KafkaSqlFactory { @Inject @ConfigProperty(name = "apicurio.kafkasql.snapshots.topic", defaultValue = "kafkasql-snapshots") - @Info(category = "storage", description = "Kafka sql storage topic name") + @Info(category = "storage", description = "Kafka sql storage topic name", registryAvailableSince = "3.0.0") String snapshotsTopic; @Inject @ConfigProperty(name = "apicurio.kafkasql.snapshot.every.seconds", defaultValue = "86400s") - @Info(category = "storage", description = "Kafka sql journal topic snapshot every") + @Info(category = "storage", description = "Kafka sql journal topic snapshot every", registryAvailableSince = "3.0.0") String snapshotEvery; @Inject @ConfigProperty(name = "apicurio.storage.snapshot.location", defaultValue = "./") - @Info(category = "storage", description = "Kafka sql snapshots store location") + @Info(category = "storage", description = "Kafka sql snapshots store location", registryAvailableSince = "3.0.0") String snapshotStoreLocation; + @Inject + @ConfigProperty(name = "apicurio.events.kafka.topic", defaultValue = "registry-events") + @Info(category = "storage", description = "Kafka sql storage event topic", registryAvailableSince = "3.0.1") + String eventsTopic; + @Inject @RegistryProperties(value = "apicurio.kafkasql.topic") @Info(category = "storage", description = "Kafka sql storage topic properties") @@ -165,6 +170,11 @@ public String snapshotsTopic() { return snapshotsTopic; } + @Override + public String eventsTopic() { + return eventsTopic; + } + @Override public String snapshotEvery() { return snapshotEvery; @@ -310,6 +320,31 @@ public KafkaConsumer createKafkaSnapshotsConsumer() { return new KafkaConsumer<>(props, keySerializer, valueSerializer); } + /** + * Creates the Kafka data producer. + */ + @ApplicationScoped + @Produces + @Named("KafkaSqlEventsProducer") + public ProducerActions createKafkaSqlEventsProducer() { + Properties props = (Properties) producerProperties.clone(); + + // Configure kafka settings + props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + UUID.randomUUID().toString()); + props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); + props.putIfAbsent(ProducerConfig.LINGER_MS_CONFIG, 10); + props.putIfAbsent(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaSqlPartitioner.class); + + tryToConfigureSecurity(props); + + StringSerializer keySerializer = new StringSerializer(); + StringSerializer valueSerializer = new StringSerializer(); + + // Create the Kafka producer + return new AsyncProducer<>(props, keySerializer, valueSerializer); + } + private void tryToConfigureSecurity(Properties props) { protocol.ifPresent(s -> props.putIfAbsent("security.protocol", s)); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlOutboxEvent.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlOutboxEvent.java new file mode 100644 index 0000000000..a656071836 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlOutboxEvent.java @@ -0,0 +1,20 @@ +package io.apicurio.registry.storage.impl.kafkasql; + +import io.apicurio.registry.storage.dto.OutboxEvent; + +public class KafkaSqlOutboxEvent { + + private final OutboxEvent outboxEvent; + + private KafkaSqlOutboxEvent(OutboxEvent outboxEvent) { + this.outboxEvent = outboxEvent; + } + + public static KafkaSqlOutboxEvent of(OutboxEvent outboxEvent) { + return new KafkaSqlOutboxEvent(outboxEvent); + } + + public OutboxEvent getOutboxEvent() { + return outboxEvent; + } +} diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java index 9c7cb3b521..1758b37092 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java @@ -3,12 +3,16 @@ import io.apicurio.common.apps.config.DynamicConfigPropertyDto; import io.apicurio.common.apps.logging.Logged; import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.events.*; import io.apicurio.registry.metrics.StorageMetricsApply; import io.apicurio.registry.metrics.health.liveness.PersistenceExceptionLivenessApply; import io.apicurio.registry.metrics.health.readiness.PersistenceTimeoutReadinessApply; import io.apicurio.registry.model.BranchId; import io.apicurio.registry.model.GA; import io.apicurio.registry.model.VersionId; +import io.apicurio.registry.rules.compatibility.CompatibilityLevel; +import io.apicurio.registry.rules.integrity.IntegrityLevel; +import io.apicurio.registry.rules.validity.ValidityLevel; import io.apicurio.registry.storage.RegistryStorage; import io.apicurio.registry.storage.StorageEvent; import io.apicurio.registry.storage.StorageEventType; @@ -110,6 +114,9 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas @Inject Event storageEvent; + @Inject + Event outboxEvent; + private volatile boolean bootstrapped = false; private volatile boolean stopped = true; private volatile boolean snapshotProcessed = false; @@ -166,7 +173,9 @@ void onDestroy() { * Automatically create the Kafka topics. */ private void autoCreateTopics() { - Set topicNames = Set.of(configuration.topic(), configuration.snapshotsTopic()); + Set topicNames = Set.of(configuration.topic(), configuration.snapshotsTopic(), + configuration.eventsTopic()); + Map topicProperties = new HashMap<>(); configuration.topicProperties() .forEach((key, value) -> topicProperties.put(key.toString(), value.toString())); @@ -384,7 +393,17 @@ public Pair createArtifact(Stri var message = new CreateArtifact9Message(groupId, artifactId, artifactType, artifactMetaData, version, contentType, content, references, versionMetaData, versionBranches, dryRun); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); - return (Pair) coordinator.waitForResponse(uuid); + + Pair createdArtifact = (Pair) coordinator + .waitForResponse(uuid); + + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactCreated.of(createdArtifact.getLeft()))); + + if (createdArtifact.getRight() != null) { + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionCreated.of(createdArtifact.getRight()))); + } + + return createdArtifact; } /** @@ -396,7 +415,9 @@ public List deleteArtifact(String groupId, String artifactId) throws ArtifactNotFoundException, RegistryStorageException { var message = new DeleteArtifact2Message(groupId, artifactId); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); - return (List) coordinator.waitForResponse(uuid); + List versions = (List) coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactDeleted.of(groupId, artifactId))); + return versions; } /** @@ -419,7 +440,10 @@ public ArtifactVersionMetaDataDto createArtifactVersion(String groupId, String a var message = new CreateArtifactVersion8Message(groupId, artifactId, version, artifactType, contentType, content, references, metaData, branches, dryRun); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); - return (ArtifactVersionMetaDataDto) coordinator.waitForResponse(uuid); + ArtifactVersionMetaDataDto versionMetaDataDto = (ArtifactVersionMetaDataDto) coordinator + .waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionCreated.of(versionMetaDataDto))); + return versionMetaDataDto; } /** @@ -432,6 +456,7 @@ public void updateArtifactMetaData(String groupId, String artifactId, var message = new UpdateArtifactMetaData3Message(groupId, artifactId, metaData); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactMetadataUpdated.of(groupId, artifactId, metaData))); } @Override @@ -440,6 +465,8 @@ public void createArtifactRule(String groupId, String artifactId, RuleType rule, var message = new CreateArtifactRule4Message(groupId, artifactId, rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent + .fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config))); } /** @@ -466,6 +493,8 @@ public void updateArtifactRule(String groupId, String artifactId, RuleType rule, var message = new UpdateArtifactRule4Message(groupId, artifactId, rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent + .fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config))); } /** @@ -478,6 +507,18 @@ public void deleteArtifactRule(String groupId, String artifactId, RuleType rule) var message = new DeleteArtifactRule3Message(groupId, artifactId, rule); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + + switch (rule) { + case VALIDITY -> + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, + artifactId, rule, + RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } } @Override @@ -486,6 +527,7 @@ public void createGroupRule(String groupId, RuleType rule, RuleConfigurationDto var message = new CreateGroupRule3Message(groupId, rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config))); } @Override @@ -494,6 +536,7 @@ public void updateGroupRule(String groupId, RuleType rule, RuleConfigurationDto var message = new UpdateGroupRule3Message(groupId, rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config))); } @Override @@ -501,6 +544,15 @@ public void deleteGroupRule(String groupId, RuleType rule) throws RegistryStorag var message = new DeleteGroupRule2Message(groupId, rule); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + switch (rule) { + case VALIDITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, + rule, + RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } } @Override @@ -520,6 +572,7 @@ public void deleteArtifactVersion(String groupId, String artifactId, String vers var message = new DeleteArtifactVersion3Message(groupId, artifactId, version); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionDeleted.of(groupId, artifactId, version))); } /** @@ -533,6 +586,8 @@ public void updateArtifactVersionMetaData(String groupId, String artifactId, Str var message = new UpdateArtifactVersionMetaData4Message(groupId, artifactId, version, metaData); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent + .of(ArtifactVersionMetadataUpdated.of(groupId, artifactId, version, metaData))); } /** @@ -545,6 +600,7 @@ public void createGlobalRule(RuleType rule, RuleConfigurationDto config) var message = new CreateGlobalRule2Message(rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config))); } /** @@ -567,6 +623,7 @@ public void updateGlobalRule(RuleType rule, RuleConfigurationDto config) var message = new UpdateGlobalRule2Message(rule, config); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config))); } /** @@ -577,6 +634,15 @@ public void deleteGlobalRule(RuleType rule) throws RuleNotFoundException, Regist var message = new DeleteGlobalRule1Message(rule); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + + switch (rule) { + case VALIDITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, + RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } } /** @@ -588,6 +654,8 @@ public void createGroup(GroupMetaDataDto group) var message = new CreateGroup1Message(group); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + + outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupCreated.of(group))); } /** @@ -598,6 +666,8 @@ public void deleteGroup(String groupId) throws GroupNotFoundException, RegistryS var message = new DeleteGroup1Message(groupId); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + + outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupDeleted.of(groupId))); } /** @@ -609,6 +679,7 @@ public void updateGroupMetaData(String groupId, EditableGroupMetaDataDto dto) { var message = new UpdateGroupMetaData2Message(groupId, dto); var uuid = ConcurrentUtil.get(submitter.submitMessage(message)); coordinator.waitForResponse(uuid); + outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupMetadataUpdated.of(groupId, dto))); } /** @@ -1001,4 +1072,10 @@ public String triggerSnapshotCreation() throws RegistryStorageException { public String createSnapshot(String snapshotLocation) throws RegistryStorageException { throw new IllegalStateException("Directly creating a snapshot is not supported in Kafkasql"); } + + @Override + public String createEvent(OutboxEvent event) { + // No op, the event is created by the event processor. + return event.getId(); + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java index 3f077b56c2..cfba4fae70 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java @@ -6,44 +6,21 @@ import io.apicurio.common.apps.config.Info; import io.apicurio.common.apps.core.System; import io.apicurio.registry.content.TypedContent; +import io.apicurio.registry.events.*; import io.apicurio.registry.exception.UnreachableCodeException; import io.apicurio.registry.model.BranchId; import io.apicurio.registry.model.GA; import io.apicurio.registry.model.GAV; import io.apicurio.registry.model.VersionId; +import io.apicurio.registry.rules.compatibility.CompatibilityLevel; +import io.apicurio.registry.rules.integrity.IntegrityLevel; +import io.apicurio.registry.rules.validity.ValidityLevel; import io.apicurio.registry.semver.SemVerConfigProperties; import io.apicurio.registry.storage.RegistryStorage; import io.apicurio.registry.storage.StorageBehaviorProperties; import io.apicurio.registry.storage.StorageEvent; import io.apicurio.registry.storage.StorageEventType; -import io.apicurio.registry.storage.VersionStateExt; -import io.apicurio.registry.storage.dto.ArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.ArtifactReferenceDto; -import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto; -import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto; -import io.apicurio.registry.storage.dto.BranchMetaDataDto; -import io.apicurio.registry.storage.dto.BranchSearchResultsDto; -import io.apicurio.registry.storage.dto.CommentDto; -import io.apicurio.registry.storage.dto.ContentWrapperDto; -import io.apicurio.registry.storage.dto.DownloadContextDto; -import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto; -import io.apicurio.registry.storage.dto.EditableBranchMetaDataDto; -import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto; -import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto; -import io.apicurio.registry.storage.dto.GroupMetaDataDto; -import io.apicurio.registry.storage.dto.GroupSearchResultsDto; -import io.apicurio.registry.storage.dto.OrderBy; -import io.apicurio.registry.storage.dto.OrderDirection; -import io.apicurio.registry.storage.dto.RoleMappingDto; -import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto; -import io.apicurio.registry.storage.dto.RuleConfigurationDto; -import io.apicurio.registry.storage.dto.SearchFilter; -import io.apicurio.registry.storage.dto.SearchedArtifactDto; -import io.apicurio.registry.storage.dto.SearchedBranchDto; -import io.apicurio.registry.storage.dto.SearchedGroupDto; -import io.apicurio.registry.storage.dto.SearchedVersionDto; -import io.apicurio.registry.storage.dto.StoredArtifactVersionDto; -import io.apicurio.registry.storage.dto.VersionSearchResultsDto; +import io.apicurio.registry.storage.dto.*; import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException; import io.apicurio.registry.storage.error.ArtifactNotFoundException; import io.apicurio.registry.storage.error.BranchAlreadyExistsException; @@ -191,9 +168,6 @@ public abstract class AbstractSqlRegistryStorage implements RegistryStorage { @Inject SecurityIdentity securityIdentity; - @Inject - VersionStateExt artifactStateEx; - HandleFactory handles; @Inject @@ -205,6 +179,8 @@ public abstract class AbstractSqlRegistryStorage implements RegistryStorage { @Inject SemVerConfigProperties semVerConfigProps; + @Inject + protected SqlStatements sqlStatements() { return sqlStatements; } @@ -213,12 +189,20 @@ protected SqlStatements sqlStatements() { @Info(category = "storage", description = "SQL init", availableSince = "2.0.0.Final") boolean initDB; + @Inject + @ConfigProperty(name = "apicurio.events.kafka.topic", defaultValue = "registry-events") + @Info(category = "storage", description = "Storage event topic") + String eventsTopic; + @Inject Event sqlStorageEvent; @Inject Event storageEvent; + @Inject + Event outboxEvent; + private volatile boolean isReady = false; private volatile Instant isAliveLastCheck = Instant.MIN; private volatile boolean isAliveCached = false; @@ -545,15 +529,20 @@ public Pair createArtifact(Stri .modifiedBy(owner).artifactType(artifactType).labels(labels).build(); // The artifact was successfully created! Create the version as well, if one was included. + ImmutablePair pair; if (versionContent != null) { ArtifactVersionMetaDataDto vmdDto = createArtifactVersionRaw(handle, true, groupId, artifactId, version, versionMetaData, owner, createdOn, contentId, versionBranches); - return ImmutablePair.of(amdDto, vmdDto); + pair = ImmutablePair.of(amdDto, vmdDto); } else { - return ImmutablePair.left(amdDto); + pair = ImmutablePair.of(amdDto, null); } + + outboxEvent.fire(SqlOutboxEvent.of(ArtifactCreated.of(amdDto))); + + return pair; }); } catch (Exception ex) { if (sqlStatements.isPrimaryKeyViolation(ex)) { @@ -629,14 +618,19 @@ private ArtifactVersionMetaDataDto createArtifactVersionRaw(Handle handle, boole }); } - return handle.createQuery(sqlStatements.selectArtifactVersionMetaDataByGlobalId()).bind(0, globalId) + ArtifactVersionMetaDataDto avmd = handle + .createQuery(sqlStatements.selectArtifactVersionMetaDataByGlobalId()).bind(0, globalId) .map(ArtifactVersionMetaDataDtoMapper.instance).one(); + + outboxEvent.fire(SqlOutboxEvent.of(ArtifactVersionCreated.of(avmd))); + + return avmd; } /** * If SemVer support is enabled, create (or update) the automatic system generated semantic versioning * branches. - * + * * @param handle * @param gav */ @@ -789,6 +783,8 @@ public List deleteArtifact(String groupId, String artifactId) deleteAllOrphanedContentRaw(handle); + outboxEvent.fire(SqlOutboxEvent.of(ArtifactDeleted.of(groupId, artifactId))); + return versions; }); } @@ -1201,8 +1197,10 @@ public void updateArtifactMetaData(String groupId, String artifactId, modified = true; if (rowCount == 0) { throw new ArtifactNotFoundException(groupId, artifactId); + } else { + outboxEvent.fire( + SqlOutboxEvent.of(ArtifactMetadataUpdated.of(groupId, artifactId, metaData))); } - } return null; @@ -1261,6 +1259,10 @@ public void createArtifactRule(String groupId, String artifactId, RuleType rule, handle.createUpdate(sqlStatements.insertArtifactRule()).bind(0, normalizeGroupId(groupId)) .bind(1, artifactId).bind(2, rule.name()).bind(3, config.getConfiguration()) .execute(); + + outboxEvent.fire( + SqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config))); + return null; }); } catch (Exception ex) { @@ -1283,6 +1285,9 @@ public void createGroupRule(String groupId, RuleType rule, RuleConfigurationDto handles.withHandle(handle -> { handle.createUpdate(sqlStatements.insertGroupRule()).bind(0, normalizeGroupId(groupId)) .bind(1, rule.name()).bind(2, config.getConfiguration()).execute(); + + outboxEvent.fire(SqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config))); + return null; }); } catch (Exception ex) { @@ -1378,6 +1383,9 @@ public void updateArtifactRule(String groupId, String artifactId, RuleType rule, } throw new RuleNotFoundException(rule); } + + outboxEvent.fire(SqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config))); + return null; }); } @@ -1397,6 +1405,9 @@ public void updateGroupRule(String groupId, RuleType rule, RuleConfigurationDto } throw new RuleNotFoundException(rule); } + + outboxEvent.fire(SqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config))); + return null; }); } @@ -1415,6 +1426,19 @@ public void deleteArtifactRule(String groupId, String artifactId, RuleType rule) } throw new RuleNotFoundException(rule); } + + switch (rule) { + case VALIDITY -> outboxEvent.fire(SqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, + artifactId, rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> outboxEvent.fire(SqlOutboxEvent + .of(ArtifactRuleConfigured.of(groupId, artifactId, rule, RuleConfigurationDto + .builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> outboxEvent.fire(SqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, + artifactId, rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } + return null; }); } @@ -1431,6 +1455,17 @@ public void deleteGroupRule(String groupId, RuleType rule) throws RegistryStorag } throw new RuleNotFoundException(rule); } + + switch (rule) { + case VALIDITY -> outboxEvent.fire(SqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> outboxEvent + .fire(SqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, RuleConfigurationDto + .builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> outboxEvent.fire(SqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } + return null; }); } @@ -1699,6 +1734,8 @@ public void deleteArtifactVersion(String groupId, String artifactId, String vers deleteAllOrphanedContentRaw(handle); + outboxEvent.fire(SqlOutboxEvent.of(ArtifactVersionDeleted.of(groupId, artifactId, version))); + return null; }); } @@ -1790,6 +1827,9 @@ public void updateArtifactVersionMetaData(String groupId, String artifactId, Str } } + outboxEvent.fire(SqlOutboxEvent + .of(ArtifactVersionMetadataUpdated.of(groupId, artifactId, version, editableMetadata))); + return null; }); } @@ -1906,6 +1946,9 @@ public void createGlobalRule(RuleType rule, RuleConfigurationDto config) handles.withHandle(handle -> { handle.createUpdate(sqlStatements.insertGlobalRule()).bind(0, rule.name()) .bind(1, config.getConfiguration()).execute(); + + outboxEvent.fire(SqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config))); + return null; }); } catch (Exception ex) { @@ -1946,6 +1989,9 @@ public void updateGlobalRule(RuleType rule, RuleConfigurationDto config) if (rowCount == 0) { throw new RuleNotFoundException(rule); } + + outboxEvent.fire(SqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config))); + return null; }); } @@ -1959,6 +2005,17 @@ public void deleteGlobalRule(RuleType rule) throws RuleNotFoundException, Regist if (rowCount == 0) { throw new RuleNotFoundException(rule); } + + switch (rule) { + case VALIDITY -> outboxEvent.fire(SqlOutboxEvent.of(GlobalRuleConfigured.of(rule, + RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()))); + case COMPATIBILITY -> + outboxEvent.fire(SqlOutboxEvent.of(GlobalRuleConfigured.of(rule, RuleConfigurationDto + .builder().configuration(CompatibilityLevel.NONE.name()).build()))); + case INTEGRITY -> outboxEvent.fire(SqlOutboxEvent.of(GlobalRuleConfigured.of(rule, + RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()))); + } + return null; }); } @@ -2070,6 +2127,8 @@ public void createGroup(GroupMetaDataDto group) }); } + outboxEvent.fire(SqlOutboxEvent.of(GroupCreated.of(group))); + return null; }); } catch (Exception ex) { @@ -2082,7 +2141,7 @@ public void createGroup(GroupMetaDataDto group) /** * Deletes a group and all artifacts in that group. - * + * * @see io.apicurio.registry.storage.RegistryStorage#deleteGroup(java.lang.String) */ @Override @@ -2102,6 +2161,9 @@ public void deleteGroup(String groupId) throws GroupNotFoundException, RegistryS if (rows == 0) { throw new GroupNotFoundException(groupId); } + + outboxEvent.fire(SqlOutboxEvent.of(GroupDeleted.of(groupId))); + return null; }); } @@ -2138,6 +2200,8 @@ public void updateGroupMetaData(String groupId, EditableGroupMetaDataDto dto) { }); } + outboxEvent.fire(SqlOutboxEvent.of(GroupMetadataUpdated.of(groupId, dto))); + return null; }); } @@ -3534,10 +3598,35 @@ public String createSnapshot(String location) throws RegistryStorageException { return null; } + @Override + public String createEvent(OutboxEvent event) { + if (supportsDatabaseEvents()) { + // Create outbox event + handles.withHandle(handle -> { + handle.createUpdate(sqlStatements.createOutboxEvent()).bind(0, event.getId()) + .bind(1, eventsTopic).bind(2, event.getAggregateId()).bind(3, event.getType()) + .bind(4, event.getPayload().toString()).execute(); + + return handle.createUpdate(sqlStatements.deleteOutboxEvent()).bind(0, event.getId()) + .execute(); + }); + } + return event.getId(); + } + + @Override + public boolean supportsDatabaseEvents() { + return isPostgresql() || isMssql(); + } + private boolean isPostgresql() { return sqlStatements.dbType().equals("postgresql"); } + private boolean isMssql() { + return sqlStatements.dbType().equals("mssql"); + } + private boolean isH2() { return sqlStatements.dbType().equals("h2"); } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java index be8af1630f..9e37948822 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java @@ -1166,4 +1166,20 @@ public String deleteGroupRule() { public String deleteGroupRules() { return "DELETE FROM group_rules WHERE groupId = ?"; } + + @Override + public String createOutboxEvent() { + return """ + INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload) \ + VALUES (?, ?, ?, ?, ?::jsonb)\ + """; + } + + @Override + public String deleteOutboxEvent() { + return """ + DELETE FROM outbox o \ + WHERE o.id= ?\ + """; + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlEventsProcessor.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlEventsProcessor.java new file mode 100644 index 0000000000..a21a486b29 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlEventsProcessor.java @@ -0,0 +1,20 @@ +package io.apicurio.registry.storage.impl.sql; + +import io.apicurio.registry.storage.dto.OutboxEvent; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; + +@ApplicationScoped +public class SqlEventsProcessor { + + @Inject + SqlRegistryStorage sqlStore; + + public void processEvent(@Observes SqlOutboxEvent event) { + if (sqlStore.supportsDatabaseEvents() && sqlStore.isReady()) { + OutboxEvent outboxEvent = event.getOutboxEvent(); + sqlStore.createEvent(outboxEvent); + } + } +} diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlOutboxEvent.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlOutboxEvent.java new file mode 100644 index 0000000000..b6c6f4db48 --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlOutboxEvent.java @@ -0,0 +1,20 @@ +package io.apicurio.registry.storage.impl.sql; + +import io.apicurio.registry.storage.dto.OutboxEvent; + +public class SqlOutboxEvent { + + private final OutboxEvent outboxEvent; + + private SqlOutboxEvent(OutboxEvent outboxEvent) { + this.outboxEvent = outboxEvent; + } + + public static SqlOutboxEvent of(OutboxEvent outboxEvent) { + return new SqlOutboxEvent(outboxEvent); + } + + public OutboxEvent getOutboxEvent() { + return outboxEvent; + } +} diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java index fd8daeaadd..f09534174a 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java @@ -619,4 +619,7 @@ public interface SqlStatements { public String restoreFromSnapshot(); + String createOutboxEvent(); + + String deleteOutboxEvent(); } diff --git a/app/src/main/resources/application.properties b/app/src/main/resources/application.properties index 3f945b1f75..d53c3c73f3 100644 --- a/app/src/main/resources/application.properties +++ b/app/src/main/resources/application.properties @@ -135,11 +135,7 @@ apicurio.ui.features.read-only.enabled.dynamic.allow=${apicurio.config.dynamic.a apicurio.api.errors.include-stack-in-response=false # Events -apicurio.events.ksink= -apicurio.events.kafka.config.bootstrap.servers=localhost:9092 -apicurio.events.kafka.config.enable.idempotence=true -apicurio.events.kafka.config.retries=3 -apicurio.events.kafka.config.acks=all +apicurio.events.kafka.topic=registry-events # Logging apicurio.logconfigjob.every=5s diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version index 105d7d9ad3..97a55e1d74 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/db-version @@ -1 +1 @@ -100 \ No newline at end of file +101 \ No newline at end of file diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl index d8bd6afc90..4d5f9c98e6 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/h2.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio (propName VARCHAR(255) NOT NULL, propValue VARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (propName); -INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 100); +INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 101); CREATE TABLE sequences (seqName VARCHAR(32) NOT NULL, seqValue BIGINT NOT NULL); ALTER TABLE sequences ADD PRIMARY KEY (seqName); diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl index 2cdf641162..b9a312f593 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/mssql.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio (propName NVARCHAR(255) NOT NULL, propValue NVARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (propName); -INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 100); +INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 101); CREATE TABLE sequences (seqName NVARCHAR(32) NOT NULL, seqValue BIGINT NOT NULL); ALTER TABLE sequences ADD PRIMARY KEY (seqName); @@ -102,3 +102,6 @@ ALTER TABLE branch_versions ADD CONSTRAINT FK_branch_versions_2 FOREIGN KEY (gro CREATE INDEX IDX_branch_versions_1 ON branch_versions(groupId, artifactId, branchId, branchOrder); CREATE INDEX IDX_branch_versions_2 ON branch_versions(branchId); CREATE INDEX IDX_branch_versions_3 ON branch_versions(branchOrder); + +CREATE TABLE outbox (id VARCHAR(128) NOT NULL, aggregatetype VARCHAR(255) NOT NULL, aggregateid VARCHAR(255) NOT NULL, type VARCHAR(255) NOT NULL, payload JSONB NOT NULL); +ALTER TABLE outbox ADD PRIMARY KEY (id); diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl index b94dbbd7b8..b1d0fd43d2 100644 --- a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/postgresql.ddl @@ -4,7 +4,7 @@ CREATE TABLE apicurio (propName VARCHAR(255) NOT NULL, propValue VARCHAR(255)); ALTER TABLE apicurio ADD PRIMARY KEY (propName); -INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 100); +INSERT INTO apicurio (propName, propValue) VALUES ('db_version', 101); CREATE TABLE sequences (seqName VARCHAR(32) NOT NULL, seqValue BIGINT NOT NULL); ALTER TABLE sequences ADD PRIMARY KEY (seqName); @@ -102,3 +102,6 @@ ALTER TABLE branch_versions ADD CONSTRAINT FK_branch_versions_2 FOREIGN KEY (gro CREATE INDEX IDX_branch_versions_1 ON branch_versions(groupId, artifactId, branchId, branchOrder); CREATE INDEX IDX_branch_versions_2 ON branch_versions(branchId); CREATE INDEX IDX_branch_versions_3 ON branch_versions(branchOrder); + +CREATE TABLE outbox (id VARCHAR(128) NOT NULL, aggregatetype VARCHAR(255) NOT NULL, aggregateid VARCHAR(255) NOT NULL, type VARCHAR(255) NOT NULL, payload JSONB NOT NULL); +ALTER TABLE outbox ADD PRIMARY KEY (id); diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/h2.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/h2.upgrade.ddl new file mode 100644 index 0000000000..a80c75332e --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/h2.upgrade.ddl @@ -0,0 +1,4 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: H2 +-- Upgrade Script from 100 to 101 +-- ********************************************************************* \ No newline at end of file diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/mssql.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/mssql.upgrade.ddl new file mode 100644 index 0000000000..462f77ee3b --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/mssql.upgrade.ddl @@ -0,0 +1,7 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: mssql +-- Upgrade Script from 100 to 101 +-- ********************************************************************* + +CREATE TABLE outbox (id VARCHAR(128) NOT NULL, aggregatetype VARCHAR(255) NOT NULL, aggregateid VARCHAR(255) NOT NULL, type VARCHAR(255) NOT NULL, payload JSONB NOT NULL); +ALTER TABLE outbox ADD PRIMARY KEY (id); \ No newline at end of file diff --git a/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/postgresql.upgrade.ddl b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/postgresql.upgrade.ddl new file mode 100644 index 0000000000..9b4e1c929a --- /dev/null +++ b/app/src/main/resources/io/apicurio/registry/storage/impl/sql/upgrades/101/postgresql.upgrade.ddl @@ -0,0 +1,7 @@ +-- ********************************************************************* +-- DDL for the Apicurio Registry - Database: postgresql +-- Upgrade Script from 100 to 101 +-- ********************************************************************* + +CREATE TABLE outbox (id VARCHAR(128) NOT NULL, aggregatetype VARCHAR(255) NOT NULL, aggregateid VARCHAR(255) NOT NULL, type VARCHAR(255) NOT NULL, payload JSONB NOT NULL); +ALTER TABLE outbox ADD PRIMARY KEY (id); \ No newline at end of file diff --git a/app/src/test/java/io/apicurio/registry/AbstractResourceTestBase.java b/app/src/test/java/io/apicurio/registry/AbstractResourceTestBase.java index c606742652..8fbec308ce 100644 --- a/app/src/test/java/io/apicurio/registry/AbstractResourceTestBase.java +++ b/app/src/test/java/io/apicurio/registry/AbstractResourceTestBase.java @@ -7,7 +7,10 @@ import io.apicurio.registry.rest.client.RegistryClient; import io.apicurio.registry.rest.client.models.CreateArtifact; import io.apicurio.registry.rest.client.models.CreateArtifactResponse; +import io.apicurio.registry.rest.client.models.CreateGroup; import io.apicurio.registry.rest.client.models.CreateVersion; +import io.apicurio.registry.rest.client.models.GroupMetaData; +import io.apicurio.registry.rest.client.models.Labels; import io.apicurio.registry.rest.client.models.VersionContent; import io.apicurio.registry.rest.client.models.VersionMetaData; import io.apicurio.registry.rest.v3.V3ApiUtil; @@ -116,6 +119,25 @@ protected CreateArtifactResponse createArtifact(String groupId, String artifactI return createArtifact(groupId, artifactId, artifactType, content, contentType, null); } + protected GroupMetaData createGroup(String groupId, String description, Labels labels, + Consumer requestCustomizer) throws Exception { + CreateGroup createGroup = new CreateGroup(); + createGroup.setGroupId(groupId); + createGroup.setDescription(description); + createGroup.setLabels(labels); + + if (requestCustomizer != null) { + requestCustomizer.accept(createGroup); + } + + var result = clientV3.groups().post(createGroup); + + assert (result.getGroupId().equals(groupId)); + assert (result.getDescription().equals(description)); + + return result; + } + protected CreateArtifactResponse createArtifact(String groupId, String artifactId, String artifactType, String content, String contentType, Consumer requestCustomizer) throws Exception { CreateArtifact createArtifact = new CreateArtifact(); @@ -233,6 +255,32 @@ protected void createArtifactRule(String groupId, String artifactId, RuleType ru clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).rules().post(createRule); } + protected void updateArtifactRule(String groupId, String artifactId, RuleType ruleType, + String ruleConfig) { + var updateRule = new io.apicurio.registry.rest.client.models.Rule(); + updateRule.setConfig(ruleConfig); + updateRule.setRuleType(io.apicurio.registry.rest.client.models.RuleType.forValue(ruleType.value())); + + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).rules() + .byRuleType(ruleType.value()).put(updateRule); + } + + protected void createGroupRule(String groupId, RuleType ruleType, String ruleConfig) { + var createRule = new io.apicurio.registry.rest.client.models.CreateRule(); + createRule.setConfig(ruleConfig); + createRule.setRuleType(io.apicurio.registry.rest.client.models.RuleType.forValue(ruleType.value())); + + clientV3.groups().byGroupId(groupId).rules().post(createRule); + } + + protected void updateGroupRule(String groupId, RuleType ruleType, String ruleConfig) { + var updateRule = new io.apicurio.registry.rest.client.models.Rule(); + updateRule.setConfig(ruleConfig); + updateRule.setRuleType(io.apicurio.registry.rest.client.models.RuleType.forValue(ruleType.value())); + + clientV3.groups().byGroupId(groupId).rules().byRuleType(ruleType.value()).put(updateRule); + } + protected io.apicurio.registry.rest.client.models.Rule createGlobalRule(RuleType ruleType, String ruleConfig) { var createRule = new io.apicurio.registry.rest.client.models.CreateRule(); @@ -244,6 +292,17 @@ protected io.apicurio.registry.rest.client.models.Rule createGlobalRule(RuleType return clientV3.admin().rules().byRuleType(ruleType.value()).get(); } + protected io.apicurio.registry.rest.client.models.Rule updateGlobalRule(RuleType ruleType, + String ruleConfig) { + var createRule = new io.apicurio.registry.rest.client.models.Rule(); + createRule.setConfig(ruleConfig); + createRule.setRuleType(io.apicurio.registry.rest.client.models.RuleType.forValue(ruleType.value())); + + clientV3.admin().rules().byRuleType(ruleType.value()).put(createRule); + // TODO: verify this get + return clientV3.admin().rules().byRuleType(ruleType.value()).get(); + } + /** * Ensures the state of the meta-data response is what we expect. * diff --git a/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTest.java b/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTest.java new file mode 100644 index 0000000000..d1bfecc496 --- /dev/null +++ b/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTest.java @@ -0,0 +1,25 @@ +package io.apicurio.registry.event.kafkasql; + +import io.apicurio.registry.event.sql.RegistryEventsTest; +import io.apicurio.registry.utils.tests.ApicurioTestTags; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import jakarta.enterprise.inject.Typed; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; + +import java.util.List; + +@QuarkusTest +@TestProfile(KafkaSqlEventsTestProfile.class) +@Tag(ApicurioTestTags.SLOW) +@Typed(KafkaSqlEventsTest.class) +public class KafkaSqlEventsTest extends RegistryEventsTest { + + @BeforeAll + @Override + public void init() { + consumer = getConsumer(System.getProperty("bootstrap.servers.external")); + consumer.subscribe(List.of("registry-events")); + } +} diff --git a/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTestProfile.java b/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTestProfile.java new file mode 100644 index 0000000000..c2d7f41428 --- /dev/null +++ b/app/src/test/java/io/apicurio/registry/event/kafkasql/KafkaSqlEventsTestProfile.java @@ -0,0 +1,22 @@ +package io.apicurio.registry.event.kafkasql; + +import io.apicurio.registry.utils.tests.KafkaTestContainerManager; +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.List; +import java.util.Map; + +public class KafkaSqlEventsTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + return Map.of("apicurio.storage.kind", "kafkasql", "apicurio.rest.deletion.artifact.enabled", "true", + "apicurio.rest.deletion.artifact-version.enabled", "true", + "apicurio.rest.deletion.group.enabled", "true"); + } + + @Override + public List testResources() { + return List.of(new TestResourceEntry(KafkaTestContainerManager.class)); + } +} \ No newline at end of file diff --git a/app/src/test/java/io/apicurio/registry/event/sql/EventsTestProfile.java b/app/src/test/java/io/apicurio/registry/event/sql/EventsTestProfile.java new file mode 100644 index 0000000000..166282a94a --- /dev/null +++ b/app/src/test/java/io/apicurio/registry/event/sql/EventsTestProfile.java @@ -0,0 +1,22 @@ +package io.apicurio.registry.event.sql; + +import io.apicurio.registry.utils.tests.DebeziumContainerResource; +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.List; +import java.util.Map; + +public class EventsTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + return Map.of("apicurio.storage.sql.kind", "postgresql", "apicurio.rest.deletion.artifact.enabled", + "true", "apicurio.rest.deletion.artifact-version.enabled", "true", + "apicurio.rest.deletion.group.enabled", "true"); + } + + @Override + public List testResources() { + return List.of(new TestResourceEntry(DebeziumContainerResource.class)); + } +} \ No newline at end of file diff --git a/app/src/test/java/io/apicurio/registry/event/sql/RegistryEventsTest.java b/app/src/test/java/io/apicurio/registry/event/sql/RegistryEventsTest.java new file mode 100644 index 0000000000..9e540d6539 --- /dev/null +++ b/app/src/test/java/io/apicurio/registry/event/sql/RegistryEventsTest.java @@ -0,0 +1,724 @@ +package io.apicurio.registry.event.sql; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.apicurio.registry.AbstractResourceTestBase; +import io.apicurio.registry.rest.client.models.CreateArtifactResponse; +import io.apicurio.registry.rest.client.models.EditableArtifactMetaData; +import io.apicurio.registry.rest.client.models.EditableGroupMetaData; +import io.apicurio.registry.rest.client.models.EditableVersionMetaData; +import io.apicurio.registry.rest.client.models.GroupMetaData; +import io.apicurio.registry.rest.client.models.Labels; +import io.apicurio.registry.rules.validity.ValidityLevel; +import io.apicurio.registry.storage.StorageEventType; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.types.ContentTypes; +import io.apicurio.registry.types.RuleType; +import io.apicurio.registry.utils.tests.ApicurioTestTags; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.rnorth.ducttape.unreliables.Unreliables; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static io.apicurio.registry.storage.StorageEventType.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@QuarkusTest +@TestProfile(EventsTestProfile.class) +@Tag(ApicurioTestTags.SLOW) +public class RegistryEventsTest extends AbstractResourceTestBase { + + protected KafkaConsumer consumer; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final String ARTIFACT_CONTENT = "{\"name\":\"redhat\"}"; + + @BeforeAll + public void init() { + consumer = getConsumer(System.getProperty("bootstrap.servers")); + consumer.subscribe(List.of("outbox.event.registry-events")); + } + + @Test + public void createGroup() throws Exception { + // Preparation + final String groupId = "createGroup"; + final String description = "createGroupDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + // Consume the create event from the broker + List events = lookupEvent(consumer, GROUP_CREATED, Map.of("groupId", groupId)); + + Assertions.assertEquals(1, events.size()); + checkGroupEvent(groupId, events); + } + + @Test + public void updateGroupMetadata() throws Exception { + // Preparation + final String groupId = "updateGroupMetadata"; + final String description = "updateGroupMetadataDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + EditableGroupMetaData emd = new EditableGroupMetaData(); + emd.setDescription("updateArtifactMetadataEventDescriptionEdited"); + clientV3.groups().byGroupId(groupId).put(emd); + + // Consume the create event from the broker + List events = lookupEvent(consumer, GROUP_METADATA_UPDATED, Map.of("groupId", groupId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(GROUP_METADATA_UPDATED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(GROUP_METADATA_UPDATED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals("updateArtifactMetadataEventDescriptionEdited", + updateEvent.get("description").asText()); + } + + @Test + public void deleteGroupEvent() throws Exception { + // Preparation + final String groupId = "deleteGroupEvent"; + final String description = "deleteGroupEventDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + clientV3.groups().byGroupId(groupId).delete(); + + // Consume the delete event from the broker + List deleteEvents = lookupEvent(consumer, GROUP_DELETED, Map.of("groupId", groupId)); + + JsonNode deleteEvent = null; + + for (JsonNode event : deleteEvents) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(GROUP_DELETED.name())) { + deleteEvent = event; + } + } + + Assertions.assertEquals(1, deleteEvents.size()); + Assertions.assertEquals(groupId, deleteEvent.get("groupId").asText()); + Assertions.assertEquals(GROUP_DELETED.name(), deleteEvent.get("eventType").asText()); + } + + @Test + void createArtifactEvent() throws Exception { + // Preparation + final String groupId = "testCreateArtifact"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "testCreateArtifactName"; + final String description = "testCreateArtifactDescription"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + List events = lookupEvent(consumer, ARTIFACT_CREATED, + Map.of("groupId", groupId, "artifactId", artifactId)); + Assertions.assertEquals(1, events.size()); + checkArtifactEvent(groupId, artifactId, name, events.get(0)); + } + + @Test + public void updateArtifactMetadataEvent() throws Exception { + // Preparation + final String groupId = "updateArtifactMetadataEvent"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "updateArtifactMetadataEventName"; + final String description = "updateArtifactMetadataEventDescription"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + + EditableArtifactMetaData emd = new EditableArtifactMetaData(); + emd.setName("updateArtifactMetadataEventNameEdited"); + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).put(emd); + + // Consume the update events from the broker + List updateEvents = lookupEvent(consumer, ARTIFACT_METADATA_UPDATED, + Map.of("groupId", groupId, "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : updateEvents) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(ARTIFACT_METADATA_UPDATED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, updateEvents.size()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_METADATA_UPDATED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(artifactId, updateEvent.get("artifactId").asText()); + Assertions.assertEquals("updateArtifactMetadataEventNameEdited", updateEvent.get("name").asText()); + } + + @Test + public void deleteArtifactEvent() throws Exception { + // Preparation + final String groupId = "deleteArtifactEvent"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "deleteArtifactEventName"; + final String description = "deleteArtifactEventDescription"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).delete(); + + // Consume the delete event from the broker + List deleteEvents = lookupEvent(consumer, ARTIFACT_DELETED, + Map.of("groupId", groupId, "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : deleteEvents) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(ARTIFACT_DELETED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, deleteEvents.size()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_DELETED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(artifactId, updateEvent.get("artifactId").asText()); + } + + @Test + public void createArtifactVersion() throws Exception { + // Preparation + final String groupId = "createArtifactVersion"; + + final String artifactId = generateArtifactId(); + + String name = "createArtifactVersionName"; + String description = "createArtifactVersionDescription"; + + ensureArtifactCreated(groupId, artifactId, name, description); + // Consume the create event from the broker + List events = lookupEvent(consumer, ARTIFACT_VERSION_CREATED, + Map.of("groupId", groupId, "artifactId", artifactId)); + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(ARTIFACT_VERSION_CREATED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_VERSION_CREATED.name(), updateEvent.get("eventType").asText()); + } + + @Test + public void updateArtifactVersionMetadata() throws Exception { + // Preparation + final String groupId = "updateArtifactVersionMetadata"; + final String artifactId = generateArtifactId(); + + String name = "updateArtifactVersionMetadataName"; + String description = "updateArtifactVersionMetadataDescription"; + + ensureArtifactCreated(groupId, artifactId, name, description); + + EditableVersionMetaData emd = new EditableVersionMetaData(); + emd.setDescription("updateArtifactVersionMetadataEventDescriptionEdited"); + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).versions() + .byVersionExpression("1").put(emd); + + // Consume the create event from the broker + List events = lookupEvent(consumer, ARTIFACT_VERSION_METADATA_UPDATED, + Map.of("groupId", groupId, "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(ARTIFACT_VERSION_METADATA_UPDATED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_VERSION_METADATA_UPDATED.name(), + updateEvent.get("eventType").asText()); + Assertions.assertEquals("updateArtifactVersionMetadataEventDescriptionEdited", + updateEvent.get("description").asText()); + } + + @Test + public void deleteArtifactVersion() throws Exception { + // Preparation + final String groupId = "createArtifactVersion"; + final String artifactId = generateArtifactId(); + String name = "deleteArtifactVersionName"; + String description = "deleteArtifactVersionDescription"; + + ensureArtifactCreated(groupId, artifactId, name, description); + + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).versions() + .byVersionExpression("1").delete(); + + // Consume the delete event from the broker + List deleteEvents = lookupEvent(consumer, ARTIFACT_VERSION_DELETED, + Map.of("groupId", groupId, "artifactId", artifactId)); + + JsonNode deleteEvent = null; + + for (JsonNode event : deleteEvents) { + if (event.get("groupId").asText().equals(groupId) + && event.get("eventType").asText().equals(ARTIFACT_VERSION_DELETED.name())) { + deleteEvent = event; + } + } + + Assertions.assertEquals(1, deleteEvents.size()); + Assertions.assertEquals(groupId, deleteEvent.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_VERSION_DELETED.name(), deleteEvent.get("eventType").asText()); + } + + @Test + public void globalRuleCreated() throws Exception { + createGlobalRule(RuleType.VALIDITY, "SYNTAX_ONLY"); + + // Consume the create event from the broker + List events = lookupEvent(consumer, GLOBAL_RULE_CONFIGURED, + Map.of("ruleType", RuleType.VALIDITY.value())); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GLOBAL_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GLOBAL_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + } + + @Test + public void globalRuleUpdated() throws Exception { + createGlobalRule(RuleType.VALIDITY, "SYNTAX_ONLY"); + + // Consume the create event from the broker + updateGlobalRule(RuleType.VALIDITY, ValidityLevel.FULL.name()); + + // Lookup for the update event + List events = lookupEvent(consumer, GLOBAL_RULE_CONFIGURED, + Map.of("rule", ValidityLevel.FULL.name())); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GLOBAL_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GLOBAL_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.FULL.name(), updateEvent.get("rule").asText()); + } + + @Test + public void globalRuleDeleted() throws Exception { + createGlobalRule(RuleType.VALIDITY, "SYNTAX_ONLY"); + + clientV3.admin().rules().byRuleType(RuleType.VALIDITY.name()).delete(); + + // Lookup for the update event + List events = lookupEvent(consumer, GLOBAL_RULE_CONFIGURED, + Map.of("rule", ValidityLevel.NONE.name())); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GLOBAL_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GLOBAL_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.NONE.name(), updateEvent.get("rule").asText()); + } + + @Test + public void groupRuleCreated() throws Exception { + // Preparation + final String groupId = "groupRuleConfigured"; + final String description = "groupRuleConfiguredDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + createGroupRule(groupId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + // Consume the create event from the broker + List events = lookupEvent(consumer, GROUP_RULE_CONFIGURED, Map.of("groupId", groupId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GROUP_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GROUP_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + } + + @Test + public void groupRuleUpdated() throws Exception { + // Preparation + final String groupId = "groupRuleUpdated"; + final String description = "groupRuleUpdatedDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + createGroupRule(groupId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + updateGroupRule(groupId, RuleType.VALIDITY, ValidityLevel.FULL.name()); + + // Lookup for the update event + List events = lookupEvent(consumer, GROUP_RULE_CONFIGURED, + Map.of("groupId", groupId, "rule", ValidityLevel.FULL.name())); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GROUP_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GROUP_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.FULL.name(), updateEvent.get("rule").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + } + + @Test + public void groupRuleDeleted() throws Exception { + // Preparation + final String groupId = "groupRuleDeleted"; + final String description = "groupRuleDeletedDescription"; + + Labels labels = new Labels(); + + ensureGroupCreated(groupId, description, labels); + + createGroupRule(groupId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + clientV3.groups().byGroupId(groupId).rules().byRuleType(RuleType.VALIDITY.name()).delete(); + + // Lookup for the update event + List events = lookupEvent(consumer, GROUP_RULE_CONFIGURED, + Map.of("groupId", groupId, "rule", ValidityLevel.NONE.name())); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(GROUP_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(GROUP_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.NONE.name(), updateEvent.get("rule").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + } + + @Test + public void artifactRuleConfigured() throws Exception { + // Preparation + final String groupId = "artifactRuleConfigured"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "artifactRuleConfiguredName"; + final String description = "artifactRuleConfiguredDescription"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + createArtifactRule(groupId, artifactId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + // Consume the create event from the broker + List events = lookupEvent(consumer, ARTIFACT_RULE_CONFIGURED, + Map.of("groupId", groupId, "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(ARTIFACT_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(ARTIFACT_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(artifactId, updateEvent.get("artifactId").asText()); + } + + @Test + public void artifactRuleUpdated() throws Exception { + // Preparation + final String groupId = "artifactRuleUpdated"; + final String description = "artifactRuleUpdatedDescription"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "artifactRuleUpdatedName"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + createArtifactRule(groupId, artifactId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + updateArtifactRule(groupId, artifactId, RuleType.VALIDITY, ValidityLevel.FULL.name()); + + // Lookup for the update event + List events = lookupEvent(consumer, ARTIFACT_RULE_CONFIGURED, + Map.of("groupId", groupId, "rule", ValidityLevel.FULL.name(), "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(ARTIFACT_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(ARTIFACT_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.FULL.name(), updateEvent.get("rule").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(artifactId, updateEvent.get("artifactId").asText()); + } + + @Test + public void artifactRuleDeleted() throws Exception { + // Preparation + final String groupId = "artifactRuleUpdated"; + final String description = "artifactRuleUpdatedDescription"; + final String artifactId = generateArtifactId(); + + final String version = "1"; + final String name = "artifactRuleUpdatedName"; + + ensureArtifactCreated(groupId, artifactId, version, name, description); + createArtifactRule(groupId, artifactId, RuleType.VALIDITY, "SYNTAX_ONLY"); + + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).rules() + .byRuleType(RuleType.VALIDITY.name()).delete(); + + // Lookup for the update event + List events = lookupEvent(consumer, ARTIFACT_RULE_CONFIGURED, + Map.of("groupId", groupId, "rule", ValidityLevel.NONE.name(), "artifactId", artifactId)); + + JsonNode updateEvent = null; + + for (JsonNode event : events) { + if (event.get("eventType").asText().equals(ARTIFACT_RULE_CONFIGURED.name())) { + updateEvent = event; + } + } + + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(ARTIFACT_RULE_CONFIGURED.name(), updateEvent.get("eventType").asText()); + Assertions.assertEquals(ValidityLevel.NONE.name(), updateEvent.get("rule").asText()); + Assertions.assertEquals(groupId, updateEvent.get("groupId").asText()); + Assertions.assertEquals(artifactId, updateEvent.get("artifactId").asText()); + } + + private void checkGroupEvent(String groupId, List events) { + JsonNode createEvent = null; + for (JsonNode event : events) { + if (event.get("groupId").asText().equals(groupId)) { + createEvent = event; + } + } + + Assertions.assertEquals(groupId, createEvent.get("groupId").asText()); + Assertions.assertEquals(GROUP_CREATED.name(), createEvent.get("eventType").asText()); + } + + private void checkArtifactEvent(String groupId, String artifactId, String name, JsonNode event) { + Assertions.assertEquals(groupId, event.get("groupId").asText()); + Assertions.assertEquals(ARTIFACT_CREATED.name(), event.get("eventType").asText()); + Assertions.assertEquals(artifactId, event.get("artifactId").asText()); + Assertions.assertEquals(name, event.get("name").asText()); + } + + public CreateArtifactResponse ensureArtifactCreated(String groupId, String artifactId, String name, + String description) throws Exception { + CreateArtifactResponse created = createArtifact(groupId, artifactId, ArtifactType.JSON, + ARTIFACT_CONTENT, ContentTypes.APPLICATION_JSON, (createArtifact -> { + createArtifact.setName(name); + createArtifact.setDescription(description); + })); + + // Assertions + assertNotNull(created); + assertEquals(groupId, created.getArtifact().getGroupId()); + assertEquals(artifactId, created.getArtifact().getArtifactId()); + assertEquals(name, created.getArtifact().getName()); + assertEquals(description, created.getArtifact().getDescription()); + + return created; + } + + public CreateArtifactResponse ensureArtifactCreated(String groupId, String artifactId, String version, + String name, String description) throws Exception { + CreateArtifactResponse created = createArtifact(groupId, artifactId, ArtifactType.JSON, + ARTIFACT_CONTENT, ContentTypes.APPLICATION_JSON, (createArtifact -> { + createArtifact.setName(name); + createArtifact.setDescription(description); + createArtifact.getFirstVersion().setVersion(version); + })); + + // Assertions + assertNotNull(created); + assertEquals(groupId, created.getArtifact().getGroupId()); + assertEquals(artifactId, created.getArtifact().getArtifactId()); + assertEquals(version, created.getVersion().getVersion()); + assertEquals(name, created.getArtifact().getName()); + assertEquals(description, created.getArtifact().getDescription()); + assertEquals(ARTIFACT_CONTENT, + new String( + clientV3.groups().byGroupId(groupId).artifacts().byArtifactId(artifactId).versions() + .byVersionExpression("branch=latest").content().get().readAllBytes(), + StandardCharsets.UTF_8)); + + return created; + } + + public void ensureGroupCreated(String groupId, String description, Labels labels) throws Exception { + GroupMetaData created = createGroup(groupId, description, labels, (createGroup -> { + createGroup.setDescription(description); + createGroup.setGroupId(groupId); + createGroup.setLabels(labels); + })); + + // Assertions + assertNotNull(created); + assertEquals(groupId, created.getGroupId()); + assertEquals(description, created.getDescription()); + } + + protected KafkaConsumer getConsumer(String bootstrapServers) { + return new KafkaConsumer<>( + Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), + new StringDeserializer(), new StringDeserializer()); + } + + private List lookupEvent(KafkaConsumer consumer, String fieldLookupName, + String fieldValue, StorageEventType eventType) { + + List events = new ArrayList<>(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + consumer.poll(Duration.ofMillis(50)).iterator().forEachRemaining(record -> { + events.add(readEventPayload(record)); + }); + + return events.stream().anyMatch(event -> event.get(fieldLookupName).asText().equals(fieldValue) + && event.get("eventType").asText().equals(eventType.name())); + + }); + return events; + } + + private List lookupEvent(KafkaConsumer consumer, StorageEventType eventType, + Map lookups) { + + List consumedEvents = new ArrayList<>(); + List lookedUpEvents = new ArrayList<>(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + consumer.poll(Duration.ofMillis(50)).iterator().forEachRemaining(record -> { + consumedEvents.add(readEventPayload(record)); + }); + + boolean eventFound = false; + for (JsonNode event : consumedEvents) { + if (event.get("eventType").asText().equals(eventType.name()) && lookups.keySet().stream() + .allMatch(fieldName -> checkField(event, fieldName, lookups.get(fieldName)))) { + lookedUpEvents.add(event); + eventFound = true; + } else { + eventFound = false; + } + } + return eventFound; + }); + return lookedUpEvents; + } + + private boolean checkField(JsonNode event, String fieldName, String expectedFieldValue) { + return event.get(fieldName).asText().equals(expectedFieldValue); + } + + private JsonNode readEventPayload(ConsumerRecord event) { + String eventPayload = null; + try { + eventPayload = objectMapper.readTree(event.value()).asText(); + + if (eventPayload.isBlank()) { + eventPayload = event.value(); + } + + return objectMapper.readValue(eventPayload, JsonNode.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java b/app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java index 04f1a698e8..ac40a41635 100644 --- a/app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java +++ b/app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java @@ -189,6 +189,8 @@ public class ReadOnlyRegistryStorageTest { entry("triggerSnapshotCreation0", new State(true, RegistryStorage::triggerSnapshotCreation)), entry("createSnapshot1", new State(true, s -> s.createSnapshot(null))), entry("upgradeData3", new State(true, s -> s.upgradeData(null, false, false))), + entry("createEvent1", new State(true, s -> s.createEvent(null))), + entry("supportsDatabaseEvents0", new State(true, s -> s.createEvent(null))), entry("getContentByReference1", new State(true, s -> s.getContentByReference(null)))); CURRENT_METHODS = Arrays.stream(RegistryStorage.class.getMethods()) diff --git a/common/src/main/java/io/apicurio/registry/events/dto/RegistryEventType.java b/common/src/main/java/io/apicurio/registry/events/dto/RegistryEventType.java deleted file mode 100644 index df57bf8a04..0000000000 --- a/common/src/main/java/io/apicurio/registry/events/dto/RegistryEventType.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.apicurio.registry.events.dto; - -import io.quarkus.runtime.annotations.RegisterForReflection; - -@RegisterForReflection -public enum RegistryEventType { - - GROUP_CREATED, GROUP_UPDATED, GROUP_DELETED, - - ARTIFACTS_IN_GROUP_DELETED, - - ARTIFACT_CREATED, ARTIFACT_UPDATED, ARTIFACT_DELETED, - - ARTIFACT_STATE_CHANGED, - - ARTIFACT_RULE_CREATED, ARTIFACT_RULE_UPDATED, ARTIFACT_RULE_DELETED, ALL_ARTIFACT_RULES_DELETED, - - GLOBAL_RULE_CREATED, GLOBAL_RULE_UPDATED, GLOBAL_RULE_DELETED, ALL_GLOBAL_RULES_DELETED; - - private String cloudEventType; - - private RegistryEventType() { - this.cloudEventType = "io.apicurio.registry." + this.name().toLowerCase().replace("_", "-"); - } - - public String cloudEventType() { - return this.cloudEventType; - } -} diff --git a/docs/modules/ROOT/partials/getting-started/ref-registry-all-configs.adoc b/docs/modules/ROOT/partials/getting-started/ref-registry-all-configs.adoc index c9cd394150..e9be6e73dd 100644 --- a/docs/modules/ROOT/partials/getting-started/ref-registry-all-configs.adoc +++ b/docs/modules/ROOT/partials/getting-started/ref-registry-all-configs.adoc @@ -689,6 +689,11 @@ The following {registry} configuration options are available for each component |`sa` |`3.0.0.Final` |Application datasource username +|`apicurio.events.kafka.topic` +|`string` +|`registry-events` +| +|Storage event topic |`apicurio.kafkasql.bootstrap.servers` |`string` | @@ -752,12 +757,12 @@ The following {registry} configuration options are available for each component |`apicurio.kafkasql.snapshot.every.seconds` |`string` |`86400s` -| +|`3.0.0` |Kafka sql journal topic snapshot every |`apicurio.kafkasql.snapshots.topic` |`string` |`kafkasql-snapshots` -| +|`3.0.0` |Kafka sql storage topic name |`apicurio.kafkasql.ssl.key.password` |`optional` @@ -812,7 +817,7 @@ The following {registry} configuration options are available for each component |`apicurio.storage.snapshot.location` |`string` |`./` -| +|`3.0.0` |Kafka sql snapshots store location |`apicurio.storage.sql.kind` |`string` diff --git a/docs/modules/ROOT/partials/getting-started/ref-registry-config-migration.adoc b/docs/modules/ROOT/partials/getting-started/ref-registry-config-migration.adoc index a57cdb4e02..82a6debec3 100644 --- a/docs/modules/ROOT/partials/getting-started/ref-registry-config-migration.adoc +++ b/docs/modules/ROOT/partials/getting-started/ref-registry-config-migration.adoc @@ -119,7 +119,7 @@ NOTE: For each configuration property you can override the value by using the co |Name |New Option |`registry.events.ksink` -|`apicurio.events.ksink` +|`removed` |=== == health @@ -183,7 +183,7 @@ NOTE: For each configuration property you can override the value by using the co |Name |New Option |`registry.events.kafka.topic` -|`Removed` +|`apicurio.events.kafka.topic` |`registry.events.kafka.topic-partition` |`Removed` |=== diff --git a/integration-tests/src/test/resources/infra/kafka/kafka.yml b/integration-tests/src/test/resources/infra/kafka/kafka.yml index 3909231a91..7c9a3d1347 100644 --- a/integration-tests/src/test/resources/infra/kafka/kafka.yml +++ b/integration-tests/src/test/resources/infra/kafka/kafka.yml @@ -26,6 +26,29 @@ spec: selector: app: kafka-service --- +apiVersion: v1 +kind: Service +metadata: + namespace: "apicurio-registry-e2e" + labels: + app: kafka-service + name: kafka-service-external +spec: + ports: + - protocol: TCP + port: 29092 + targetPort: 29092 + selector: + app: kafka-service + type: LoadBalancer + sessionAffinity: None + externalTrafficPolicy: Cluster + ipFamilies: + - IPv4 + ipFamilyPolicy: SingleStack + allocateLoadBalancerNodePorts: true + internalTrafficPolicy: Cluster +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -46,11 +69,15 @@ spec: command: - /bin/sh - -c - - "export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS}" + - "export CLUSTER_ID=$$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $$CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS}" env: - name: LOG_DIR value: /tmp/logs + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,CONTROLLER://:9093" - name: KAFKA_ADVERTISED_LISTENERS - value: PLAINTEXT://kafka-service:9092 + value: "PLAINTEXT://kafka-service:9092,PLAINTEXT_HOST://localhost:29092" + - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" ports: - containerPort: 9092 \ No newline at end of file diff --git a/integration-tests/src/test/resources/infra/kafka/registry-kafka.yml b/integration-tests/src/test/resources/infra/kafka/registry-kafka.yml index 25b0ff60b3..0bdc180966 100644 --- a/integration-tests/src/test/resources/infra/kafka/registry-kafka.yml +++ b/integration-tests/src/test/resources/infra/kafka/registry-kafka.yml @@ -27,7 +27,7 @@ spec: - name: APICURIO_STORAGE_KIND value: "kafkasql" - name: QUARKUS_LOG_CATEGORY__IO_APICURIO__LEVEL - value: "INFO" + value: "DEBUG" - name: APICURIO_REST_DELETION_ARTIFACT_ENABLED value: "true" - name: APICURIO_REST_DELETION_ARTIFACTVERSION_ENABLED diff --git a/pom.xml b/pom.xml index db021555a6..94e1dc4c36 100644 --- a/pom.xml +++ b/pom.xml @@ -214,6 +214,7 @@ 0.1.18.Final 1.2.5 3.6.0 + 2.6.2.Final 3.3.1 2.20.2 @@ -786,6 +787,11 @@ pulsar ${test-containers.version} + + io.debezium + debezium-testing-testcontainers + ${debezium.version} + io.strimzi strimzi-test-container diff --git a/utils/tests/pom.xml b/utils/tests/pom.xml index 750a4d4e92..1306690a39 100644 --- a/utils/tests/pom.xml +++ b/utils/tests/pom.xml @@ -101,6 +101,17 @@ pulsar + + io.debezium + debezium-testing-testcontainers + + + + org.testcontainers + postgresql + test + + com.github.tomakehurst wiremock-jre8 @@ -128,6 +139,10 @@ io.apicurio apicurio-common-rest-client-common + + org.testcontainers + postgresql + diff --git a/utils/tests/src/main/java/io/apicurio/registry/utils/tests/DebeziumContainerResource.java b/utils/tests/src/main/java/io/apicurio/registry/utils/tests/DebeziumContainerResource.java new file mode 100644 index 0000000000..159448ab87 --- /dev/null +++ b/utils/tests/src/main/java/io/apicurio/registry/utils/tests/DebeziumContainerResource.java @@ -0,0 +1,78 @@ +package io.apicurio.registry.utils.tests; + +import io.debezium.testing.testcontainers.ConnectorConfiguration; +import io.debezium.testing.testcontainers.DebeziumContainer; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.util.Map; +import java.util.stream.Stream; + +public class DebeziumContainerResource implements QuarkusTestResourceLifecycleManager { + + private static final Network network = Network.newNetwork(); + + private static final KafkaContainer kafkaContainer = DebeziumKafkaContainer + .defaultKRaftContainer(network); + + public static PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>( + DockerImageName.parse("quay.io/debezium/postgres:15").asCompatibleSubstituteFor("postgres")) + .withDatabaseName("registry").withUsername("postgres").withPassword("postgres") + .withNetwork(network).withNetworkAliases("postgres"); + + public static DebeziumContainer debeziumContainer = new DebeziumContainer( + "quay.io/debezium/connect:2.6.2.Final").withNetwork(network).withKafka(kafkaContainer) + .dependsOn(kafkaContainer); + + @Override + public Map start() { + // Start the postgresql database, kafka, and debezium + Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join(); + + // Register the postgresql connector + ConnectorConfiguration connector = ConnectorConfiguration.forJdbcContainer(postgresContainer) + .with("topic.prefix", "registry").with("schema.include.list", "public") + .with("table.include.list", "public.outbox").with("transforms", "outbox") + .with("transforms.outbox.type", "io.debezium.transforms.outbox.EventRouter"); + + debeziumContainer.registerConnector("my-connector", connector); + + System.setProperty("bootstrap.servers", kafkaContainer.getBootstrapServers()); + + return Map.of("apicurio.datasource.url", postgresContainer.getJdbcUrl(), + "apicurio.datasource.username", "postgres", "apicurio.datasource.password", "postgres"); + } + + @Override + public void stop() { + debeziumContainer.stop(); + postgresContainer.stop(); + kafkaContainer.stop(); + } + + public class DebeziumKafkaContainer { + private static final String defaultImage = "confluentinc/cp-kafka:7.2.10"; + + public static KafkaContainer defaultKRaftContainer(Network network) { + try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(defaultImage)) + .withNetwork(network).withKraft()) { + return kafka; + } catch (Exception e) { + throw new RuntimeException("Cannot create KRaftContainer with default image.", e); + } + } + + public static KafkaContainer defaultKafkaContainer(Network network) { + try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(defaultImage)) + .withNetwork(network)) { + return kafka; + } catch (Exception e) { + throw new RuntimeException("Cannot create KafkaContainer with default image.", e); + } + } + } +} diff --git a/utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java b/utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java index df0d0432ac..2e910b2336 100644 --- a/utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java +++ b/utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java @@ -38,7 +38,6 @@ public Map start() { System.setProperty("bootstrap.servers.external", externalBootstrapServers); return Map.of("bootstrap.servers", externalBootstrapServers, - "apicurio.events.kafka.config.bootstrap.servers", externalBootstrapServers, "apicurio.kafkasql.bootstrap.servers", externalBootstrapServers); } else { return Collections.emptyMap();