Support parallel processing of GenomicsDB arrays using multiprocessing #70

Merged: 20 commits, Dec 27, 2024
10 changes: 7 additions & 3 deletions .github/scripts/setup_azurite.sh
@@ -54,11 +54,15 @@ export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
AZURE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=https://127.0.0.1:10001/devstoreaccount1;"
az storage container create -n test --connection-string $AZURE_CONNECTION_STRING

# Setup examples workspace on azurite
# Setup test workspaces on azurite
mkdir oldstyle_dir
tar xzvf $GITHUB_WORKSPACE/test/inputs/sanity.test.tgz -C oldstyle_dir
az storage blob upload-batch -d test/oldstyle-dir -s oldstyle_dir --connection-string $AZURE_CONNECTION_STRING
export OLDSTYLE_DIR=az://test/oldstyle-dir

cd $GITHUB_WORKSPACE/examples
tar xzvf examples_ws.tgz
echo "Azure Storage Blob upload-batch..."
az storage blob upload-batch -d test/ws -s ws --connection-string $AZURE_CONNECTION_STRING
echo "Azure Storage Blob upload-batch DONE"
export WORKSPACE=az://test/ws

popd
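
For context, a minimal sketch of how a test might consume the exported paths, assuming the genomicsdb Python package used elsewhere in this PR (the array name below is hypothetical; real names come from the workspace's loader.json):

import os

import genomicsdb

# Both variables are exported by setup_azurite.sh above and point at the
# local Azurite emulator rather than real Azure Blob Storage.
workspace = os.environ["WORKSPACE"]        # az://test/ws
oldstyle_dir = os.environ["OLDSTYLE_DIR"]  # az://test/oldstyle-dir

# "some_array" is a hypothetical partition name used only for illustration.
print(genomicsdb.array_exists(workspace, "some_array"))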
3 changes: 2 additions & 1 deletion .github/workflows/basic.yml
@@ -102,7 +102,8 @@ jobs:
run: |
source .github/scripts/setup_azurite.sh
echo "Testing on Azurite..."
PYTHONPATH=. WORKSPACE=az://test/ws ./examples/test.sh
echo "WORKSPACE=$WORKSPACE OLDSTYLE_DIR=$OLDSTYLE_DIR"
PYTHONPATH=. ./examples/test.sh
ls -l /tmp/tiledb_bookkeeping
echo "Testing on Azurite DONE"

13 changes: 11 additions & 2 deletions examples/genomicsdb_cache
@@ -41,6 +41,11 @@ def is_cloud_path(path):
return False


def get_arrays(interval, contigs_map, partitions):
_, _, _, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
return arrays


def main():
parser = argparse.ArgumentParser(
prog="cache",
@@ -125,8 +130,12 @@ def main():
contigs_map, intervals = genomicsdb_common.parse_vidmap_json(vidmap_file, args.interval or args.interval_list)
loader = json.loads(genomicsdb.read_entire_file("loader.json"))
partitions = loader["column_partitions"]

for array in genomicsdb_common.get_arrays(contigs_map, intervals, partitions):
arrays = {
arrays_for_interval
for interval in intervals
for arrays_for_interval in get_arrays(interval, contigs_map, partitions)
}
for array in arrays:
print(f"Caching fragments for array {array}")
if genomicsdb.array_exists(workspace, array):
genomicsdb.cache_array_metadata(workspace, array)
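
Since the PR's goal is parallel processing, a natural companion to the deduplicated set above is fanning the arrays out to worker processes. A minimal sketch, assuming a placeholder workspace path and using only the genomicsdb calls shown in this diff (the pool size is arbitrary):

import multiprocessing

import genomicsdb

workspace = "ws"  # placeholder; the real script derives this from its arguments


def cache_array(array):
    # Each worker caches metadata for one array, mirroring the loop above.
    if genomicsdb.array_exists(workspace, array):
        genomicsdb.cache_array_metadata(workspace, array)


if __name__ == "__main__":
    arrays = {"array_a", "array_b"}  # stand-in for the set built above
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(cache_array, sorted(arrays))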
51 changes: 33 additions & 18 deletions examples/genomicsdb_common.py
Expand Up @@ -27,6 +27,7 @@
import json
import os
import re
import sys

import genomicsdb

@@ -85,25 +86,39 @@ def parse_interval(interval: str):
raise RuntimeError(f"Interval {interval} could not be parsed")


def get_arrays(contigs_map, intervals, partitions):
arrays = set()
for interval in intervals:
contig, start, end = parse_interval(interval)
if contig in contigs_map:
contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
length = contigs_map[contig]["length"]
if end and end < length + 1:
contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
else:
end = length
contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
def get_arrays(interval, contigs_map, partitions):
contig, start, end = parse_interval(interval)
if contig in contigs_map:
contig_offset = contigs_map[contig]["tiledb_column_offset"] + start - 1
length = contigs_map[contig]["length"]
if end and end < length + 1:
contig_end = contigs_map[contig]["tiledb_column_offset"] + end - 1
else:
print(f"Contig({contig}) not found in vidmap.json")
end = length
contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
else:
print(f"Contig({contig}) not found in vidmap.json")

arrays = []
for idx, partition in enumerate(partitions):
if isinstance(partition["begin"], int): # Old style vidmap json
column_begin = partition["begin"]
if "end" in partition.keys():
column_end = partition["end"]
elif idx + 1 < len(partitions):
column_end = partitions[idx + 1]["begin"] - 1
else:
column_end = sys.maxsize
else: # Generated with vcf2genomicsdb_init
column_begin = partition["begin"]["tiledb_column"]
column_end = partition["end"]["tiledb_column"]

if contig_end < column_begin or contig_offset > column_end:
continue

for partition in partitions:
if contig_end < partition["begin"]["tiledb_column"] or contig_offset > partition["end"]["tiledb_column"]:
continue
arrays.add(partition["array_name"])
if "array_name" in partition.keys():
arrays.append(partition["array_name"])
elif "array" in partition.keys():
arrays.append(partition["array"])

return arrays
return contig, start, end, arrays
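
The new per-interval signature is what makes the function easy to map over intervals in parallel, matching the PR title. A minimal usage sketch with placeholder contigs_map/partitions data (the real values come from vidmap.json and loader.json, and examples/genomicsdb_common.py is assumed to be on PYTHONPATH):

import functools
import multiprocessing

import genomicsdb_common

# Placeholder inputs shaped like an old-style vidmap/loader pair.
contigs_map = {"1": {"tiledb_column_offset": 0, "length": 249250621}}
partitions = [{"begin": 0, "array_name": "1$1$249250621"}]
intervals = ["1:1-1000000", "1:2000000-3000000"]

if __name__ == "__main__":
    worker = functools.partial(
        genomicsdb_common.get_arrays, contigs_map=contigs_map, partitions=partitions
    )
    # Each interval is resolved to its overlapping arrays in a separate process.
    with multiprocessing.Pool(processes=2) as pool:
        for contig, start, end, arrays in pool.map(worker, intervals):
            print(contig, start, end, arrays)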