Support parallel processing of GenomicsDB arrays using multiprocessing (#70)

* Support for old style jsons
* Initial pass at supporting multiprocessing in genomicsdb_query
nalinigans authored Dec 27, 2024
1 parent bd2fab8 commit 4e6aa87
Showing 8 changed files with 236 additions and 144 deletions.
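The genomicsdb_query changes themselves are not reproduced on this page, so the following is a hedged illustration only: refactoring get_arrays to operate on a single interval (see examples/genomicsdb_common.py below) is what makes it straightforward to fan the per-interval work out with Python's multiprocessing. This sketch is not code from this commit; process_interval, process_intervals, and nproc are hypothetical names.

# Sketch only -- not part of this commit. Assumes examples/ is on PYTHONPATH
# (as in the workflow, PYTHONPATH=.), so genomicsdb_common is importable.
import multiprocessing

import genomicsdb_common


def process_interval(work_item):
    # Hypothetical per-interval task: resolve the arrays overlapping one interval.
    interval, contigs_map, partitions = work_item
    _, _, _, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
    return interval, arrays


def process_intervals(intervals, contigs_map, partitions, nproc=4):
    # One pool task per interval; results come back as (interval, arrays) pairs.
    work = [(interval, contigs_map, partitions) for interval in intervals]
    with multiprocessing.Pool(processes=nproc) as pool:
        return pool.map(process_interval, work)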
10 changes: 7 additions & 3 deletions .github/scripts/setup_azurite.sh
@@ -54,11 +54,15 @@ export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
AZURE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=https://127.0.0.1:10001/devstoreaccount1;"
az storage container create -n test --connection-string $AZURE_CONNECTION_STRING

# Setup examples workspace on azurite
# Setup test workspaces on azurite
mkdir oldstyle_dir
tar xzvf $GITHUB_WORKSPACE/test/inputs/sanity.test.tgz -C oldstyle_dir
az storage blob upload-batch -d test/oldstyle-dir -s oldstyle_dir --connection-string $AZURE_CONNECTION_STRING
export OLDSTYLE_DIR=az://test/oldstyle-dir

cd $GITHUB_WORKSPACE/examples
tar xzvf examples_ws.tgz
echo "Azure Storage Blob upload-batch..."
az storage blob upload-batch -d test/ws -s ws --connection-string $AZURE_CONNECTION_STRING
echo "Azure Storage Blob upload-batch DONE"
export WORKSPACE=az://test/ws

popd
3 changes: 2 additions & 1 deletion .github/workflows/basic.yml
@@ -102,7 +102,8 @@ jobs:
run: |
source .github/scripts/setup_azurite.sh
echo "Testing on Azurite..."
PYTHONPATH=. WORKSPACE=az://test/ws ./examples/test.sh
echo "WORKSPACE=$WORKSPACE OLDSTYLE_DIR=$OLDSTYLE_DIR"
PYTHONPATH=. ./examples/test.sh
ls -l /tmp/tiledb_bookkeeping
echo "Testing on Azurite DONE"
13 changes: 11 additions & 2 deletions examples/genomicsdb_cache
@@ -41,6 +41,11 @@ def is_cloud_path(path):
    return False


def get_arrays(interval, contigs_map, partitions):
    _, _, _, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
    return arrays


def main():
    parser = argparse.ArgumentParser(
        prog="cache",
@@ -125,8 +130,12 @@ def main():
    contigs_map, intervals = genomicsdb_common.parse_vidmap_json(vidmap_file, args.interval or args.interval_list)
    loader = json.loads(genomicsdb.read_entire_file("loader.json"))
    partitions = loader["column_partitions"]

    for array in genomicsdb_common.get_arrays(contigs_map, intervals, partitions):
    arrays = {
        arrays_for_interval
        for interval in intervals
        for arrays_for_interval in get_arrays(interval, contigs_map, partitions)
    }
    for array in arrays:
        print(f"Caching fragments for array {array}")
        if genomicsdb.array_exists(workspace, array):
            genomicsdb.cache_array_metadata(workspace, array)
51 changes: 33 additions & 18 deletions examples/genomicsdb_common.py
@@ -27,6 +27,7 @@
import json
import os
import re
import sys

import genomicsdb

@@ -85,25 +86,39 @@ def parse_interval(interval: str):
    raise RuntimeError(f"Interval {interval} could not be parsed")


def get_arrays(contigs_map, intervals, partitions):
    arrays = set()
    for interval in intervals:
        contig, start, end = parse_interval(interval)
        if contig in contigs_map:
            contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
            length = contigs_map[contig]["length"]
            if end and end < length + 1:
                contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
            else:
                end = length
                contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
def get_arrays(interval, contigs_map, partitions):
    contig, start, end = parse_interval(interval)
    if contig in contigs_map:
        contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
        length = contigs_map[contig]["length"]
        if end and end < length + 1:
            contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
        else:
            print(f"Contig({contig}) not found in vidmap.json")
            end = length
            contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
    else:
        print(f"Contig({contig}) not found in vidmap.json")

    arrays = []
    for idx, partition in enumerate(partitions):
        if isinstance(partition["begin"], int):  # Old style vidmap json
            column_begin = partition["begin"]
            if "end" in partition.keys():
                column_end = partition["end"]
            elif idx + 1 < len(partitions):
                column_end = partitions[idx + 1]["begin"] - 1
            else:
                column_end = sys.maxsize
        else:  # Generated with vcf2genomicsdb_init
            column_begin = partition["begin"]["tiledb_column"]
            column_end = partition["end"]["tiledb_column"]

        if contig_end < column_begin or contig_offset > column_end:
            continue

        for partition in partitions:
            if contig_end < partition["begin"]["tiledb_column"] or contig_offset > partition["end"]["tiledb_column"]:
                continue
            arrays.add(partition["array_name"])
        if "array_name" in partition.keys():
            arrays.append(partition["array_name"])
        elif "array" in partition.keys():
            arrays.append(partition["array"])

    return arrays
    return contig, start, end, arrays
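For context on the "Support for old style jsons" change: the new branch in get_arrays accepts two column_partitions layouts. The values below are illustrative only (made-up coordinates and names), written as Python literals whose shapes mirror what the code above inspects.

# Illustrative shapes only; the values and array names are made up.
# Old-style loader JSON: "begin" (and optionally "end") are plain integers,
# and the array name may appear under "array".
old_style_partition = {"begin": 0, "end": 1000000, "array": "array_0"}

# Layout generated by vcf2genomicsdb_init: begin/end carry tiledb_column
# coordinates and the array name appears under "array_name".
new_style_partition = {
    "begin": {"tiledb_column": 0},
    "end": {"tiledb_column": 1000000},
    "array_name": "array_0",
}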