Skip to content

Commit

Permalink
Fix document id collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Dec 31, 2024
1 parent 8d25a70 commit 777adcb
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.polypheny.db.plan.AlgPlanner;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionManager;
import org.polypheny.db.transaction.TransactionManagerProvider;
import org.polypheny.db.transaction.locking.IdentifierUtils;
import org.polypheny.db.transaction.locking.VersionedEntryIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
return new LogicalDocumentAggregate( inputs.get( 0 ).getCluster(), traitSet, inputs.get( 0 ), getGroup().orElse( null ), aggCalls );
}

public LogicalDocumentAggregate copy(List<AlgNode> inputs ) {
return new LogicalDocumentAggregate( inputs.get( 0 ).getCluster(), traitSet, inputs.get( 0 ), getGroup().orElse( null ), aggCalls );
}


@Override
public AlgNode accept( AlgShuttle shuttle ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
}


public LogicalDocumentFilter copy(List<AlgNode> inputs ) {
return new LogicalDocumentFilter( getCluster(), getTraitSet(), inputs.get(0), condition );
}


@Override
public AlgNode accept( AlgShuttle shuttle ) {
return shuttle.visit( this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
return new LogicalDocumentModify( traitSet, entity, inputs.get( 0 ), operation, updates, removes, renames );
}

public LogicalDocumentModify copy(List<AlgNode> inputs ) {
return new LogicalDocumentModify( traitSet, entity, inputs.get( 0 ), operation, updates, removes, renames );
}


@Override
public List<AlgNode> getRelationalEquivalent( List<AlgNode> values, List<Entity> entities, Snapshot snapshot ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public LogicalDocumentProject copy( AlgTraitSet traitSet, List<AlgNode> inputs )
return new LogicalDocumentProject( inputs.get( 0 ).getCluster(), traitSet, inputs.get( 0 ), includes, excludes );
}

public LogicalDocumentProject copy( List<AlgNode> inputs ) {
return new LogicalDocumentProject( inputs.get( 0 ).getCluster(), traitSet, inputs.get( 0 ), includes, excludes );
}


@Override
public AlgNode accept( AlgShuttle shuttle ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
}


public LogicalDocumentSort copy( List<AlgNode> inputs ) {
return new LogicalDocumentSort( inputs.get( 0 ).getCluster(), traitSet, inputs.get( 0 ), collation, fieldExps, offset, fetch );
}


@Override
public DocType getDocType() {
return DocType.SORT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
}


public LogicalDocumentTransformer copy( List<AlgNode> inputs ) {
return new LogicalDocumentTransformer( inputs.get( 0 ).getCluster(), inputs, traitSet, rowType );
}


@Override
public AlgNode accept( AlgShuttle shuttle ) {
return shuttle.visit( this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ public void reset() {

public AlgRoot process( AlgRoot root ) {
AlgNode rootAlg = root.alg.accept( this );

if ( collectorInsertPosition == null ) {
reset();
return root.withAlg( rootAlg );
}
if (!collectorInsertPosition.equals( rootAlg ) ) {
if (!collectorInsertPosition.equals( rootAlg )) {
throw new RuntimeException("Should never throw!");
}
AlgNode newRootAlg = null;
Expand All @@ -110,7 +109,7 @@ public AlgRoot process( AlgRoot root ) {

private List<AlgNode> getLastNNodes( int count ) {
List<AlgNode> result = new ArrayList<>();
for ( Iterator<AlgNode> it = stack.descendingIterator(); it.hasNext(); ) {
for ( Iterator<AlgNode> it = stack.iterator(); it.hasNext(); ) {
AlgNode node = it.next();
if ( count-- <= 0 ) {
break;
Expand All @@ -121,21 +120,21 @@ private List<AlgNode> getLastNNodes( int count ) {
}


private void updateCollectorInsertPosition() {
List<AlgNode> trace = getLastNNodes( 2 );
private void updateCollectorInsertPosition(AlgNode current) {
List<AlgNode> trace = getLastNNodes( 1 );
switch ( trace.size() ) {
case 1:
case 0:
// add collector in front of this node
collectorInsertPosition = trace.get(0);
collectorInsertPosition = current;
break;
case 2:
case 1:
// if previous node is a filter place collector in front of filter
if ( trace.get( 1 ) instanceof Filter) {
collectorInsertPosition = trace.get( 1 );
if ( trace.get( 0 ) instanceof Filter) {
collectorInsertPosition = trace.get( 0 );
return;
}
// else place in front of this
collectorInsertPosition = trace.get( 0 );
collectorInsertPosition = current;
break;
}
}
Expand Down Expand Up @@ -205,7 +204,7 @@ public AlgNode visit( LogicalRelMatch match ) {

@Override
public AlgNode visit( LogicalRelScan scan ) {
updateCollectorInsertPosition();
updateCollectorInsertPosition(scan);
return scan;
}

Expand Down Expand Up @@ -381,6 +380,7 @@ public AlgNode visit( LogicalLpgModify modify ) {

@Override
public AlgNode visit( LogicalLpgScan scan ) {
updateCollectorInsertPosition(scan);
return scan;
}

Expand Down Expand Up @@ -435,56 +435,80 @@ public AlgNode visit( LogicalLpgTransformer transformer ) {

@Override
public AlgNode visit( LogicalDocumentModify modify ) {
switch ( modify.getOperation() ) {
LogicalDocumentModify modify1 = visitChildren( modify );
if ( isTarget( modify1 ) ) {
modify1 = modify1.copy( modifyInputs(modify1.getInputs()) );
}
switch ( modify1.getOperation() ) {
case INSERT:
AlgNode input = modify.getInput();
AlgNode input = modify1.getInput();
LogicalDocIdentifier identifier = LogicalDocIdentifier.create(
modify.getEntity(),
modify1.getEntity(),
input
);
return modify.copy( modify.getTraitSet(), List.of( identifier ) );
return modify1.copy( modify1.getTraitSet(), List.of( identifier ) );
case UPDATE:
IdentifierUtils.throwIfContainsIdentifierKey( modify.getUpdates().keySet() );
return visitChildren( modify );
default:
return visitChildren( modify );
IdentifierUtils.throwIfContainsIdentifierKey( modify1.getUpdates().keySet() );

}
return modify1;
}


@Override
public AlgNode visit( LogicalDocumentAggregate aggregate ) {
return visitChildren( aggregate );
LogicalDocumentAggregate aggregate1 = visitChildren( aggregate );
if ( isTarget( aggregate1 ) ) {
aggregate1 = aggregate1.copy( modifyInputs(aggregate1.getInputs()) );
}
return aggregate1;
}


@Override
public AlgNode visit( LogicalDocumentFilter filter ) {
return visitChildren( filter );
LogicalDocumentFilter filter1 = visitChildren( filter );
if ( isTarget( filter1 ) ) {
filter1 = filter1.copy( modifyInputs(filter1.getInputs()) );
}
return filter1;
}


@Override
public AlgNode visit( LogicalDocumentProject project ) {
return visitChildren( project );
LogicalDocumentProject project1 = visitChildren( project );
if ( isTarget( project1 ) ) {
project1 = project1.copy( modifyInputs(project1.getInputs()) );
}
return project1;
}


@Override
public AlgNode visit( LogicalDocumentScan scan ) {
updateCollectorInsertPosition(scan);
return visitChildren( scan );
}


@Override
public AlgNode visit( LogicalDocumentSort sort ) {
return visitChildren( sort );
LogicalDocumentSort sort1 = visitChildren( sort );
if ( isTarget( sort1 ) ) {
sort1 = sort1.copy( modifyInputs(sort1.getInputs()) );
}
return sort1;
}


@Override
public AlgNode visit( LogicalDocumentTransformer transformer ) {
return visitChildren( transformer );
LogicalDocumentTransformer sort1 = visitChildren( transformer );
if ( isTarget( sort1 ) ) {
sort1 = sort1.copy( modifyInputs(sort1.getInputs()) );
}
return sort1;
}


Expand All @@ -497,7 +521,6 @@ public AlgNode visit( LogicalDocumentValues values ) {
@Override
public AlgNode visit( AlgNode other ) {
return visitChildren( other );

}

}

0 comments on commit 777adcb

Please sign in to comment.