Skip to content

Commit

Permalink
don't merge, adjustment to mongo handling of document model
Browse files Browse the repository at this point in the history
  • Loading branch information
datomo committed Dec 4, 2023
1 parent ed350ca commit bd6e849
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.rex.RexBuilder;
import org.polypheny.db.rex.RexDynamicParam;
import org.polypheny.db.rex.RexLiteral;
import org.polypheny.db.schema.trait.ModelTrait;
import org.polypheny.db.type.PolyType;
Expand All @@ -45,15 +46,24 @@ public abstract class DocumentValues extends AbstractAlgNode implements Document

public final List<PolyDocument> documents;

public final List<RexDynamicParam> dynamicDocuments;



/**
* Creates a {@link DocumentValues}.
* {@link ModelTrait#DOCUMENT} node, which contains values.
*/
public DocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, List<PolyDocument> documents ) {
this( cluster, traitSet, documents, new ArrayList<>() );
}


public DocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, List<PolyDocument> documents, List<RexDynamicParam> dynamicDocuments ) {
super( cluster, traitSet );
this.rowType = DocumentType.ofId();
this.documents = validate( documents );
this.dynamicDocuments = dynamicDocuments;
}


Expand All @@ -76,7 +86,7 @@ protected static List<PolyDocument> validate( List<PolyDocument> docs ) {


public boolean isPrepared() {
return documents.size() == 1 && documents.get( 0 ).asDocument().size() == 1 && documents.get( 0 ).asDocument().containsKey( PolyString.of( DocumentType.DOCUMENT_ID ) );
return !dynamicDocuments.isEmpty();//documents.size() == 1 && documents.get( 0 ).asDocument().size() == 1 && documents.get( 0 ).asDocument().containsKey( PolyString.of( DocumentType.DOCUMENT_ID ) );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.plan.Convention;
import org.polypheny.db.rex.RexDynamicParam;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.type.entity.document.PolyDocument;

Expand Down Expand Up @@ -65,6 +66,11 @@ public LogicalDocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, List<
}


public LogicalDocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, List<PolyDocument> documents, List<RexDynamicParam> dynamicDocuments ) {
super( cluster, traitSet, documents, dynamicDocuments );
}


public static AlgNode create( AlgOptCluster cluster, List<PolyDocument> documents ) {
final AlgTraitSet traitSet = cluster.traitSetOf( Convention.NONE );
return new LogicalDocumentValues( cluster, traitSet, documents );
Expand All @@ -82,6 +88,12 @@ public static LogicalDocumentValues createOneTuple( AlgOptCluster cluster ) {
}


public static AlgNode createDynamic( AlgOptCluster cluster, List<RexDynamicParam> ids ) {
final AlgTraitSet traitSet = cluster.traitSetOf( Convention.NONE );
return new LogicalDocumentValues( cluster, traitSet, List.of( new PolyDocument() ), ids );
}


@Override
public DataModel getModel() {
return DataModel.DOCUMENT;
Expand All @@ -92,7 +104,7 @@ public DataModel getModel() {
public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
assert traitSet.containsIfApplicable( Convention.NONE );
assert inputs.isEmpty();
return new LogicalDocumentValues( getCluster(), traitSet, documents );
return new LogicalDocumentValues( getCluster(), traitSet, documents, dynamicDocuments );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.lang.reflect.Type;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1083,18 +1082,7 @@ private Pair<AlgRoot, AlgDataType> parameterize( AlgRoot routedRoot, AlgDataType
AlgDataType newParameterRowType = statement.getTransaction().getTypeFactory().createStructType(
types.stream().map( t -> 1L ).collect( Collectors.toList() ),
types,
new AbstractList<>() {
@Override
public String get( int index ) {
return "?" + index;
}


@Override
public int size() {
return types.size();
}
} );
IntStream.range( 0, types.size() ).mapToObj( i -> "?" + i ).collect( Collectors.toList() ) );

return new Pair<>(
new AlgRoot( parameterized, routedRoot.validatedRowType, routedRoot.kind, routedRoot.fields, routedRoot.collation ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -322,11 +321,11 @@ public AlgNode visitAsymmetricModify( LogicalRelModify initial ) {
if ( input instanceof LogicalValues ) {
List<RexNode> projects = new ArrayList<>();
boolean firstRow = true;
HashMap<Integer, Integer> idxMapping = new HashMap<>();
Map<Integer, Integer> idxMapping = new HashMap<>();
this.batchSize = ((LogicalValues) input).tuples.size();

int entires = docs.size();
HashMap<Integer, List<ParameterValue>> doc = new HashMap<>();
Map<Integer, List<ParameterValue>> doc = new HashMap<>();

for ( ImmutableList<RexLiteral> node : ((LogicalValues) input).getTuples() ) {
int i = 0;
Expand Down Expand Up @@ -377,21 +376,21 @@ public AlgNode visitAsymmetricModify( LogicalRelModify initial ) {

@Override
public AlgNode visit( LogicalDocumentModify initial ) {
LogicalDocumentModify modify = (LogicalDocumentModify) super.visit( initial );
List<RexNode> newSourceExpression = null;
//if ( modify.getUpdates() != null ) {
newSourceExpression = new ArrayList<>();
for ( RexNode node : modify.getUpdates().values() ) {
newSourceExpression.add( node.accept( this ) );
}
//}
LogicalDocumentModify modify = super.visit( initial ).unwrap( LogicalDocumentModify.class ).orElseThrow();
List<RexNode> newSourceExpression;

newSourceExpression = new ArrayList<>();
for ( RexNode node : modify.getUpdates().values() ) {
newSourceExpression.add( node.accept( this ) );
}

AlgNode input = modify.getInput();
if ( input instanceof LogicalDocumentValues ) {
Map<String, RexNode> projects = new HashMap<>();
if ( input.unwrap( LogicalDocumentValues.class ).isPresent() ) {
Map<String, RexDynamicParam> projects = new HashMap<>();
boolean firstRow = true;
Map<Integer, Integer> idxMapping = new HashMap<>();
this.batchSize = ((LogicalDocumentValues) input).documents.size();
for ( PolyValue node : ((LogicalDocumentValues) input).documents ) {
for ( PolyValue node : input.unwrap( LogicalDocumentValues.class ).orElseThrow().documents ) {
int i = 0;
int idx;

Expand All @@ -408,17 +407,13 @@ public AlgNode visit( LogicalDocumentModify initial ) {
}
if ( !values.containsKey( idx ) ) {
types.add( type );
values.put( idx, new ArrayList<>( ((LogicalDocumentValues) input).documents.size() ) );
values.put( idx, new ArrayList<>( input.unwrap( LogicalDocumentValues.class ).orElseThrow().documents.size() ) );
}
values.get( idx ).add( new ParameterValue( idx, type, node ) );

firstRow = false;
}
input = LogicalDocumentValues.createOneTuple( input.getCluster() );
/*input = LogicalDocumentProject.create(
logicalValues,
projects,
List.of() );*/
input = LogicalDocumentValues.createDynamic( input.getCluster(), List.copyOf( projects.values() ) );
}
return new LogicalDocumentModify(
modify.getTraitSet(),
Expand Down Expand Up @@ -472,7 +467,7 @@ public RexNode visitCall( RexCall call ) {
types.add( call.type );
return new RexDynamicParam( call.type, i );
} else {
List<RexNode> newOperands = new LinkedList<>();
List<RexNode> newOperands = new ArrayList<>();
for ( RexNode operand : call.operands ) {
if ( operand instanceof RexLiteral && ((RexLiteral) operand).getPolyType() == PolyType.SYMBOL ) {
// Do not replace with dynamic param
Expand Down Expand Up @@ -503,7 +498,7 @@ private PolyList<PolyValue> createListForArrays( List<RexNode> operands ) {

@Override
public RexNode visitOver( RexOver over ) {
List<RexNode> newOperands = new LinkedList<>();
List<RexNode> newOperands = new ArrayList<>();
for ( RexNode operand : over.operands ) {
newOperands.add( operand.accept( this ) );
}
Expand Down Expand Up @@ -537,7 +532,7 @@ public RexNode visitFieldAccess( RexFieldAccess fieldAccess ) {

@Override
public RexNode visitSubQuery( RexSubQuery subQuery ) {
List<RexNode> newOperands = new LinkedList<>();
List<RexNode> newOperands = new ArrayList<>();
for ( RexNode operand : subQuery.operands ) {
newOperands.add( operand.accept( this ) );
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/test/java/org/polypheny/db/mql/AggregateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void groupSubFieldTest() {
"key" );
insertMany( DATA_0 );

DocResult result = aggregate( $group( "{\"_id\":\"$key.key\"}" ) );
DocResult result = aggregate( $group( document( kv( string( "_id" ), string( "$key.key" ) ) ) ) );

MongoConnection.checkDocResultSet( result, expected, false, true );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ public Enumerable<PolyValue[]> aggregate(
} else {
// prepared
preOps.stream()
.map( op -> new MongoDynamic( new BsonDocument( "$addFields", op ), mongoNamespace.getBucket(), dataContext ) )
.map( op -> new MongoDynamic( new BsonDocument( "$addFields", op ), mongoNamespace.getBucket(), dataContext, DataModel.DOCUMENT ) )
.forEach( util -> list.add( util.insert( parameterValues ) ) );

for ( String operation : operations ) {
MongoDynamic opUtil = new MongoDynamic( BsonDocument.parse( operation ), getMongoNamespace().getBucket(), dataContext );
MongoDynamic opUtil = new MongoDynamic( BsonDocument.parse( operation ), getMongoNamespace().getBucket(), dataContext, DataModel.DOCUMENT );
list.add( opUtil.insert( parameterValues ) );
}
}
Expand Down Expand Up @@ -517,7 +517,7 @@ private long doDML( Operation operation, String filter, List<String> operations,
if ( !dataContext.getParameterValues().isEmpty() ) {
assert operations.size() == 1;
// prepared
MongoDynamic util = new MongoDynamic( BsonDocument.parse( operations.get( 0 ) ), bucket, dataContext );
MongoDynamic util = new MongoDynamic( BsonDocument.parse( operations.get( 0 ) ), bucket, dataContext, getEntity().getDataModel() );
List<Document> inserts = util.getAll( dataContext.getParameterValues() );
entity.getCollection().insertMany( session, inserts );
return inserts.size();
Expand All @@ -533,8 +533,8 @@ private long doDML( Operation operation, String filter, List<String> operations,
// we use only update docs
if ( !dataContext.getParameterValues().isEmpty() ) {
// prepared we use document update not pipeline
MongoDynamic filterUtil = new MongoDynamic( BsonDocument.parse( filter ), bucket, dataContext );
MongoDynamic docUtil = new MongoDynamic( BsonDocument.parse( operations.get( 0 ) ), bucket, dataContext );
MongoDynamic filterUtil = new MongoDynamic( BsonDocument.parse( filter ), bucket, dataContext, getEntity().getDataModel() );
MongoDynamic docUtil = new MongoDynamic( BsonDocument.parse( operations.get( 0 ) ), bucket, dataContext, getEntity().getDataModel() );
for ( Map<Long, PolyValue> parameterValue : dataContext.getParameterValues() ) {
if ( onlyOne ) {
if ( needsDocument ) {
Expand Down Expand Up @@ -582,7 +582,7 @@ private long doDML( Operation operation, String filter, List<String> operations,
case DELETE:
if ( !dataContext.getParameterValues().isEmpty() ) {
// prepared
MongoDynamic filterUtil = new MongoDynamic( BsonDocument.parse( filter ), bucket, dataContext );
MongoDynamic filterUtil = new MongoDynamic( BsonDocument.parse( filter ), bucket, dataContext, getEntity().getDataModel() );
List<? extends WriteModel<Document>> filters;
if ( onlyOne ) {
filters = filterUtil.getAll( dataContext.getParameterValues(), DeleteOneModel::new );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public List<PhysicalEntity> createCollection( Context context, LogicalCollection
public void dropCollection( Context context, AllocationCollection allocation ) {
commitAll();
context.getStatement().getTransaction().registerInvolvedAdapter( this );
PhysicalCollection collection = storeCatalog.fromAllocation( allocation.id, PhysicalCollection.class );
PhysicalEntity collection = storeCatalog.fromAllocation( allocation.id, PhysicalEntity.class );
this.currentNamespace.database.getCollection( collection.name ).drop();

storeCatalog.removeAllocAndPhysical( allocation.id );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.bson.BsonDocument;
import org.polypheny.db.adapter.mongodb.MongoAlg;
import org.polypheny.db.adapter.mongodb.MongoEntity;
import org.polypheny.db.adapter.mongodb.bson.BsonDynamic;
import org.polypheny.db.adapter.mongodb.rules.MongoRules.MongoDocuments;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.core.document.DocumentModify;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.rex.RexNode;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Pair;

public class MongoDocumentModify extends DocumentModify<MongoEntity> implements MongoAlg {

Expand Down Expand Up @@ -70,7 +72,11 @@ public void implement( Implementor implementor ) {


private void handleDelete( Implementor implementor ) {
throw new NotImplementedException();
if ( updates.isEmpty() ) {
implementor.list.add( Pair.of( null, "{}" ) );
} else {
throw new NotImplementedException();
}
}


Expand All @@ -93,6 +99,13 @@ public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {


private void handleInsert( Implementor implementor, MongoDocuments documents ) {
if ( documents.isPrepared() ) {
implementor.operations = documents.dynamicDocuments
.stream()
.map( BsonDynamic::new )
.collect( Collectors.toList() );
return;
}
implementor.operations = documents.documents
.stream()
.filter( PolyValue::isDocument )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ public AlgNode convert( AlgNode alg ) {
return new MongoDocuments(
alg.getCluster(),
documentValues.documents,
documentValues.dynamicDocuments,
alg.getTraitSet().replace( out )
);

Expand All @@ -525,8 +526,8 @@ public AlgNode convert( AlgNode alg ) {
public static class MongoDocuments extends DocumentValues implements MongoAlg {


public MongoDocuments( AlgOptCluster cluster, List<PolyDocument> documentTuples, AlgTraitSet traitSet ) {
super( cluster, traitSet, documentTuples );
public MongoDocuments( AlgOptCluster cluster, List<PolyDocument> documentTuples, List<RexDynamicParam> dynamicParams, AlgTraitSet traitSet ) {
super( cluster, traitSet, documentTuples, dynamicParams );
}


Expand All @@ -538,7 +539,7 @@ public void implement( Implementor implementor ) {

@Override
public AlgNode copy( AlgTraitSet traitSet, List<AlgNode> inputs ) {
return new MongoDocuments( getCluster(), documents, traitSet );
return new MongoDocuments( getCluster(), documents, dynamicDocuments, traitSet );
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.polypheny.db.adapter.DataContext;
import org.polypheny.db.adapter.mongodb.bson.BsonDynamic;
import org.polypheny.db.adapter.mongodb.bson.BsonFunctionHelper;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.BsonUtil;
Expand Down Expand Up @@ -67,12 +68,17 @@ public class MongoDynamic {
private final Map<Long, String> keyMap = new HashMap<>();


public MongoDynamic( BsonDocument document, GridFSBucket bucket, DataContext dataContext ) {
public MongoDynamic( BsonDocument document, GridFSBucket bucket, DataContext dataContext, DataModel dataModel ) {
this.dataContext = dataContext;
this.document = document.clone();
this.bucket = bucket;
this.isProject = !document.isEmpty() && document.getFirstKey().equals( "$project" );
this.document.forEach( ( k, bsonValue ) -> replaceDynamic( bsonValue, this.document, k, true, false ) );
if ( dataModel == DataModel.RELATIONAL ) {
this.document.forEach( ( k, bsonValue ) -> replaceDynamic( bsonValue, this.document, k, true, false ) );
} else {
handleDynamic( this.document, new BsonString( "" ), "", true, false );
}

}


Expand Down

0 comments on commit bd6e849

Please sign in to comment.