Add support for rollup instance-level hints
This allows hints for survey versions to be rolled up to questions (and
topics). Currently there is only support for a single hierarchy, but that
limitation already exists elsewhere.

This will require re-indexing once the config has been updated.
Timothy Jennison authored and tjennison-work committed Nov 28, 2024
1 parent 02dce51 commit 1005c12
Showing 12 changed files with 199 additions and 40 deletions.
5 changes: 5 additions & 0 deletions docs/generated/UNDERLAY_CONFIG.md
@@ -860,6 +860,11 @@ Names of attributes that we want to calculate instance-level hints for.

Instance-level hints are ranges of possible values for a particular criteria instance. They are used to support criteria-specific modifiers (e.g. range of values for measurement code "glucose test").

### SZOccurrenceEntity.attributesWithRollupInstanceLevelHints
**required** Set [ String ]

Names of attributes that we want to calculate instance-level hints for, whose values should also be rolled up and included in their ancestors' hints.
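
Conceptually, rolling a hint up means that each occurrence value contributes not only to the hint of its own criteria node but also to the hints of every ancestor of that node (e.g. a survey version's values feed the question's and topic's hints). The sketch below is a hedged toy illustration of that semantics for a numeric attribute; the `Range` record, the ids, and the small hierarchy are made-up assumptions, and this is not the indexer's actual Dataflow implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of rolling numeric instance-level hints up a criteria hierarchy. */
public class RollupHintSketch {
  // Running [min, max] range for one criteria node's hint.
  record Range(double min, double max) {
    Range extend(double value) {
      return new Range(Math.min(min, value), Math.max(max, value));
    }
  }

  public static void main(String[] args) {
    // Hypothetical hierarchy: survey versions 101 and 102 both roll up to question 10.
    Map<Long, List<Long>> ancestors = Map.of(101L, List.of(10L), 102L, List.of(10L));

    // Hypothetical (criteria id, numeric value) pairs taken from occurrence rows.
    List<Map.Entry<Long, Double>> occurrenceValues =
        List.of(Map.entry(101L, 3.0), Map.entry(101L, 7.5), Map.entry(102L, 1.0));

    Map<Long, Range> hints = new HashMap<>();
    for (Map.Entry<Long, Double> entry : occurrenceValues) {
      long criteriaId = entry.getKey();
      double value = entry.getValue();
      // The value always contributes to its own criteria node's hint...
      hints.merge(criteriaId, new Range(value, value), (old, unused) -> old.extend(value));
      // ...and, for a rollup attribute, to every ancestor's hint as well.
      for (long ancestorId : ancestors.getOrDefault(criteriaId, List.of())) {
        hints.merge(ancestorId, new Range(value, value), (old, unused) -> old.extend(value));
      }
    }

    // Expected: 101 -> [3.0, 7.5], 102 -> [1.0, 1.0], 10 -> [1.0, 7.5]
    hints.forEach((id, r) -> System.out.println(id + " -> [" + r.min() + ", " + r.max() + "]"));
  }
}
```

The real pipeline performs the equivalent expansion with `repeatOccurrencesForHints` and then computes the per-criteria min/max (or enum value counts), as shown in the `WriteInstanceLevelDisplayHints` changes below.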

### SZOccurrenceEntity.criteriaRelationship
**required** [SZCriteriaRelationship](#szcriteriarelationship)

@@ -21,6 +21,7 @@
import bio.terra.tanagra.underlay.Underlay;
import bio.terra.tanagra.underlay.entitymodel.Attribute;
import bio.terra.tanagra.underlay.entitymodel.Entity;
import bio.terra.tanagra.underlay.entitymodel.Hierarchy;
import bio.terra.tanagra.underlay.entitymodel.Relationship;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.CriteriaOccurrence;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.EntityGroup;
@@ -421,6 +422,12 @@ public static SequencedJobSet getJobSetForCriteriaOccurrence(
// TODO: Handle >1 occurrence entity.
Entity occurrenceEntity = criteriaOccurrence.getOccurrenceEntities().get(0);
if (criteriaOccurrence.hasInstanceLevelDisplayHints(occurrenceEntity)) {
// TODO: Handle >1 hierarchy.
Hierarchy hierarchy =
criteriaOccurrence.getCriteriaEntity().hasHierarchies()
? criteriaOccurrence.getCriteriaEntity().getHierarchies().get(0)
: null;

Relationship occurrenceCriteriaRelationship =
criteriaOccurrence.getOccurrenceCriteriaRelationship(occurrenceEntity.getName());
Relationship occurrencePrimaryRelationship =
@@ -458,7 +465,14 @@ public static SequencedJobSet getJobSetForCriteriaOccurrence(
.getInstanceLevelDisplayHints(
criteriaOccurrence.getName(),
occurrenceEntity.getName(),
criteriaOccurrence.getCriteriaEntity().getName())));
criteriaOccurrence.getCriteriaEntity().getName()),
hierarchy,
hierarchy != null
? underlay
.getIndexSchema()
.getHierarchyAncestorDescendant(
criteriaOccurrence.getCriteriaEntity().getName(), hierarchy.getName())
: null));
}

if (criteriaOccurrence.getCriteriaEntity().hasHierarchies()) {
@@ -3,25 +3,30 @@
import bio.terra.tanagra.api.shared.DataType;
import bio.terra.tanagra.indexing.job.BigQueryJob;
import bio.terra.tanagra.indexing.job.dataflow.beam.BigQueryBeamUtils;
import bio.terra.tanagra.indexing.job.dataflow.beam.CountUtils;
import bio.terra.tanagra.indexing.job.dataflow.beam.DataflowUtils;
import bio.terra.tanagra.query.sql.SqlField;
import bio.terra.tanagra.query.sql.SqlQueryField;
import bio.terra.tanagra.underlay.entitymodel.Attribute;
import bio.terra.tanagra.underlay.entitymodel.Entity;
import bio.terra.tanagra.underlay.entitymodel.Hierarchy;
import bio.terra.tanagra.underlay.entitymodel.Relationship;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.CriteriaOccurrence;
import bio.terra.tanagra.underlay.indextable.ITEntityMain;
import bio.terra.tanagra.underlay.indextable.ITHierarchyAncestorDescendant;
import bio.terra.tanagra.underlay.indextable.ITInstanceLevelDisplayHints;
import bio.terra.tanagra.underlay.indextable.ITRelationshipIdPairs;
import bio.terra.tanagra.underlay.serialization.SZIndexer;
import com.google.api.services.bigquery.model.TableRow;
import jakarta.annotation.Nullable;
import java.io.Serializable;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
@@ -48,6 +53,8 @@ public class WriteInstanceLevelDisplayHints extends BigQueryJob {
private final @Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable;
private final @Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable;
private final ITInstanceLevelDisplayHints indexTable;
private final @Nullable Hierarchy hierarchy;
private final @Nullable ITHierarchyAncestorDescendant ancestorDescendantTable;

@SuppressWarnings("checkstyle:ParameterNumber")
public WriteInstanceLevelDisplayHints(
@@ -59,7 +66,9 @@ public WriteInstanceLevelDisplayHints(
ITEntityMain primaryEntityIndexTable,
@Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable,
@Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable,
ITInstanceLevelDisplayHints indexTable) {
ITInstanceLevelDisplayHints indexTable,
@Nullable Hierarchy hierarchy,
@Nullable ITHierarchyAncestorDescendant ancestorDescendantTable) {
super(indexerConfig);
this.criteriaOccurrence = criteriaOccurrence;
this.occurrenceEntity = occurrenceEntity;
Expand All @@ -69,6 +78,8 @@ public WriteInstanceLevelDisplayHints(
this.occurrenceCriteriaRelationshipIdPairsTable = occurrenceCriteriaRelationshipIdPairsTable;
this.occurrencePrimaryRelationshipIdPairsTable = occurrencePrimaryRelationshipIdPairsTable;
this.indexTable = indexTable;
this.hierarchy = hierarchy;
this.ancestorDescendantTable = ancestorDescendantTable;
}

@Override
@@ -119,8 +130,8 @@ public void run(boolean isDryRun) {
readInRelationshipIdPairs(
pipeline, occCriIdPairsSql, entityAIdColumnName, entityBIdColumnName);

// Build a query to select all occurrence-criteria id pairs, and the pipeline steps to read the
// results and build a (occurrence id, criteria id) KV PCollection.
// Build a query to select all occurrence-primary id pairs, and the pipeline steps to read the
// results and build a (occurrence id, primary id) KV PCollection.
String occPriIdPairsSql =
getQueryRelationshipIdPairs(
entityAIdColumnName,
Expand All @@ -134,17 +145,32 @@ public void run(boolean isDryRun) {
readInRelationshipIdPairs(
pipeline, occPriIdPairsSql, entityAIdColumnName, entityBIdColumnName);

PCollection<KV<Long, Long>> rollupOccCriIdPairKVs = null;
if (hierarchy != null
&& criteriaOccurrence.hasRollupInstanceLevelDisplayHints(occurrenceEntity)) {
PCollection<KV<Long, Long>> descendantAncestorRelationshipsPC =
BigQueryBeamUtils.readDescendantAncestorRelationshipsFromBQ(
pipeline, ancestorDescendantTable);

// Expand the set of occurrences to include a repeat for each ancestor.
rollupOccCriIdPairKVs =
CountUtils.repeatOccurrencesForHints(occCriIdPairKVs, descendantAncestorRelationshipsPC);
}
final PCollection<KV<Long, Long>> finalRollupOccCriIdPairKVs = rollupOccCriIdPairKVs;

criteriaOccurrence
.getAttributesWithInstanceLevelDisplayHints(occurrenceEntity)
.forEach(
attribute -> {
(attribute, rollup) -> {
PCollection<KV<Long, Long>> idPairsKVs =
rollup ? finalRollupOccCriIdPairKVs : occCriIdPairKVs;
if (attribute.isValueDisplay()) {
LOGGER.info("enum val hint: {}", attribute.getName());
enumValHint(occCriIdPairKVs, occPriIdPairKVs, occIdRowKVs, attribute);
enumValHint(idPairsKVs, occPriIdPairKVs, occIdRowKVs, attribute);
} else if (DataType.INT64.equals(attribute.getDataType())
|| DataType.DOUBLE.equals(attribute.getDataType())) {
LOGGER.info("numeric range hint: {}", attribute.getName());
numericRangeHint(occCriIdPairKVs, occIdRowKVs, attribute);
numericRangeHint(idPairsKVs, occIdRowKVs, attribute);
} // TODO: Calculate display hints for other data types.
});

@@ -287,13 +313,15 @@ private void numericRangeHint(
occIdAndNumValCriId
.apply(Filter.by(cogb -> cogb.getValue().getAll(numValTag).iterator().hasNext()))
.apply(
MapElements.into(
FlatMapElements.into(
TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles()))
.via(
cogb ->
KV.of(
cogb.getValue().getOnly(criIdTag),
cogb.getValue().getOnly(numValTag))));
cogb -> {
Iterable<Long> criIds = cogb.getValue().getAll(criIdTag);
return StreamSupport.stream(criIds.spliterator(), false)
.map((Long criId) -> KV.of(criId, cogb.getValue().getOnly(numValTag)))
.toList();
}));

// Compute numeric range for each criteriaId.
PCollection<IdNumericRange> numericRanges = numericRangeHint(criteriaValuePairs);
@@ -361,23 +389,28 @@ private void enumValHint(
.and(criIdTag, occCriIdPairs)
.and(priIdTag, occPriIdPairs)
.apply(CoGroupByKey.create());

PCollection<KV<IdEnumValue, Long>> criteriaEnumPrimaryPairs =
occIdAndAttrsCriIdPriId
.apply(Filter.by(cogb -> cogb.getValue().getAll(occAttrsTag).iterator().hasNext()))
.apply(
MapElements.into(
FlatMapElements.into(
TypeDescriptors.kvs(
new TypeDescriptor<IdEnumValue>() {}, TypeDescriptors.longs()))
.via(
cogb -> {
Long criId = cogb.getValue().getOnly(criIdTag);
Iterable<Long> criIds = cogb.getValue().getAll(criIdTag);
Long priId = cogb.getValue().getOnly(priIdTag);

TableRow occAttrs = cogb.getValue().getOnly(occAttrsTag);
String enumValue = (String) occAttrs.get(enumValColName);
String enumDisplay = (String) occAttrs.get(enumDisplayColName);

return KV.of(new IdEnumValue(criId, enumValue, enumDisplay), priId);
return StreamSupport.stream(criIds.spliterator(), false)
.map(
(Long criId) ->
KV.of(new IdEnumValue(criId, enumValue, enumDisplay), priId))
.toList();
}));

// Compute enum values and counts for each criteriaId.
@@ -263,21 +263,13 @@ private void writeFieldsToTempTable(boolean isDryRun) {

// Optionally handle a hierarchy for the rollup entity.
if (hierarchy != null) {
// Build a query to select all ancestor-descendant pairs from the ancestor-descendant table,
// and the pipeline step to read the results.
String ancestorDescendantSql =
"SELECT * FROM " + ancestorDescendantTable.getTablePointer().render();
LOGGER.info("ancestor-descendant query: {}", ancestorDescendantSql);
PCollection<KV<Long, Long>> ancestorDescendantRelationshipsPC =
BigQueryBeamUtils.readTwoFieldRowsFromBQ(
pipeline,
ancestorDescendantSql,
ITHierarchyAncestorDescendant.Column.DESCENDANT.getSchema().getColumnName(),
ITHierarchyAncestorDescendant.Column.ANCESTOR.getSchema().getColumnName());
PCollection<KV<Long, Long>> descendantAncestorRelationshipsPC =
BigQueryBeamUtils.readDescendantAncestorRelationshipsFromBQ(
pipeline, ancestorDescendantTable);

// Expand the set of occurrences to include a repeat for each ancestor.
idPairsPC =
CountUtils.repeatOccurrencesForHierarchy(idPairsPC, ancestorDescendantRelationshipsPC);
CountUtils.repeatOccurrencesForHierarchy(idPairsPC, descendantAncestorRelationshipsPC);
}

// Count the number of distinct occurrences per entity id.
@@ -3,6 +3,7 @@
import bio.terra.tanagra.api.shared.DataType;
import bio.terra.tanagra.exception.SystemException;
import bio.terra.tanagra.underlay.ColumnSchema;
import bio.terra.tanagra.underlay.indextable.ITHierarchyAncestorDescendant;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -20,8 +21,11 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BigQueryBeamUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryBeamUtils.class);

private BigQueryBeamUtils() {}

@@ -65,6 +69,22 @@ public static PCollection<KV<Long, Long>> readTwoFieldRowsFromBQ(
}));
}

/**
* Build a query to select all descendant-ancestor pairs from the ancestor-descendant table, and
* the pipeline step to read the results.
*/
public static PCollection<KV<Long, Long>> readDescendantAncestorRelationshipsFromBQ(
Pipeline pipeline, ITHierarchyAncestorDescendant ancestorDescendantTable) {
String descendantAncestorSql =
"SELECT * FROM " + ancestorDescendantTable.getTablePointer().render();
LOGGER.info("descendant-ancestor query: {}", descendantAncestorSql);
return BigQueryBeamUtils.readTwoFieldRowsFromBQ(
pipeline,
descendantAncestorSql,
ITHierarchyAncestorDescendant.Column.DESCENDANT.getSchema().getColumnName(),
ITHierarchyAncestorDescendant.Column.ANCESTOR.getSchema().getColumnName());
}

public static String getTableSqlPath(String projectId, String datasetId, String tableName) {
final String template = "${projectId}:${datasetId}.${tableName}";
Map<String, String> params =
@@ -4,6 +4,7 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.KvSwap;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
@@ -136,4 +137,53 @@ public static PCollection<KV<Long, Long>> repeatOccurrencesForHierarchy(
.and(ancestorOccurrences)
.apply(Flatten.pCollections());
}

/**
* For each occurrence (occurrence, criteria), generate a new occurrence for each ancestor of the
* criteria node (occurrence, ancestor).
*
* <p>This is the same concept as repeatOccurrencesForHierarchy but over occurrence ids.
*
* @param occurrences a collection of all occurrences that we want to compute hints for and the
* criteria they're associated with
* @param descendantAncestor a collection of (descendant, ancestor) pairs for the criteria nodes
* that we want hints for. note that this is the expanded set of all transitive
* relationships in the hierarchy, not just the parent/child pairs
* @return an expanded collection of occurrences (occurrence, ancestor), where each occurrence has
* been repeated for each ancestor of its criteria node. note for later steps that this may
* contain duplicate keys
*/
public static PCollection<KV<Long, Long>> repeatOccurrencesForHints(
PCollection<KV<Long, Long>> occurrences, PCollection<KV<Long, Long>> descendantAncestor) {
// Remove duplicate occurrences.
PCollection<KV<Long, Long>> distinctOccurrences =
occurrences.apply(
"remove duplicate occurrences before repeating for hints", Distinct.create());

// Swap (occurrence, criteria) to (criteria, occurrence). Duplicate keys are allowed at this
// point.
PCollection<KV<Long, Long>> criteriaOccurrences =
distinctOccurrences.apply(
"swap (occurrence, criteria) to (criteria, occurrence)", KvSwap.create());

// JOIN: criteriaOccurrences (criteria, occurrence) INNER JOIN descendantAncestor (descendant,
// ancestor)
// ON criteria=descendant
// RESULT: criteriaToOccurrenceAndAncestor (criteria=descendant, (occurrence, ancestor))
PCollection<KV<Long, KV<Long, Long>>> criteriaToOccurrenceAndAncestor =
Join.innerJoin(
"inner join occurrences with ancestors", criteriaOccurrences, descendantAncestor);

// Get rid of the descendant node. That was only needed as the innerJoin field.
// RESULT: (occurrence, ancestor)
PCollection<KV<Long, Long>> occurrenceAncestors =
criteriaToOccurrenceAndAncestor.apply(Values.create());

// The descendant-ancestor pairs don't include a self-reference row (i.e. descendant=ancestor).
// So to get the full set of occurrences, concatenate the original occurrences with the ancestor
// duplicates.
return PCollectionList.of(distinctOccurrences)
.and(occurrenceAncestors)
.apply(Flatten.pCollections());
}
}
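
As a hedged, plain-Java walk-through (no Beam) of what `repeatOccurrencesForHints` produces: every (occurrence, criteria) pair is kept, and one extra (occurrence, ancestor) pair is added per ancestor of the criteria node, since the ancestor-descendant table carries no self-reference rows. The ids and the single-ancestor hierarchy below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy walk-through of the expansion done by repeatOccurrencesForHints (not a Beam pipeline). */
public class RepeatForHintsSketch {
  public static void main(String[] args) {
    // (occurrence id, criteria id) pairs, e.g. survey answers keyed by the survey version id.
    List<long[]> occurrences = List.of(new long[] {1001, 101}, new long[] {1002, 102});

    // Descendant -> ancestors: the transitive closure of the hierarchy, with no self-references.
    Map<Long, List<Long>> ancestorsByDescendant = Map.of(101L, List.of(10L), 102L, List.of(10L));

    // Keep the original pairs, then repeat each occurrence once per ancestor of its criteria node.
    List<long[]> expanded = new ArrayList<>(occurrences);
    for (long[] pair : occurrences) {
      for (long ancestorId : ancestorsByDescendant.getOrDefault(pair[1], List.of())) {
        expanded.add(new long[] {pair[0], ancestorId});
      }
    }

    // Prints (1001, 101), (1002, 102), (1001, 10), (1002, 10): downstream hint computations keyed
    // by criteria id now also see the values of every descendant.
    expanded.forEach(p -> System.out.println("(" + p[0] + ", " + p[1] + ")"));
  }
}
```

In the `WriteInstanceLevelDisplayHints` changes above, this expanded collection is fed into the same `numericRangeHint` and `enumValHint` steps whenever an attribute is flagged for rollup.
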
1 change: 1 addition & 0 deletions ui/src/tanagra-underlay/underlayConfig.ts
@@ -152,6 +152,7 @@ export type SZMetadata = {

export type SZOccurrenceEntity = {
attributesWithInstanceLevelHints: string[];
attributesWithRollupInstanceLevelHints: string[];
criteriaRelationship: SZCriteriaRelationship;
occurrenceEntity: string;
primaryRelationship: SZPrimaryRelationship;
@@ -321,11 +321,16 @@ private SZCriteriaOccurrence deserializeCriteriaOccurrence(String criteriaOccurr
? new HashSet<>()
: szCriteriaOccurrence.occurrenceEntities;
szCriteriaOccurrence.occurrenceEntities.forEach(
szOccurrenceEntity ->
szOccurrenceEntity.attributesWithInstanceLevelHints =
szOccurrenceEntity.attributesWithInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithInstanceLevelHints);
szOccurrenceEntity -> {
szOccurrenceEntity.attributesWithInstanceLevelHints =
szOccurrenceEntity.attributesWithInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithInstanceLevelHints;
szOccurrenceEntity.attributesWithRollupInstanceLevelHints =
szOccurrenceEntity.attributesWithRollupInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithRollupInstanceLevelHints;
});

return szCriteriaOccurrence;
} catch (IOException ioEx) {