Skip to content

Commit

Permalink
Support kafka sql snapshotting (#4665)
Browse files Browse the repository at this point in the history
* Add basic structure for kafkasql snapshotting

* Use strimzi container for testing

* Add snapshot creation and basic testing structure

* Implement restoring h2 database from snapshot

* Improve snapshot creation process

* Finish coordination mechanism for snapshot creation and add snapshot creation test

* Make the replica that triggered the snapshot actually create it

* Add some log messages to improve traceability

* Fix snapshotting unit test

* Enable ryuk in ci

* Randomize db test url

* Fix registry loader

* Add kafkasql snapshotting integration tests
  • Loading branch information
carlesarnal authored May 29, 2024
1 parent cb2879b commit 2ab68fa
Show file tree
Hide file tree
Showing 54 changed files with 5,621 additions and 227 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ jobs:
- name: Run Integration Tests - migration - Kafkasql
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pmigration -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true

- name: Run Integration Tests - snapshotting - Kafkasql
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pkafkasql-snapshotting -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true

- name: Collect logs
if: failure()
run: sh ./.github/scripts/collect_logs.sh
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

env:
TESTCONTAINERS_RYUK_DISABLED: true

jobs:
build-verify:
name: Verify Application Build
Expand Down
1 change: 0 additions & 1 deletion app/.gitignore

This file was deleted.

8 changes: 2 additions & 6 deletions app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down Expand Up @@ -294,8 +290,8 @@
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>redpanda</artifactId>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void run() {

private void refresh() {
Instant now = Instant.now();
if (lastRefresh != null) {
if (lastRefresh != null && this.delegate != null && this.delegate.isReady()) {
List<DynamicConfigPropertyDto> staleConfigProperties = this.getStaleConfigProperties(lastRefresh);
if (!staleConfigProperties.isEmpty()) {
invalidateCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public List<ArtifactTypeInfo> listArtifactTypes() {

}

@Override
@Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin)
public void triggerSnapshot() {
storage.triggerSnapshotCreation();
}

/**
* @see io.apicurio.registry.rest.v3.AdminResource#listGlobalRules()
*/
Expand Down
50 changes: 18 additions & 32 deletions app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,8 @@
import io.apicurio.registry.model.GA;
import io.apicurio.registry.model.GAV;
import io.apicurio.registry.model.VersionId;
import io.apicurio.registry.storage.dto.ArtifactMetaDataDto;
import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto;
import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto;
import io.apicurio.registry.storage.dto.CommentDto;
import io.apicurio.registry.storage.dto.ContentWrapperDto;
import io.apicurio.registry.storage.dto.DownloadContextDto;
import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto;
import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto;
import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto;
import io.apicurio.registry.storage.dto.GroupMetaDataDto;
import io.apicurio.registry.storage.dto.GroupSearchResultsDto;
import io.apicurio.registry.storage.dto.OrderBy;
import io.apicurio.registry.storage.dto.OrderDirection;
import io.apicurio.registry.storage.dto.RoleMappingDto;
import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto;
import io.apicurio.registry.storage.dto.RuleConfigurationDto;
import io.apicurio.registry.storage.dto.SearchFilter;
import io.apicurio.registry.storage.dto.StoredArtifactVersionDto;
import io.apicurio.registry.storage.dto.VersionSearchResultsDto;
import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException;
import io.apicurio.registry.storage.error.ArtifactNotFoundException;
import io.apicurio.registry.storage.error.ContentNotFoundException;
import io.apicurio.registry.storage.error.GroupAlreadyExistsException;
import io.apicurio.registry.storage.error.GroupNotFoundException;
import io.apicurio.registry.storage.error.RegistryStorageException;
import io.apicurio.registry.storage.error.RuleAlreadyExistsException;
import io.apicurio.registry.storage.error.RuleNotFoundException;
import io.apicurio.registry.storage.error.VersionAlreadyExistsException;
import io.apicurio.registry.storage.error.VersionNotFoundException;
import io.apicurio.registry.storage.dto.*;
import io.apicurio.registry.storage.error.*;
import io.apicurio.registry.storage.impexp.EntityInputStream;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.utils.impexp.ArtifactBranchEntity;
Expand Down Expand Up @@ -399,7 +371,7 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
/**
* Gets the stored meta-data for a single version of an artifact. This will return all meta-data for the
* version, including any user edited meta-data along with anything generated by the artifactStore.
*
*
* @param globalId
* @throws VersionNotFoundException
* @throws RegistryStorageException
Expand Down Expand Up @@ -581,7 +553,7 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
* @param limit the result size limit
*/
RoleMappingSearchResultsDto searchRoleMappings(int offset, int limit) throws RegistryStorageException;

/**
* Gets the details of a single role mapping.
*
Expand Down Expand Up @@ -865,6 +837,20 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
*/
void deleteArtifactBranch(GA ga, BranchId branchId);

/**
* Triggers a snapshot creation of the internal database.
*
* @throws RegistryStorageException
*/
String triggerSnapshotCreation() throws RegistryStorageException;

/**
* Creates the snapshot of the internal database based on configuration.
*
* @param snapshotLocation
* @throws RegistryStorageException
*/
String createSnapshot(String snapshotLocation) throws RegistryStorageException;

enum ArtifactRetrievalBehavior {
DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,16 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
checkReadOnly();
delegate.deleteArtifactBranch(ga, branchId);
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
checkReadOnly();
return delegate.triggerSnapshotCreation();
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
checkReadOnly();
return delegate.createSnapshot(snapshotLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,14 @@ public void createOrReplaceArtifactBranch(GA ga, BranchId branchId, List<Version
public void deleteArtifactBranch(GA ga, BranchId branchId) {
delegate.deleteArtifactBranch(ga, branchId);
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
return delegate.triggerSnapshotCreation();
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
return delegate.createSnapshot(snapshotLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -512,4 +512,14 @@ public GAV getArtifactBranchTip(GA ga, BranchId branchId, ArtifactRetrievalBehav
public List<GAV> getArtifactBranch(GA ga, BranchId branchId, ArtifactRetrievalBehavior behavior) {
return proxy(storage -> storage.getArtifactBranch(ga, branchId, behavior));
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
return proxy((RegistryStorage::triggerSnapshotCreation));
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
return proxy((storage -> storage.createSnapshot(snapshotLocation)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import java.util.Properties;

public interface KafkaSqlConfiguration {

String bootstrapServers();
String topic();
String snapshotsTopic();
String snapshotEvery();
String snapshotLocation();
Properties topicProperties();
boolean isTopicAutoCreate();
Integer pollTimeout();
Integer responseTimeout();
Properties producerProperties();
Properties consumerProperties();
Properties adminProperties();

}
Loading

0 comments on commit 2ab68fa

Please sign in to comment.