Skip to content

Commit

Permalink
Split rollup count lookup on source stable during indexing (#1105)
Browse files Browse the repository at this point in the history
  • Loading branch information
dexamundsen authored Dec 18, 2024
1 parent 7aa7228 commit 1e1ba42
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ private void updateNumChildren(boolean isDryRun) {
+ SqlQueryField.of(idField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(parentField).renderForSelect(tempTableAlias);

LOGGER.info("update-num-children-from-select query: {}", updateFromSelectSql);

// Run the update-from-select to update the count for num children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void run(boolean isDryRun) {
+ unionAllTable.render()
+ " GROUP BY "
+ SqlQueryField.of(tempTableIdField).renderForGroupBy(null, true);
LOGGER.info("idTextPairs union query: {}", selectTextConcatSql);

// Build an update-from-select query for the index entity main table and the id text pairs
// query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,13 @@ private void copyFieldsToEntityTable(boolean isDryRun) {
SqlField tempTableNumChildrenField = SqlField.of(entityTableNumChildrenField.getColumnName());
String tempTableSql =
"SELECT "
+ SqlQueryField.of(tempTableIdField).renderForSelect()
+ ", "
+ SqlQueryField.of(tempTablePathField).renderForSelect()
+ ", "
+ SqlQueryField.of(tempTableNumChildrenField).renderForSelect()
+ ", "
+ SqlQueryField.of(tempTableIdField).renderForSelect()
+ " FROM "
+ tempBQTable.render();
LOGGER.info("temp table query: {}", tempTableSql);

// Build an update-from-select query for the index entity main table and the
// id-path-num_children query.
Expand All @@ -418,14 +417,8 @@ private void copyFieldsToEntityTable(boolean isDryRun) {
+ SqlQueryField.of(entityTableNumChildrenField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(tempTableNumChildrenField).renderForSelect(tempTableAlias)
+ " FROM (SELECT "
+ SqlQueryField.of(tempTablePathField).renderForSelect()
+ ", "
+ SqlQueryField.of(tempTableNumChildrenField).renderForSelect()
+ ", "
+ SqlQueryField.of(tempTableIdField).renderForSelect()
+ " FROM "
+ tempBQTable.render()
+ " FROM ("
+ tempTableSql
+ ") AS "
+ tempTableAlias
+ " WHERE "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
import bio.terra.tanagra.underlay.indextable.ITRelationshipIdPairs;
import bio.terra.tanagra.underlay.serialization.SZIndexer;
import bio.terra.tanagra.underlay.sourcetable.*;
import com.google.api.*;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.Table;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,7 +72,7 @@ public class WriteRollupCounts extends BigQueryJob {
private static final Logger LOGGER = LoggerFactory.getLogger(WriteRollupCounts.class);

private static final String TEMP_TABLE_NAME = "RC";

private static final String INDEX_ID_REGEX_BRACES = "{indexIdRegex}";
private final EntityGroup entityGroup;
private final Entity entity;
private final Entity countedEntity;
Expand Down Expand Up @@ -370,43 +372,88 @@ private void copyFieldsToEntityTable(boolean isDryRun) {
rollupCountsTableIdField = relationshipRollupCountsSourceTable.getEntityIdField();
rollupCountsTableCountField = relationshipRollupCountsSourceTable.getCountField();
}
String rollupCountsTableSql =
"SELECT "
+ SqlQueryField.of(rollupCountsTableIdField).renderForSelect()
+ ", "
+ SqlQueryField.of(rollupCountsTableCountField).renderForSelect()
+ " FROM "
+ rollupCountsBQTable.render();
LOGGER.info("rollup counts table query: {}", rollupCountsTableSql);

// Build an update-from-select query for the index entity main table and the
// id-count query.
String updateTableAlias = "updatetable";
String tempTableAlias = "temptable";
String updateFromSelectSql =
"UPDATE "
+ indexTable.getTablePointer().render()
+ " AS "
+ updateTableAlias
+ " SET "
+ SqlQueryField.of(entityTableCountField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(rollupCountsTableCountField).renderForSelect(tempTableAlias)
+ " FROM (SELECT "
+ SqlQueryField.of(rollupCountsTableCountField).renderForSelect()
+ ", "
+ SqlQueryField.of(rollupCountsTableIdField).renderForSelect()
+ " FROM "
+ rollupCountsBQTable.render()
+ ") AS "
+ tempTableAlias
+ " WHERE "
+ SqlQueryField.of(entityTableIdField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(rollupCountsTableIdField).renderForSelect(tempTableAlias);
LOGGER.info("update-from-select query: {}", updateFromSelectSql);

// Run the update-from-select to write the count field in the index entity main table.
runQueryIfTableExists(indexTable.getTablePointer(), updateFromSelectSql, isDryRun);

// Some rollupCounts source table are very large (eg. variant_to_person)
// Split the read into smaller chunks: use Id field since the table may be clustered on it
// If such splitting of the update is needed, add a where clause regex in rollupCounts.sql
// format: WHERE REGEXP_CONTAINS(id, r"{indexIdRegex}")
String rollUpCountRawSql = rollupCountsBQTable.render();

List<String> regexSubValues = List.of(StringUtils.EMPTY); // default: no-op
if (rollUpCountRawSql.contains(INDEX_ID_REGEX_BRACES)) {
regexSubValues = generateRegexIdSubValues();
}

regexSubValues.forEach(
subValue -> {
String rollupCountsTableSql =
"SELECT "
+ SqlQueryField.of(rollupCountsTableCountField).renderForSelect()
+ ", "
+ SqlQueryField.of(rollupCountsTableIdField).renderForSelect()
+ " FROM "
+ rollUpCountRawSql.replace(INDEX_ID_REGEX_BRACES, subValue);
String updateFromSelectSql =
"UPDATE "
+ indexTable.getTablePointer().render()
+ " AS "
+ updateTableAlias
+ " SET "
+ SqlQueryField.of(entityTableCountField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(rollupCountsTableCountField).renderForSelect(tempTableAlias)
+ " FROM ("
+ rollupCountsTableSql
+ ") AS "
+ tempTableAlias
+ " WHERE "
+ SqlQueryField.of(entityTableIdField).renderForSelect(updateTableAlias)
+ " = "
+ SqlQueryField.of(rollupCountsTableIdField).renderForSelect(tempTableAlias);
LOGGER.info("update-from-select query: {}", updateFromSelectSql);

// Run the update-from-select to write the count field in the index entity main table.
runQueryIfTableExists(indexTable.getTablePointer(), updateFromSelectSql, isDryRun);
});
}

private List<String> generateRegexIdSubValues() {
// This is currently used only for variant_id and hence optimized for the same
// If other entities need this, add regex values as a property of entityGroup config
// i.e. property SZRollupCountsSql.regexIdSub &entityGroup.json#rollupCountsSql#regexIdSub
Stream<String> digitStream =
IntStream.range(1, 10)
.mapToObj(
i -> {
List<String> forI = new ArrayList<>();
// regex: 1 digit followed by alphabet
forI.add(String.format("^%d[^0-9]", i));
// regex: 1 digit followed by digit
forI.addAll(
IntStream.range(0, 10).mapToObj(j -> String.format("^%d%d", i, j)).toList());
return forI;
})
.flatMap(List::stream);

Stream<String> alphabetStream =
"XY"
.chars()
.mapToObj(
c -> {
List<String> forC = new ArrayList<>();
// regex: 1 alphabet followed by digit
forC.add(String.format("^%c[0-9]", c));
// regex: 1 alphabet followed by alphabet
forC.add(String.format("^%c[^0-9]", c));
return forC;
})
.flatMap(List::stream);

return Stream.concat(digitStream, alphabetStream).toList();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT vid AS variant_id, ARRAY_LENGTH(person_ids) AS num_persons
/* Wrap variant_to_person table in a SELECT DISTINCT because there is a duplicate row in the test data. */
FROM (SELECT DISTINCT vid, person_ids FROM `${omopDataset}.variant_to_person`)
FROM (SELECT DISTINCT vid, person_ids FROM `${omopDataset}.variant_to_person` WHERE REGEXP_CONTAINS(vid, r"{indexIdRegex}"))
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"indexData": {
"projectId": "prj-e-dexamundsen-hd2",
"datasetId": "aou_test_data_SC2023Q3R2_index_121624",
"datasetId": "aou_test_data_SC2023Q3R2_index_121724",
"tablePrefix": "T"
},
"queryProjectId": "prj-e-dexamundsen-hd2",
Expand Down

0 comments on commit 1e1ba42

Please sign in to comment.