diff --git a/.github/scripts/setup_azurite.sh b/.github/scripts/setup_azurite.sh
index 50998b2..1f0bcbf 100644
--- a/.github/scripts/setup_azurite.sh
+++ b/.github/scripts/setup_azurite.sh
@@ -54,11 +54,15 @@ export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
 
 AZURE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=https://127.0.0.1:10001/devstoreaccount1;"
 az storage container create -n test --connection-string $AZURE_CONNECTION_STRING
 
-# Setup examples workspace on azurite
+# Setup test workspaces on azurite
+mkdir oldstyle_dir
+tar xzvf $GITHUB_WORKSPACE/test/inputs/sanity.test.tgz -C oldstyle_dir
+az storage blob upload-batch -d test/oldstyle-dir -s oldstyle_dir --connection-string $AZURE_CONNECTION_STRING
+export OLDSTYLE_DIR=az://test/oldstyle-dir
+
 cd $GITHUB_WORKSPACE/examples
 tar xzvf examples_ws.tgz
-echo "Azure Storage Blob upload-batch..."
 az storage blob upload-batch -d test/ws -s ws --connection-string $AZURE_CONNECTION_STRING
-echo "Azure Storage Blob upload-batch DONE"
+export WORKSPACE=az://test/ws
 popd
diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml
index 31093a3..57a600b 100644
--- a/.github/workflows/basic.yml
+++ b/.github/workflows/basic.yml
@@ -102,7 +102,8 @@ jobs:
         run: |
           source .github/scripts/setup_azurite.sh
           echo "Testing on Azurite..."
-          PYTHONPATH=. WORKSPACE=az://test/ws ./examples/test.sh
+          echo "WORKSPACE=$WORKSPACE OLDSTYLE_DIR=$OLDSTYLE_DIR"
+          PYTHONPATH=. ./examples/test.sh
           ls -l /tmp/tiledb_bookkeeping
           echo "Testing on Azurite DONE"
 
diff --git a/examples/genomicsdb_cache b/examples/genomicsdb_cache
index e8dd56f..1454f63 100755
--- a/examples/genomicsdb_cache
+++ b/examples/genomicsdb_cache
@@ -41,6 +41,11 @@ def is_cloud_path(path):
     return False
 
 
+def get_arrays(interval, contigs_map, partitions):
+    _, _, _, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
+    return arrays
+
+
 def main():
     parser = argparse.ArgumentParser(
         prog="cache",
@@ -125,8 +130,12 @@ def main():
     contigs_map, intervals = genomicsdb_common.parse_vidmap_json(vidmap_file, args.interval or args.interval_list)
     loader = json.loads(genomicsdb.read_entire_file("loader.json"))
     partitions = loader["column_partitions"]
-
-    for array in genomicsdb_common.get_arrays(contigs_map, intervals, partitions):
+    arrays = {
+        arrays_for_interval
+        for interval in intervals
+        for arrays_for_interval in get_arrays(interval, contigs_map, partitions)
+    }
+    for array in arrays:
         print(f"Caching fragments for array {array}")
         if genomicsdb.array_exists(workspace, array):
             genomicsdb.cache_array_metadata(workspace, array)
diff --git a/examples/genomicsdb_common.py b/examples/genomicsdb_common.py
index c0617d2..803e145 100644
--- a/examples/genomicsdb_common.py
+++ b/examples/genomicsdb_common.py
@@ -27,6 +27,7 @@
 import json
 import os
 import re
+import sys
 
 import genomicsdb
 
@@ -85,25 +86,39 @@ def parse_interval(interval: str):
     raise RuntimeError(f"Interval {interval} could not be parsed")
 
 
-def get_arrays(contigs_map, intervals, partitions):
-    arrays = set()
-    for interval in intervals:
-        contig, start, end = parse_interval(interval)
-        if contig in contigs_map:
-            contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
-            length = contigs_map[contig]["length"]
-            if end and end < length + 1:
-                contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
-            else:
-                end = length
-                contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
+def get_arrays(interval, contigs_map, partitions):
+    contig, start, end = parse_interval(interval)
+    if contig in contigs_map:
+        contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
+        length = contigs_map[contig]["length"]
+        if end and end < length + 1:
+            contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
         else:
-            print(f"Contig({contig}) not found in vidmap.json")
+            end = length
+            contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
+    else:
+        print(f"Contig({contig}) not found in vidmap.json")
+
+    arrays = []
+    for idx, partition in enumerate(partitions):
+        if isinstance(partition["begin"], int):  # Old style vidmap json
+            column_begin = partition["begin"]
+            if "end" in partition.keys():
+                column_end = partition["end"]
+            elif idx + 1 < len(partitions):
+                column_end = partitions[idx + 1]["begin"] - 1
+            else:
+                column_end = sys.maxsize
+        else:  # Generated with vcf2genomicsdb_init
+            column_begin = partition["begin"]["tiledb_column"]
+            column_end = partition["end"]["tiledb_column"]
+
+        if contig_end < column_begin or contig_offset > column_end:
             continue
-        for partition in partitions:
-            if contig_end < partition["begin"]["tiledb_column"] or contig_offset > partition["end"]["tiledb_column"]:
-                continue
-            arrays.add(partition["array_name"])
+        if "array_name" in partition.keys():
+            arrays.append(partition["array_name"])
+        elif "array" in partition.keys():
+            arrays.append(partition["array"])
 
-    return arrays
+    return contig, start, end, arrays
 
diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index 62c89b4..39afb68 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -28,9 +28,12 @@
 import argparse
 import json
+import logging
+import multiprocessing
 import os
 import re
 import sys
+from typing import List, NamedTuple
 
 import genomicsdb_common
 import pyarrow as pa
@@ -41,6 +44,12 @@ from genomicsdb import json_output_mode
 from genomicsdb.protobuf import genomicsdb_coordinates_pb2 as query_coords
 from genomicsdb.protobuf import genomicsdb_export_config_pb2 as query_pb
 
+logging.basicConfig(
+    format="%(asctime)s.%(msecs)03d %(levelname)-5s GenomicsDB Python - pid=%(process)d tid=%(thread)d %(message)s",
+    level=logging.INFO,
+    datefmt="%H:%M:%S",
+)
+
 
 def parse_callset_json(callset_file):
     callset = json.loads(genomicsdb.read_entire_file(callset_file))
@@ -96,20 +105,18 @@ def parse_loader_json(loader_file):
     return [name.replace("$", ":", 1).replace("$", "-", 1) for name in array_names]
 
 
-def genomicsdb_connect(workspace, callset_file, vidmap_file, filter):
-    export_config = query_pb.ExportConfiguration()
-    export_config.workspace = workspace
-    export_config.attributes.extend(["REF", "ALT", "GT"])
-    export_config.vid_mapping_file = vidmap_file
-    export_config.callset_mapping_file = callset_file
-    export_config.bypass_intersecting_intervals_phase = False
-    export_config.enable_shared_posixfs_optimizations = True
-    if filter:
-        export_config.query_filter = filter
-    return genomicsdb.connect_with_protobuf(export_config)
+def check_nproc(value):
+    ivalue = int(value)
+    if ivalue < 1:
+        raise argparse.ArgumentTypeError("%s specified nproc arg cannot be less than 1" % value)
+    elif ivalue > multiprocessing.cpu_count():
+        raise argparse.ArgumentTypeError(
+            "%s specified nproc arg cannot exceed max number of available processing units" % value
+        )
+    return ivalue
 
 
-def setup_gdb():
+def setup():
     parser = argparse.ArgumentParser(
         prog="query",
         description="GenomicsDB simple query with samples/intervals/filter as inputs",
@@ -185,6 +192,13 @@ def setup_gdb():
         required=False,
         help="Optional - genomic filter expression for the query, e.g. 'ISHOMREF' or 'ISHET' or 'REF == \"G\" && resolve(GT, REF, ALT) &= \"T/T\" && ALT |= \"T\"'",  # noqa
     )
+    parser.add_argument(
+        "-n",
+        "--nproc",
+        type=check_nproc,
+        default=8,
+        help="Optional - number of processing units for multiprocessing(default: %(default)s). Run nproc from command line to print the number of processing units available to a process for the user",  # noqa
+    )
     parser.add_argument(
         "-t",
         "--output-type",
@@ -266,9 +280,7 @@ def setup_gdb():
     loader = json.loads(genomicsdb.read_entire_file(loader_file))
     partitions = loader["column_partitions"]
 
-    gdb = genomicsdb_connect(workspace, callset_file, vidmap_file, args.filter)
-
-    return gdb, workspace, partitions, contigs_map, intervals, row_tuples, args
+    return workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, args
 
 
 def generate_output_filename(output, output_type, interval, idx):
@@ -304,116 +316,161 @@ def parse_args_for_max_bytes(max_arrow_byte_size):
     return int(float(number) * units[unit])
 
 
-def main():
-    gdb, workspace, partitions, contigs_map, intervals, row_tuples, args = setup_gdb()
+class GenomicsDBExportConfig(NamedTuple):
+    workspace: str
+    vidmap_file: str
+    callset_file: str
 
-    if row_tuples and len(row_tuples) == 0:
-        return
-    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
 
+class GenomicsDBQueryConfig(NamedTuple):
+    interval: str
+    contig: str
+    start: int
+    end: int
+    array_name: str
+    row_tuples: List[tuple]
+    filter: str
+
+
+class OutputConfig(NamedTuple):
+    filename: str
+    type: str
+    json_type: str
+    max_arrow_bytes: int
+
+
+class Config(NamedTuple):
+    export_config: GenomicsDBExportConfig
+    query_config: GenomicsDBQueryConfig
+    output_config: OutputConfig
+
+
+def configure_export(config: GenomicsDBExportConfig):
+    export_config = query_pb.ExportConfiguration()
+    export_config.workspace = config.workspace
+    export_config.attributes.extend(["REF", "ALT", "GT"])
+    export_config.vid_mapping_file = config.vidmap_file
+    export_config.callset_mapping_file = config.callset_file
+    export_config.bypass_intersecting_intervals_phase = False
+    export_config.enable_shared_posixfs_optimizations = True
+    return export_config
+
+
+def configure_query(config: GenomicsDBQueryConfig):
+    query_config = query_pb.QueryConfiguration()
+    query_config.array_name = config.array_name
+    contig_interval = query_coords.ContigInterval()
+    contig_interval.contig = config.contig
+    contig_interval.begin = config.start
+    contig_interval.end = config.end
+    query_config.query_contig_intervals.extend([contig_interval])
     row_range_list = None
-    if row_tuples:
+    if config.row_tuples:
         row_range_list = query_pb.RowRangeList()
-        for tuple in row_tuples:
+        for tuple in config.row_tuples:
             range = query_pb.RowRange()
             range.low = tuple[0]
             range.high = tuple[1]
             row_range_list.range_list.extend([range])
+    if row_range_list:
+        query_config.query_row_ranges.extend([row_range_list])
+    return query_config
+
+
+def process(config):
+    export_config = config.export_config
+    query_config = config.query_config
+    output_config = config.output_config
+    msg = f"array({query_config.array_name}) for interval({query_config.interval})"
+    if not genomicsdb.array_exists(export_config.workspace, query_config.array_name):
+        logging.error(msg + f" not imported into workspace({export_config.workspace})")
+        return
+    global gdb
+    try:
+        if gdb:
+            logging.info("Found gdb to process " + msg)
+    except NameError:
+        logging.info("Instantiating genomicsdb to process " + msg + "...")
+        gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
+        logging.info("Instantiating genomicsdb to process " + msg + " DONE")
+    query_protobuf = configure_query(query_config)
+    if output_config.type == "csv":
+        df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
+        df.to_csv(output_config.filename, index=False)
+    elif output_config.type == "json":
+        json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
+        with open(output_config.filename, "wb") as f:
+            f.write(json_output)
+    elif output_config.type == "arrow":
+        nbytes = 0
+        writer = None
+        i = 0
+        for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
+            reader = pa.ipc.open_stream(out)
+            for batch in reader:
+                if nbytes > output_config.max_arrow_bytes:
+                    i += 1
+                    nbytes = 0
+                    if writer:
+                        writer.close()
+                        writer = None
+                if not writer:
+                    writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
+                nbytes += batch.nbytes
+                writer.write_batch(batch)
+        if writer:
+            writer.close()
+            writer = None
+
+    logging.info(f"Processed array {query_config.array_name} for interval {query_config.interval}")
+
+
+def main():
+    workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, args = setup()
+
+    if row_tuples is not None and len(row_tuples) == 0:
+        return
+    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
+
+    output_type = args.output_type
+    output = args.output
+    json_type = None
+    if output_type == "json":
+        json_type = parse_args_for_json_type(args.json_output_type)
+    max_arrow_bytes = -1
+    if output_type == "arrow":
+        if not os.path.exists(output):
+            os.mkdir(output)
+        max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
+        print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")
+
+    export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file)
+    configs = []
     for interval in intervals:
         print(f"Processing interval({interval})...")
-        # get tiledb offsets for interval
-        contig, start, end = genomicsdb_common.parse_interval(interval)
-        if contig in contigs_map:
-            contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
-            length = contigs_map[contig]["length"]
-            if end and end < length + 1:
-                contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
-            else:
-                end = length
-                contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
-        else:
-            print(f"Contig({contig}) not found in vidmap.json")
-            continue
-        arrays = []
-        for idx, partition in enumerate(partitions):
-            if isinstance(partition["begin"], int):  # Old style vidmap json
-                column_begin = partition["begin"]
-                if "end" in partition.keys():
-                    column_end = partition["end"]
-                elif idx + 1 < len(partitions):
-                    column_end = partitions[idx + 1]["begin"] - 1
-                else:
-                    column_end = sys.maxsize
-            else:  # Generated with vcf2genomicsdb_init
-                column_begin = partition["begin"]["tiledb_column"]
-                column_end = partition["end"]["tiledb_column"]
-            if contig_end < column_begin or contig_offset > column_end:
-                continue
-            if "array_name" in partition.keys():
-                arrays.append(partition["array_name"])
-            elif "array" in partition.keys():
-                arrays.append(partition["array"])
-
-        arrays_length = len(arrays)
-        if arrays_length == 0:
+        contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
+        if len(arrays) == 0:
             print(f"No arrays in the workspace matched input interval({interval})")
             continue
-        print(f"\tArrays:{arrays} under consideration for interval({interval})")
-
-        output_type = args.output_type
-        output = args.output
-        if output_type == "json":
-            json_type = parse_args_for_json_type(args.json_output_type)
-        if output_type == "arrow":
-            max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
-            print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")
+        print(f"\tArrays:{arrays} under consideration for interval({interval})")
 
         for idx, array in enumerate(arrays):
-            if not genomicsdb.array_exists(workspace, array):
-                print(f"\tArray({array}) not imported into workspace({workspace}) for interval({interval})")
-                continue
-            query_config = query_pb.QueryConfiguration()
-            query_config.array_name = array
-            contig_interval = query_coords.ContigInterval()
-            contig_interval.contig = contig
-            contig_interval.begin = start
-            contig_interval.end = end
-            query_config.query_contig_intervals.extend([contig_interval])
-            if row_range_list:
-                query_config.query_row_ranges.extend([row_range_list])
-            output_filename = generate_output_filename(output, output_type, interval, idx)
-            if output_type == "csv":
-                df = gdb.query_variant_calls(query_protobuf=query_config, flatten_intervals=True)
-                df.to_csv(output_filename, index=False)
-            elif output_type == "json":
-                json_output = gdb.query_variant_calls(query_protobuf=query_config, json_output=json_type)
-                with open(output_filename, "wb") as f:
-                    f.write(json_output)
-            elif output_type == "arrow":
-                if not os.path.exists(output):
-                    os.mkdir(output)
-                nbytes = 0
-                writer = None
-                i = 0
-                for out in gdb.query_variant_calls(query_protobuf=query_config, arrow_output=True, batching=True):
-                    reader = pa.ipc.open_stream(out)
-                    for batch in reader:
-                        if nbytes > max_arrow_bytes:
-                            i += 1
-                            nbytes = 0
-                            if writer:
-                                writer.close()
-                                writer = None
-                        if not writer:
-                            print(f"Writing out batch {i}...")
-                            writer = pq.ParquetWriter(f"{output_filename}__{i}.parquet", batch.schema)
-                        nbytes += batch.nbytes
-                        writer.write_batch(batch)
-                if writer:
-                    writer.close()
+            query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, args.filter)
+            output_config = OutputConfig(
+                generate_output_filename(output, output_type, interval, idx),
+                output_type,
+                json_type,
+                max_arrow_bytes,
+            )
+            configs.append(Config(export_config, query_config, output_config))
+
+    if len(configs) == 1:
+        process(configs[0])
+    else:
+        with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
+            pool.map(process, configs)
 
     print(f"genomicsdb_query for workspace({workspace}) and intervals({intervals}) completed successfully")
diff --git a/examples/run.sh b/examples/run.sh
index 0689575..da38252 100755
--- a/examples/run.sh
+++ b/examples/run.sh
@@ -45,8 +45,10 @@ INTERVALS=("1:1-1000000")
 
 #declare -a SAMPLES
 #SAMPLES=("HG00096" "HG00097" "HG00099")
+#SAMPLES_LIST=samples.list
 
 #FILTER='resolve(GT, REF, ALT) &= "T/T"'
+FILTER='!ISHOMREF'
 
 export OUTPUT_FILE=${OUTPUT_FILE:-my_output}
 export OUTPUT_FILE_TYPE=${OUTPUT_FILE_TYPE:-json}
@@ -54,16 +56,18 @@ export OUTPUT_FILE_TYPE=${OUTPUT_FILE_TYPE:-json}
 export TILEDB_CACHE=1
 NTHREADS=${NTHREADS:-8}
 
+VENV=${VENV:-env}
+
 ###########################################
 # Should not have to change anything below
 ###########################################
 
-if [[ ! -d env ]]; then
-    python3 -m venv env
+if [[ ! -d $VENV ]]; then
+    python3 -m venv $VENV
     source env/bin/activate
     pip install genomicsdb
 else
-    source env/bin/activate
+    source $VENV/bin/activate
 fi
 
 PATH=$(dirname $0):$PATH
@@ -75,8 +79,12 @@ if [[ ! -z ${SAMPLES} ]]; then
     done
 fi
 
+if [[ ! -z $SAMPLES_LIST ]]; then
+    export SAMPLE_ARGS="-S $SAMPLES_LIST"
+fi
+
 if [[ ! -z ${FILTER} ]]; then
-    FILTER_EXPR="-f $FILTER"
+    export FILTER_EXPR="-f $FILTER"
 fi
 
 echo $LOADER_FILE $CALLSET_FILE $VIDMAP_FILE
diff --git a/examples/test.sh b/examples/test.sh
index 85b2667..a65a2f6 100755
--- a/examples/test.sh
+++ b/examples/test.sh
@@ -158,20 +158,17 @@ if [[ $WORKSPACE == *://* ]]; then
 fi
 rm -f loader.json callset.json vidmap.json
 
-if [[ $WORKSPACE == *://* ]]; then
-    cleanup
-    exit 0
-fi
-
 ####################################################################
 #
 # Check old style workspaces with genomicsdb_query/genomicsdb_cache
 #
 ####################################################################
 
-OLDSTYLE_DIR=$TEMP_DIR/old_style
-mkdir -p $OLDSTYLE_DIR
-tar xzf $(dirname $0)/../test/inputs/sanity.test.tgz -C $OLDSTYLE_DIR
+if [[ -z $OLDSTYLE_DIR ]]; then
+    OLDSTYLE_DIR=$TEMP_DIR/old_style
+    mkdir -p $OLDSTYLE_DIR
+    tar xzf $(dirname $0)/../test/inputs/sanity.test.tgz -C $OLDSTYLE_DIR
+fi
 WORKSPACE=$OLDSTYLE_DIR/ws
 
 run_command "genomicsdb_query -w $WORKSPACE --list-samples"
@@ -181,6 +178,7 @@ run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
 
 OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json"
+run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS"
 run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-samples"
 run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-contigs"
 run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-partitions"
diff --git a/test/inputs/sanity.test.tgz b/test/inputs/sanity.test.tgz
index 68ed4df..e999175 100644
Binary files a/test/inputs/sanity.test.tgz and b/test/inputs/sanity.test.tgz differ