Promotion 2024-12-10 prod (#6752, PR #6756)
achave11-ucsc committed Dec 13, 2024
2 parents e9759a9 + e5da98e commit d99ce0a
Showing 4 changed files with 78 additions and 42 deletions.
54 changes: 28 additions & 26 deletions scripts/reindex.py
@@ -3,15 +3,13 @@
 """
 
 import argparse
-from collections import (
-    defaultdict,
-)
 import fnmatch
 import logging
 import sys
 
 from azul import (
     config,
+    reject,
     require,
 )
 from azul.args import (
@@ -83,7 +81,9 @@
                     default=False,
                     action='store_true',
                     help='Delete all Elasticsearch indices in the current deployment. '
-                         'Implies --create when combined with --index.')
+                         'Implies --create when combined with --index. '
+                         'Behaves like --deindex instead if the operation is limited to a subset '
+                         'of the configured sources.')
 parser.add_argument('--index',
                     default=False,
                     action='store_true',
@@ -131,48 +131,50 @@ def main(argv: list[str]):
 
     azul = AzulClient(num_workers=args.num_workers)
 
+    sources_by_catalog = {
+        catalog: azul.catalog_sources(catalog)
+        for catalog in args.catalogs
+    }
     source_globs = set(args.sources)
-    if not args.local or args.deindex:
-        sources_by_catalog = defaultdict(set)
-        globs_matched = set()
-        for catalog in args.catalogs:
-            sources = azul.catalog_sources(catalog)
+    globs_matched = set()
+    every_source = '*' in source_globs
+    if not every_source:
+        if args.local:
+            parser.error('Cannot specify sources when performing a local reindex')
+            assert False
+        for catalog, sources in sources_by_catalog.items():
+            catalog_matches = set()
             for source_glob in source_globs:
                 matches = fnmatch.filter(sources, source_glob)
                 if matches:
                     globs_matched.add(source_glob)
                     log.debug('Source glob %r matched sources %r in catalog %r',
                               source_glob, matches, catalog)
-                    sources_by_catalog[catalog].update(matches)
+                    catalog_matches.update(matches)
+            sources_by_catalog[catalog] = catalog_matches
         unmatched = source_globs - globs_matched
         if unmatched:
             log.warning('Source(s) not found in any catalog: %r', unmatched)
         require(any(sources_by_catalog.values()),
                 'No valid sources specified for any catalog')
-    else:
-        if source_globs == {'*'}:
-            sources_by_catalog = {
-                catalog: azul.catalog_sources(catalog)
-                for catalog in args.catalogs
-            }
-        else:
-            parser.error('Cannot specify sources when performing a local reindex')
-            assert False
 
-    if args.deindex:
-        require(not any((args.index, args.delete, args.create)),
-                '--deindex is incompatible with --index, --create, and --delete.')
-        require('*' not in source_globs, '--deindex is incompatible with source `*`. '
-                                         'Use --delete instead.')
+    reject(args.deindex and (args.delete or args.create),
+           '--deindex is incompatible with --create and --delete.')
 
+    deindex = args.deindex or (args.delete and not every_source)
+    delete = args.delete and every_source
+
+    if deindex:
+        reject(every_source, '--deindex is incompatible with source `*`. '
+                             'Use --delete instead.')
         for catalog, sources in sources_by_catalog.items():
             if sources:
                 azul.deindex(catalog, sources)
 
     azul.reset_indexer(args.catalogs,
                        purge_queues=args.purge,
-                       delete_indices=args.delete,
-                       create_indices=args.create or args.index and args.delete)
+                       delete_indices=delete,
+                       create_indices=args.create or args.index and delete)
 
     if args.index:
         log.info('Queuing notifications for reindexing ...')
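As an aside, here is a minimal standalone sketch of the source-selection and deindex-versus-delete decision that the new main() implements. The plan_reindex function and its signature are hypothetical paraphrases for illustration; the real logic lives inline in main() and drives the AzulClient calls shown above.

import fnmatch
import logging

log = logging.getLogger(__name__)


def plan_reindex(sources_by_catalog: dict[str, set[str]],
                 source_globs: set[str],
                 deindex: bool,
                 delete: bool
                 ) -> tuple[dict[str, set[str]], bool, bool]:
    """Narrow each catalog's sources to the given globs, then decide
    whether to deindex the matched subset or delete indices wholesale.
    """
    every_source = '*' in source_globs
    if not every_source:
        globs_matched = set()
        for catalog, sources in sources_by_catalog.items():
            catalog_matches = set()
            for source_glob in source_globs:
                matches = fnmatch.filter(sources, source_glob)
                if matches:
                    globs_matched.add(source_glob)
                    catalog_matches.update(matches)
            sources_by_catalog[catalog] = catalog_matches
        unmatched = source_globs - globs_matched
        if unmatched:
            log.warning('Source(s) not found in any catalog: %r', unmatched)
    # --delete now acts like --deindex when only a subset of sources is
    # selected; a full '*' selection still deletes the indices wholesale.
    return (sources_by_catalog,
            deindex or (delete and not every_source),
            delete and every_source)

For example, plan_reindex({'anvil': {'tdr:a', 'tdr:b'}}, {'tdr:a'}, False, True) yields a plan that deindexes only tdr:a rather than deleting and recreating the catalog's indices.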
16 changes: 11 additions & 5 deletions src/azul/service/manifest_service.py
@@ -2005,11 +2005,17 @@ def implicit_hub_type(self) -> str:
 
     @property
     def include_orphans(self) -> bool:
-        # When filtering only by project/dataset ID, we need to include
-        # *everything* in the selected projects/datasets, even rows that don't
-        # appear anywhere in the rest of the service response.
-        special_fields = self.service.metadata_plugin(self.catalog).special_fields
-        return self.filters.explicit.keys() == {special_fields.implicit_hub_id}
+        # When filtering exclusively by properties of implicit hubs, e.g.,
+        # data sets for AnVIL or projects for HCA, we include replicas of all
+        # entities implicitly connected to the matching hubs, even replicas of
+        # orphans, i.e., entities that aren't connected to files.
+        plugin = self.service.metadata_plugin(self.catalog)
+        implicit_hub_fields = {
+            field_name
+            for field_name, field_path in plugin.field_mapping.items()
+            if field_path[0] == 'contents' and field_path[1] == plugin.implicit_hub_type
+        }
+        return self.filters.explicit.keys() < implicit_hub_fields
 
     @attrs.frozen(kw_only=True)
     class ReplicaKeys:
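A toy illustration of the new subset check: dict.keys() returns a set-like view, and < tests for a strict subset. The field mapping below is invented for the example; real mappings come from the catalog's metadata plugin.

# Hypothetical field mapping, imitating the shape of plugin.field_mapping
field_mapping = {
    'datasets.dataset_id': ('contents', 'datasets', 'dataset_id'),
    'datasets.title': ('contents', 'datasets', 'title'),
    'files.is_supplementary': ('contents', 'files', 'is_supplementary'),
}
implicit_hub_type = 'datasets'
implicit_hub_fields = {
    field_name
    for field_name, field_path in field_mapping.items()
    if field_path[0] == 'contents' and field_path[1] == implicit_hub_type
}
# The empty filter and any filter touching only hub fields are strict
# subsets of the hub fields, so both include orphans; a filter involving
# a non-hub field does not.
assert {}.keys() < implicit_hub_fields
assert {'datasets.title': None}.keys() < implicit_hub_fields
assert not {'files.is_supplementary': None}.keys() < implicit_hub_fields

Using < rather than <= also means that a filter naming every hub field at once fails the check, since equal sets are not strict subsets.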
2 changes: 0 additions & 2 deletions terraform/api_gateway.tf.json.template.py
@@ -91,7 +91,6 @@ def for_domain(cls, domain):
 api_gateway_log_format = {
     'accountId': '$context.accountId',
     'apiId': '$context.apiId',
-    'awsEndpointRequestId': '$context.awsEndpointRequestId',
     'domainName': '$context.domainName',
     'domainPrefix': '$context.domainPrefix',
     'error_message': '$context.error.message',
@@ -105,7 +104,6 @@ def for_domain(cls, domain):
     'integration_latency': '$context.integration.latency',
     'integration_requestId': '$context.integration.requestId',
     'integration_status': '$context.integration.status',
-    'integrationLatency': '$context.integrationLatency',
     'integrationStatus': '$context.integrationStatus',
     'path': '$context.path',
     'protocol': '$context.protocol',
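The two removed keys appear to be camelCase duplicates of values already captured through the structured $context.integration variables: $context.integrationLatency mirrors $context.integration.latency, and $context.awsEndpointRequestId mirrors $context.integration.requestId. As a reminder of how such a template dict is consumed, a hedged sketch follows (keys abbreviated; not the repo's actual template logic). API Gateway access-log settings take the format as a single JSON string whose $context placeholders are substituted per request:

import json

api_gateway_log_format = {
    'apiId': '$context.apiId',
    'integration_latency': '$context.integration.latency',
    'integration_requestId': '$context.integration.requestId',
    'status': '$context.status',
}
# The stage's access log settings expect one JSON-encoded format string.
format_string = json.dumps(api_gateway_log_format)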
48 changes: 39 additions & 9 deletions test/service/test_manifest.py
@@ -118,7 +118,9 @@
 from azul.types import (
     JSON,
     JSONs,
+    MutableCompositeJSON,
     MutableJSON,
+    MutableJSONs,
 )
 from indexer import (
     AnvilCannedBundleTestCase,
@@ -147,13 +149,17 @@ class CannedManifestTestCase(CannedFileTestCase):
     def _canned_manifest_path(self, *path: str) -> Path:
         return self._data_path('service', 'manifest', *path)
 
-    def _load_canned_manifest(self, *path: str) -> MutableJSON:
+    def _load_canned_manifest(self, *path: str) -> MutableCompositeJSON:
         with open(self._canned_manifest_path(*path)) as f:
-            return json.load(f)
+            manifest = json.load(f)
+        assert isinstance(manifest, (dict, list)), type(manifest)
+        return manifest
 
-    def _load_canned_pfb(self, *path: str) -> tuple[MutableJSON, MutableJSON]:
+    def _load_canned_pfb(self, *path: str) -> tuple[MutableJSON, MutableJSONs]:
         schema = self._load_canned_manifest(*path, 'pfb_schema.json')
+        assert isinstance(schema, dict), type(schema)
         entities = self._load_canned_manifest(*path, 'pfb_entities.json')
+        assert isinstance(entities, list), type(entities)
         return schema, entities
 
     def _assert_pfb_schema(self, schema):
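The assert isinstance calls added above double as runtime type narrowing: json.load is typed to return Any, and the asserts both validate each canned file's top-level shape and let a type checker narrow the result toward MutableCompositeJSON (presumably a union of the mutable dict and list JSON aliases; its exact definition lives in azul.types). A sketch of the same pattern as a free function, with the hypothetical name load_json:

import json
from typing import Union


def load_json(path: str) -> Union[dict, list]:
    """Load a JSON document, asserting it is an object or an array."""
    with open(path) as f:
        doc = json.load(f)
    assert isinstance(doc, (dict, list)), type(doc)
    return doc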
@@ -2118,13 +2124,26 @@ def test_compact_manifest(self):
         ]
         self._assert_tsv(expected, response)
 
+    dataset_id_filters: FiltersJSON = {
+        'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}
+    }
+
+    dataset_title_filters: FiltersJSON = {
+        'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}
+    }
+
+    neutral_file_filters: FiltersJSON = {
+        'files.is_supplementary': {'is': [True, False]}
+    }
+
     def test_verbatim_jsonl_manifest(self):
         all_entities, linked_entities = self._canned_entities()
         cases = [
-            ({}, False),
-            ({'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}}, False),
-            # Orphans should be included only when filtering by dataset ID
-            ({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}, True)
+            ({}, True),
+            (self.dataset_title_filters, True),
+            (self.dataset_id_filters, True),
+            (self.neutral_file_filters, False),
+            ({**self.neutral_file_filters, **self.dataset_title_filters}, False),
         ]
         for filters, expect_orphans in cases:
             with self.subTest(filters=filters):
@@ -2163,7 +2182,13 @@ def test(filters):
             self._assert_pfb(expected_schema, expected_entities, response)
 
         with self.subTest(orphans=True):
-            test({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}})
+            for filters in [
+                {},
+                self.dataset_id_filters,
+                self.dataset_title_filters
+            ]:
+                with self.subTest(filters=filters):
+                    test(filters)
 
         with self.subTest(orphans=False):
             # Dynamically edit out references to the orphaned entities (and
@@ -2189,7 +2214,12 @@ def test(filters):
                 filtered = [e for e in part if e['name'] != 'non_schema_orphan_table']
                 assert len(filtered) < len(part), 'Expected to filter orphan references'
                 part[:] = filtered
-            test({})
+            for filters in [
+                self.neutral_file_filters,
+                {**self.neutral_file_filters, **self.dataset_title_filters}
+            ]:
+                with self.subTest(filters=filters):
+                    test(filters)
 
 
 class TestPFB(CannedManifestTestCase):
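One detail worth noting in the last hunk: part[:] = filtered uses slice assignment to mutate the list object in place, so every other reference to that list, such as the one held inside the expected PFB schema, observes the change without the schema being rebuilt. A minimal standalone illustration with toy data:

# Toy data standing in for one part of the expected PFB schema
part = [{'name': 'files'}, {'name': 'non_schema_orphan_table'}]
schema = {'tables': part}  # a second reference to the same list object

filtered = [e for e in part if e['name'] != 'non_schema_orphan_table']
assert len(filtered) < len(part), 'Expected to filter orphan references'
part[:] = filtered  # replaces the contents, not the binding
assert schema['tables'] == [{'name': 'files'}]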
