diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query index c7e8006..029067d 100755 --- a/examples/genomicsdb_query +++ b/examples/genomicsdb_query @@ -55,7 +55,7 @@ def parse_callset_json(callset_file): callset = json.loads(genomicsdb.read_entire_file(callset_file)) callsets = callset["callsets"] samples = [callset if isinstance(callset, str) else callset["sample_name"] for callset in callsets] - return samples + return samples def parse_callset_json_for_row_ranges(callset_file, samples=None): @@ -338,6 +338,11 @@ class OutputConfig(NamedTuple): json_type: str | None max_arrow_bytes: int +class Config(NamedTuple): + export_config: GenomicsDBExportConfig + query_config: GenomicsDBQueryConfig + output_config: OutputConfig + def configure_export(config: GenomicsDBExportConfig): export_config = query_pb.ExportConfiguration() @@ -371,11 +376,14 @@ def configure_query(config: GenomicsDBQueryConfig): return query_config -def process(export_config, query_config, output_config): +def process(config): + export_config = config.export_config + query_config = config.query_config + output_config = config.output_config msg = f"array({query_config.array_name}) for interval({query_config.interval})" if not genomicsdb.array_exists(export_config.workspace, query_config.array_name): logging.error(msg + f" not imported into workspace({export_config.workspace})") - return 1 + return global gdb try: if gdb: @@ -417,7 +425,6 @@ def process(export_config, query_config, output_config): writer = None logging.info(f"Processed array {query_config.array_name} for interval {query_config.interval}") - return 0 def main(): @@ -439,27 +446,33 @@ def main(): print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files") export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file) - with multiprocessing.Pool(processes=args.nproc) as pool: - for interval in intervals: - print(f"Processing interval({interval})...") - - contig, start, end, 
arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions) - if len(arrays) == 0: - print(f"No arrays in the workspace matched input interval({interval})") - continue - - print(f"\tArrays:{arrays} under consideration for interval({interval})") - - for idx, array in enumerate(arrays): - query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter) - output_config = OutputConfig( - generate_output_filename(output, output_type, interval, idx), - output_type, - json_type, - max_arrow_bytes, - ) - res = pool.apply_async(process, args=(export_config, query_config, output_config)) - print(res.get()) + configs = [] + for interval in intervals: + print(f"Processing interval({interval})...") + + contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions) + if len(arrays) == 0: + print(f"No arrays in the workspace matched input interval({interval})") + continue + + print(f"\tArrays:{arrays} under consideration for interval({interval})") + for idx, array in enumerate(arrays): + query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter) + output_config = OutputConfig( + generate_output_filename(output, output_type, interval, idx), + output_type, + json_type, + max_arrow_bytes, + ) + configs.append(Config(export_config, query_config, output_config)) + + if len(configs) == 1: + process(configs[0]) + else: + nprocs = min(len(configs), args.nproc) + with multiprocessing.Pool(processes=nprocs) as pool: + pool.map(process, configs) + print(f"genomicsdb_query for workspace({workspace}) and intervals({intervals}) completed successfully")