Refactor loops and use Pool.map for better performance
nalinigans committed Dec 27, 2024
1 parent 1924ec5 commit 3d93a58
Showing 1 changed file with 38 additions and 25 deletions.
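
The performance claim in the title comes down to how tasks reach the pool: the old loop called `pool.apply_async(...)` and then immediately blocked on `res.get()`, which serializes the work; `pool.map` submits every task before collecting any result, so the workers actually run in parallel. A minimal sketch of the difference, with a hypothetical `work` function standing in for `process`:

```python
# Minimal sketch (not from this commit) contrasting the two submission patterns.
import multiprocessing
import time

def work(x):
    time.sleep(1)  # stand-in for one GenomicsDB array/interval query
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # Before: calling get() right after apply_async blocks until that task
        # finishes, so tasks effectively run one at a time (~4s for 4 tasks).
        serial = [pool.apply_async(work, args=(x,)).get() for x in range(4)]
        # After: map submits the whole batch up front, so the 4 workers run
        # concurrently (~1s for 4 tasks) and results come back in order.
        parallel = pool.map(work, range(4))
        print(serial, parallel)
```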
63 changes: 38 additions & 25 deletions examples/genomicsdb_query
@@ -55,7 +55,7 @@ def parse_callset_json(callset_file):
    callset = json.loads(genomicsdb.read_entire_file(callset_file))
    callsets = callset["callsets"]
    samples = [callset if isinstance(callset, str) else callset["sample_name"] for callset in callsets]
    return samples


def parse_callset_json_for_row_ranges(callset_file, samples=None):
@@ -338,6 +338,11 @@ class OutputConfig(NamedTuple):
    json_type: str | None
    max_arrow_bytes: int

+class Config(NamedTuple):
+    export_config: GenomicsDBExportConfig
+    query_config: GenomicsDBQueryConfig
+    output_config: OutputConfig
+

def configure_export(config: GenomicsDBExportConfig):
    export_config = query_pb.ExportConfiguration()
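
Why the new `Config` bundle in the hunk above: `Pool.map` maps a single-argument function over an iterable, so the three configs that `apply_async` used to pass separately must travel as one picklable object. A sketch of how the bundle is built and consumed; the field values (e.g. `filename`) are placeholders, not real data:

```python
# Sketch reusing names from this file; argument values are placeholders.
cfg = Config(
    export_config=GenomicsDBExportConfig(workspace, vidmap_file, callset_file),
    query_config=GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter),
    output_config=OutputConfig(filename, output_type, json_type, max_arrow_bytes),
)
cfg.query_config.array_name              # process() reads fields by name
export_cfg, query_cfg, output_cfg = cfg  # NamedTuples also unpack positionally
```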
@@ -371,11 +376,14 @@ def configure_query(config: GenomicsDBQueryConfig):
    return query_config


-def process(export_config, query_config, output_config):
+def process(config):
+    export_config = config.export_config
+    query_config = config.query_config
+    output_config = config.output_config
    msg = f"array({query_config.array_name}) for interval({query_config.interval})"
    if not genomicsdb.array_exists(export_config.workspace, query_config.array_name):
        logging.error(msg + f" not imported into workspace({export_config.workspace})")
-        return 1
+        return
    global gdb
    try:
        if gdb:
@@ -417,7 +425,6 @@ def process(export_config, query_config, output_config):
        writer = None

    logging.info(f"Processed array {query_config.array_name} for interval {query_config.interval}")
-    return 0


def main():
@@ -439,27 +446,33 @@ def main():
        print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

    export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file)
-    with multiprocessing.Pool(processes=args.nproc) as pool:
-        for interval in intervals:
-            print(f"Processing interval({interval})...")
-
-            contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
-            if len(arrays) == 0:
-                print(f"No arrays in the workspace matched input interval({interval})")
-                continue
-
-            print(f"\tArrays:{arrays} under consideration for interval({interval})")
-
-            for idx, array in enumerate(arrays):
-                query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter)
-                output_config = OutputConfig(
-                    generate_output_filename(output, output_type, interval, idx),
-                    output_type,
-                    json_type,
-                    max_arrow_bytes,
-                )
-                res = pool.apply_async(process, args=(export_config, query_config, output_config))
-                print(res.get())
+    configs = []
+    for interval in intervals:
+        print(f"Processing interval({interval})...")
+
+        contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
+        if len(arrays) == 0:
+            print(f"No arrays in the workspace matched input interval({interval})")
+            continue
+
+        print(f"\tArrays:{arrays} under consideration for interval({interval})")
+        for idx, array in enumerate(arrays):
+            query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter)
+            output_config = OutputConfig(
+                generate_output_filename(output, output_type, interval, idx),
+                output_type,
+                json_type,
+                max_arrow_bytes,
+            )
+            configs.append(Config(export_config, query_config, output_config))
+
+    if len(configs) == 1:
+        process(configs[0])
+    else:
+        nprocs = min(len(configs), args.nproc)
+        with multiprocessing.Pool(processes=nprocs) as pool:
+            pool.map(process, configs)


    print(f"genomicsdb_query for workspace({workspace}) and intervals({intervals}) completed successfully")
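
A behavioral note on the switch: the old loop printed each task's status via `res.get()`, while `pool.map` discards the (now `None`) return values and instead re-raises the first worker exception in the parent process. A hypothetical wrapper, not part of the commit, showing how such a failure would surface:

```python
# Hypothetical error handling, not in the commit: pool.map re-raises the
# first exception raised inside a worker once the batch completes.
try:
    with multiprocessing.Pool(processes=nprocs) as pool:
        pool.map(process, configs)
except Exception as exc:
    logging.error(f"genomicsdb_query failed while processing an interval: {exc}")
    raise
```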

