Skip to content

Commit

Permalink
Add version and commit information to written values
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Jan 7, 2025
1 parent 46dca98 commit e41a96a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.AlgProducingVisitor.Function3;
import org.polypheny.db.algebra.core.common.Identifier;
import org.polypheny.db.algebra.metadata.AlgMetadataQuery;
import org.polypheny.db.catalog.Catalog;
Expand All @@ -34,6 +35,7 @@
import org.polypheny.db.plan.AlgPlanner;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.transaction.locking.IdentifierUtils;
import org.polypheny.db.transaction.locking.VersionedEntryIdentifier;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.document.PolyDocument;
import org.polypheny.db.type.entity.graph.GraphPropertyHolder;
Expand All @@ -42,8 +44,8 @@

public class EnumerableIdentifier extends Identifier implements EnumerableAlg {

protected EnumerableIdentifier( AlgCluster cluster, AlgTraitSet traits, Entity entity, AlgNode input ) {
super( cluster, traits, entity, input );
protected EnumerableIdentifier( AlgCluster cluster, AlgTraitSet traits, long version, Entity entity, AlgNode input ) {
super( cluster, traits, version, entity, input );
assert getConvention() instanceof EnumerableConvention;
}

Expand All @@ -57,7 +59,7 @@ public AlgOptCost computeSelfCost( AlgPlanner planner, AlgMetadataQuery mq ) {

@Override
public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
return new EnumerableIdentifier( inputs.get( 0 ).getCluster(), traitSet, entity, inputs.get( 0 ) );
return new EnumerableIdentifier( inputs.get( 0 ).getCluster(), traitSet, version, entity, inputs.get( 0 ) );
}


Expand All @@ -70,21 +72,22 @@ public Result implement( EnumerableAlgImplementor implementor, Prefer pref ) {

Expression input_ = builder.append( "input", result.block() );
Expression entityId_ = Expressions.constant( entity.getId() );
Expression version_ = Expressions.constant( version );
Expression identification_ = null;
switch ( input.getModel() ) {
case RELATIONAL -> {
Expression addRelIdentifiers_ = Expressions.call( EnumerableIdentifier.class, "addRelIdentifiers" );
Expression addRelIdentifierWithId_ = Expressions.call( EnumerableIdentifier.class, "bindLogicalId", addRelIdentifiers_, entityId_ );
Expression addRelIdentifierWithId_ = Expressions.call( EnumerableIdentifier.class, "bindMvccConstants", addRelIdentifiers_, entityId_, version_ );
identification_ = builder.append( "identification", Expressions.call( BuiltInMethod.PROCESS_AND_STREAM_RIGHT.method, input_, addRelIdentifierWithId_ ) );
}
case DOCUMENT -> {
Expression addDocIdentifiers_ = Expressions.call( EnumerableIdentifier.class, "addDocIdentifiers" );
Expression addDocIdentifierWithId_ = Expressions.call( EnumerableIdentifier.class, "bindLogicalId", addDocIdentifiers_, entityId_ );
Expression addDocIdentifierWithId_ = Expressions.call( EnumerableIdentifier.class, "bindMvccConstants", addDocIdentifiers_, entityId_, version_ );
identification_ = builder.append( "identification", Expressions.call( BuiltInMethod.PROCESS_AND_STREAM_RIGHT.method, input_, addDocIdentifierWithId_ ) );
}
case GRAPH -> {
Expression addLpgIdentifiers_ = Expressions.call( EnumerableIdentifier.class, "addLpgIdentifiers" );
Expression addLpgIdentifiersWithId_ = Expressions.call( EnumerableIdentifier.class, "bindLogicalId", addLpgIdentifiers_, entityId_ );
Expression addLpgIdentifiersWithId_ = Expressions.call( EnumerableIdentifier.class, "bindMvccConstants", addLpgIdentifiers_, entityId_, version_ );
identification_ = builder.append( "identification", Expressions.call( BuiltInMethod.PROCESS_AND_STREAM_RIGHT.method, input_, addLpgIdentifiersWithId_ ) );
}
}
Expand All @@ -94,71 +97,65 @@ public Result implement( EnumerableAlgImplementor implementor, Prefer pref ) {

}


public static Function1<Enumerable<PolyValue[]>, Enumerable<PolyValue[]>> bindLogicalId(
Function2<Enumerable<PolyValue[]>, Long, Enumerable<PolyValue[]>> function,
long logicalId ) {
return input -> function.apply( input, logicalId );
public static Function1<Enumerable<PolyValue[]>, Enumerable<PolyValue[]>> bindMvccConstants(
Function3<Enumerable<PolyValue[]>, Long, Long, Enumerable<PolyValue[]>> function,
long logicalId,
long versionId ) {
return input -> function.apply( input, logicalId, versionId );
}


public static Function2<Enumerable<PolyValue[]>, Long, Enumerable<PolyValue[]>> addRelIdentifiers() {
return ( input, logicalId ) -> {
public static Function3<Enumerable<PolyValue[]>, Long, Long, Enumerable<PolyValue[]>> addRelIdentifiers() {
return ( input, logicalId, versionId ) -> {
LogicalEntity entity = Catalog.getInstance().getSnapshot()
.getLogicalEntity( logicalId )
.orElseThrow();
return input.select( row -> {
row[0] = entity.getEntryIdentifiers()
.getNextEntryIdentifier()
.getEntryIdentifierAsPolyLong();
row[1] = new PolyLong( IdentifierUtils.MISSING_IDENTIFIER );
VersionedEntryIdentifier identifier = entity.getEntryIdentifiers()
.getNextEntryIdentifier(versionId, false);
row[0] = identifier.getEntryIdentifierAsPolyLong();
row[1] = identifier.getVersionAsPolyLong();
return row;
} );
};
}


public static Function2<Enumerable<PolyValue[]>, Long, Enumerable<PolyValue[]>> addDocIdentifiers() {
return ( input, logicalId ) -> {
public static Function3<Enumerable<PolyValue[]>, Long, Long, Enumerable<PolyValue[]>> addDocIdentifiers() {
return ( input, logicalId, versionId ) -> {
LogicalEntity entity = Catalog.getInstance().getSnapshot()
.getLogicalEntity( logicalId )
.orElseThrow();
return input.select( row -> {
for ( PolyValue value : row ) {
PolyLong entryIdentifier = entity.getEntryIdentifiers()
.getNextEntryIdentifier()
.getEntryIdentifierAsPolyLong();
VersionedEntryIdentifier identifier = entity.getEntryIdentifiers()
.getNextEntryIdentifier(versionId, false);
if ( value instanceof PolyDocument ) {
((PolyDocument) value).put( IdentifierUtils.getIdentifierKeyAsPolyString(), entryIdentifier );
((PolyDocument) value).put( IdentifierUtils.getVersionKeyAsPolyString(), new PolyLong( IdentifierUtils.MISSING_VERSION ) );
((PolyDocument) value).put( IdentifierUtils.getIdentifierKeyAsPolyString(), identifier.getEntryIdentifierAsPolyLong() );
((PolyDocument) value).put( IdentifierUtils.getVersionKeyAsPolyString(), identifier.getVersionAsPolyLong() );
}
}
return row;
} );
};
}


public static Function2<Enumerable<PolyValue[]>, Long, Enumerable<PolyValue[]>> addLpgIdentifiers() {
return ( input, logicalId ) -> {
public static Function3<Enumerable<PolyValue[]>, Long, Long, Enumerable<PolyValue[]>> addLpgIdentifiers() {
return ( input, logicalId, versionId ) -> {
LogicalEntity entity = Catalog.getInstance().getSnapshot()
.getLogicalEntity( logicalId )
.orElseThrow();
return input.select( row -> {
for ( PolyValue value : row ) {
PolyLong entryIdentifier = entity.getEntryIdentifiers()
.getNextEntryIdentifier()
.getEntryIdentifierAsPolyLong();
VersionedEntryIdentifier identifier = entity.getEntryIdentifiers()
.getNextEntryIdentifier(versionId, false);
if ( value instanceof GraphPropertyHolder ) {
((GraphPropertyHolder) value).getProperties()
.put( IdentifierUtils.getIdentifierKeyAsPolyString(), entryIdentifier );
.put( IdentifierUtils.getIdentifierKeyAsPolyString(), identifier.getEntryIdentifierAsPolyLong() );
((GraphPropertyHolder) value).getProperties()
.put( IdentifierUtils.getVersionKeyAsPolyString(), new PolyLong( IdentifierUtils.MISSING_VERSION ) );
.put( IdentifierUtils.getVersionKeyAsPolyString(), identifier.getVersionAsPolyLong() );
}
}
return row;
} );
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public AlgNode convert( AlgNode alg ) {
final Identifier identifier = (Identifier) alg;
final AlgTraitSet traits = identifier.getTraitSet().replace( EnumerableConvention.INSTANCE );
final AlgNode input = convert(identifier.getInput(), identifier.getInput().getTraitSet().replace( EnumerableConvention.INSTANCE ));
return new EnumerableIdentifier( identifier.getCluster(), traits, identifier.getEntity(), input );
return new EnumerableIdentifier( identifier.getCluster(), traits, identifier.getVersion(), identifier.getEntity(), input );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ public EntryIdentifierRegistry(long entityId, long maxIdentifierValue ) {
this.entityId = entityId;
}

public VersionedEntryIdentifier getNextEntryIdentifier(long version, boolean isCommitted) {
long nextIdentifier = getNextIdentifier();
return new VersionedEntryIdentifier(entityId, nextIdentifier, version, isCommitted );
}


public VersionedEntryIdentifier getNextEntryIdentifier() {
while ( !availableIdentifiers.first().hasNextIdentifier() ) {
availableIdentifiers.pollFirst();
if ( availableIdentifiers.isEmpty() ) {
throw new IllegalStateException( "No identifiers available" );
}
}
long nextIdentifier = availableIdentifiers.first().getNextIdentifier();
return new VersionedEntryIdentifier(entityId, nextIdentifier );
return new VersionedEntryIdentifier(entityId, getNextIdentifier() );
}


public void releaseEntryIdentifiers( Set<Long> identifiers ) {
if ( identifiers.isEmpty() ) {
return;
Expand All @@ -58,6 +55,16 @@ public void releaseEntryIdentifiers( Set<Long> identifiers ) {
}
}

private long getNextIdentifier() {
while ( !availableIdentifiers.first().hasNextIdentifier() ) {
availableIdentifiers.pollFirst();
if ( availableIdentifiers.isEmpty() ) {
throw new IllegalStateException( "No identifiers available" );
}
}
return availableIdentifiers.first().getNextIdentifier();
}


private boolean mergeWithLowerInterval( IdentifierInterval lowerInterval, long currentIdentifier ) {
if ( lowerInterval == null ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public static PolyString getVersionKeyAsPolyString() {
return PolyString.of( IDENTIFIER_KEY );
}


public static void throwIllegalFieldName() {
throw new IllegalArgumentException( MessageFormat.format(
"The field {0} is reserved for internal use and cannot be used.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ public class VersionedEntryIdentifier {
private final long version;


public VersionedEntryIdentifier(long entityId, long entryIdentifier, long version ) {
public VersionedEntryIdentifier( long entityId, long entryIdentifier, long version, boolean isComitted ) {
this.entityId = entityId;
this.entryIdentifier = entryIdentifier;
this.version = version;
this.version = isComitted ? version : version * -1;
}


public VersionedEntryIdentifier(long entityId, long entryIdentifier ) {
public VersionedEntryIdentifier( long entityId, long entryIdentifier ) {
this.entityId = entityId;
this.entryIdentifier = entryIdentifier;
this.version = 0;
this.version = IdentifierUtils.MISSING_VERSION;
}


Expand All @@ -65,6 +65,12 @@ public PolyLong getEntryIdentifierAsPolyLong() {
return PolyLong.of( entryIdentifier );
}


public PolyLong getVersionAsPolyLong() {
return PolyLong.of( version );
}


@Override
public String toString() {
return "VersionedEntryIdentifier{entity=" + entityId + ", entryIdentifier=" + entryIdentifier + ", version=" + version + '}';
Expand Down

0 comments on commit e41a96a

Please sign in to comment.