Skip to content

Commit

Permalink
Implement basic commit validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Jan 9, 2025
1 parent 79f64c3 commit ba0fea4
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionManager;
import org.polypheny.db.transaction.locking.AlgTreeRewriter;
import org.polypheny.db.util.DeadlockException;
import org.polypheny.db.util.Pair;
Expand Down Expand Up @@ -186,7 +185,9 @@ public List<ImplementationContext> anyPrepareQuery( QueryContext context, Statem
}

AlgRoot root = processor.translate( statement, parsed );
root = new AlgTreeRewriter( transaction ).process( root );
if ( !context.isMvccInternal ) {
root = new AlgTreeRewriter( transaction ).process( root );
}

if ( transaction.isAnalyze() ) {
statement.getOverviewDuration().stop( "Translation" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ public class QueryContext {
@Builder.Default
boolean usesCache = true;

@Builder.Default
public boolean isMvccInternal = false;

@Builder.Default
long userId = Catalog.defaultUserId;

@Builder.Default
Statement statement = null;


@NotNull
String origin;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.ResultIterator;
import org.polypheny.db.adapter.Adapter;
import org.polypheny.db.adapter.index.IndexManager;
import org.polypheny.db.adapter.java.JavaTypeFactory;
Expand All @@ -52,17 +53,21 @@
import org.polypheny.db.catalog.snapshot.Snapshot;
import org.polypheny.db.config.RuntimeConfig;
import org.polypheny.db.information.InformationManager;
import org.polypheny.db.languages.LanguageManager;
import org.polypheny.db.languages.QueryLanguage;
import org.polypheny.db.monitoring.core.MonitoringServiceProvider;
import org.polypheny.db.monitoring.events.StatementEvent;
import org.polypheny.db.prepare.JavaTypeFactoryImpl;
import org.polypheny.db.processing.ConstraintEnforceAttacher;
import org.polypheny.db.processing.DataMigrator;
import org.polypheny.db.processing.DataMigratorImpl;
import org.polypheny.db.processing.ImplementationContext;
import org.polypheny.db.processing.Processor;
import org.polypheny.db.processing.QueryContext;
import org.polypheny.db.processing.QueryProcessor;
import org.polypheny.db.transaction.locking.Lockable;
import org.polypheny.db.transaction.locking.MonotonicNumberSource;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.category.PolyNumber;
import org.polypheny.db.util.DeadlockException;
import org.polypheny.db.util.Pair;
Expand Down Expand Up @@ -180,6 +185,7 @@ public void commit() throws TransactionException {

if ( !writtenEntities.isEmpty() ) {
okToCommit &= validateWriteSet();
updateWrittenVersionIds();
}

Pair<Boolean, String> isValid = catalog.checkIntegrity();
Expand Down Expand Up @@ -242,8 +248,6 @@ public void commit() throws TransactionException {
// Free resources hold by statements
statements.forEach( Statement::close );

updateCommitInstantLog();

// Release locks
releaseAllLocks();
// Remove transaction
Expand All @@ -255,40 +259,71 @@ public void commit() throws TransactionException {
}

private boolean validateWriteSet() {
/*
ToDo TH: get the write set based on the transaction id and the written entities and compare to other comitted entities
pseudocode:
for each entity in writtenEntities:
max = String query = """
SELECT MAX(_vid) AS max_vid
FROM entity
WHERE _eid IN (
SELECT _eid FROM main_table WHERE _vid = ?

String queryTemplate = """
SELECT MAX(_vid) AS max_vid
FROM %s
WHERE _eid IN (
SELECT _eid FROM %s WHERE _vid = %d
)
""";

if (max >= TxId)
return false
long maxVersion = 0;

for (Entity writtenEntity : writtenEntities) {
String query = String.format(queryTemplate, writtenEntity.getName(), writtenEntity.getName(), getSequenceNumber());
ImplementationContext context = LanguageManager.getINSTANCE().anyPrepareQuery(
QueryContext.builder()
.query( query )
.language( QueryLanguage.from( "sql" ) )
.origin( this.getOrigin() )
.namespaceId( writtenEntity.getNamespaceId() )
.transactionManager( this.getTransactionManager() )
.isMvccInternal( true )
.build(), this ).get( 0 );

if ( context.getException().isPresent() ) {
//ToDo TH: properly handle this
throw new RuntimeException( context.getException().get() );
}

return true
*/
ResultIterator iterator = context.execute( context.getStatement() ).getIterator();
List<List<PolyValue>> res = iterator.getNextBatch();
maxVersion = Math.max(maxVersion, res.get(0).get(0).asLong().getValue() ); // Make this save
iterator.close();
}

return true;
return maxVersion <= getSequenceNumber();
}

private void updateCommitInstantLog() {
/*
ToDo TH: update the vids of each written entity
1) get read set as parameter for efficiency
2) flip the sign of each of the -vid entries to vid
private void updateWrittenVersionIds() {
String queryTemplate = """
UPDATE %s
SET _vid = %d
WHERE _vid = %d
""";

long commitSequenceNumber = MonotonicNumberSource.getInstance().getNextNumber();

for (Entity writtenEntity : writtenEntities) {
String query = String.format(queryTemplate, writtenEntity.getName(), commitSequenceNumber, -getSequenceNumber());

ImplementationContext context = LanguageManager.getINSTANCE().anyPrepareQuery(
QueryContext.builder()
.query(query)
.language(QueryLanguage.from("sql"))
.origin(this.getOrigin())
.namespaceId(writtenEntity.getNamespaceId())
.transactionManager(this.getTransactionManager())
.isMvccInternal(true)
.build(), this).get(0);

if (context.getException().isPresent()) {
throw new RuntimeException("Query preparation failed: " + context.getException().get());
}

pseudocode:
for each entity in writeSet:
UPDATE entity
SET _vid = TxCommitTimestamp
WHERE _vid = -TxID;
*/
context.execute(context.getStatement());
}
}


Expand Down

0 comments on commit ba0fea4

Please sign in to comment.