From 5f36025bdaa4a1e3366b72d8c04a228a005a9d8a Mon Sep 17 00:00:00 2001
From: Dan Fornika
Date: Tue, 11 Jun 2024 16:24:08 -0700
Subject: [PATCH] troubleshooting inconsistent outputs

---
 .github/scripts/check_outputs.py | 61 ++++++++++++++++++++++++++++----
 fluviewer/analysis.py            | 54 ++++++++++++++--------------
 fluviewer/fluviewer.py           |  1 -
 3 files changed, 80 insertions(+), 36 deletions(-)

diff --git a/.github/scripts/check_outputs.py b/.github/scripts/check_outputs.py
index 0387c44..c84c323 100755
--- a/.github/scripts/check_outputs.py
+++ b/.github/scripts/check_outputs.py
@@ -44,7 +44,12 @@ def check_expected_files_exist(output_dirs, sample_ids, output_file_mapping_by_s
     return expected_file_checks
 
 
-def check_expected_md5sums_match(output_dirs, sample_ids, output_file_mapping_by_sample_id):
+def check_expected_md5sums_match(
+    output_dirs,
+    sample_ids,
+    output_file_mapping_by_sample_id,
+    files_with_header_added_in_origin
+):
     """
     Check that the expected md5sums match the actual md5sums in the output directory.
@@ -54,6 +59,9 @@
     :type sample_ids: List[str]
     :param output_file_mapping_by_sample_id: Dictionary with keys as sample IDs and values as dictionaries.
+    :type output_file_mapping_by_sample_id: Dict[str, Dict[str, Dict[str, str]]]
+    :param files_with_header_added_in_origin: List of file types for which the header is added in the origin version of the file
+    :type files_with_header_added_in_origin: List[str]
     :return: List of dictionaries with keys ['sample_id', 'file_type', 'upstream_path', 'origin_path', 'upstream_md5sum', 'origin_md5sum', 'md5sums_match']
     """
     expected_md5sum_checks = []
@@ -61,10 +69,20 @@
         for file_type, paths_by_pipeline in output_files.items():
             upstream_path = os.path.join(output_dirs['upstream'], paths_by_pipeline['upstream'])
             origin_path = os.path.join(output_dirs['origin'], paths_by_pipeline['origin'])
-            with open(upstream_path, 'rb') as f:
-                upstream_md5sum = hashlib.md5(f.read()).hexdigest()
-            with open(origin_path, 'rb') as f:
-                origin_md5sum = hashlib.md5(f.read()).hexdigest()
+            upstream_md5sum = None
+            origin_md5sum = None
+            with open(upstream_path, 'r') as f:
+                upstream_md5sum = hashlib.md5(f.read().encode()).hexdigest()
+            with open(origin_path, 'r') as f:
+                # skip header when calculating checksum for
+                # files where header is added in the origin version
+
+                if file_type in files_with_header_added_in_origin:
+                    f.readline()
+
+                origin_md5sum = hashlib.md5(f.read().encode()).hexdigest()
+
             expected_md5sum_check = {
                 'sample_id': sample_id,
                 'file_type': file_type,
@@ -88,6 +106,16 @@ def main(args):
     sample_ids = [
         'MK58361X-H3N2'
     ]
+    analysis_stages = [
+        '00_normalize_depth',
+        '01_assemble_contigs',
+        '02_blast_contigs',
+        '03_scaffolding',
+        '04_read_mapping',
+        '05_variant_calling',
+        '06_report_variants',
+        '07_summary_reporting'
+    ]
     output_file_mapping_by_sample_id = {}
     for sample_id in sample_ids:
         output_file_mapping = {
@@ -161,6 +189,11 @@ def main(args):
                                               "analysis_by_stage",
                                               "00_normalize_depth",
                                               f"{sample_id}-normalized_R2.fastq.gz")},
+            'assembly_contigs': {"upstream": os.path.join(sample_id, "spades_output", "contigs.fasta"),
+                                 "origin": os.path.join(sample_id,
+                                                        "analysis_by_stage",
+                                                        "01_assemble_contigs",
+                                                        f"{sample_id}_contigs.fasta")},
             'alignment_sam': {"upstream": os.path.join(sample_id, "alignment.sam"),
                               "origin": os.path.join(sample_id,
                                                      "analysis_by_stage",
@@ -176,6 +209,11 @@ def main(args):
                                                 "analysis_by_stage",
                                                 "02_blast_contigs",
                                                 f"{sample_id}_contigs_blast.tsv")},
+            'mapping_refs': {"upstream": os.path.join(sample_id, f"{sample_id}_mapping_refs.fa"),
+                             "origin": os.path.join(sample_id,
+                                                    "analysis_by_stage",
+                                                    "04_read_mapping",
+                                                    f"{sample_id}_mapping_refs.fa")},
             'depth_of_cov_freebayes': {"upstream": os.path.join(sample_id, "depth_of_cov_freebayes.tsv"),
                                        "origin": os.path.join(sample_id,
                                                               "analysis_by_stage",
@@ -250,6 +288,11 @@ def main(args):
         'alignment_sam',
         'pileup_vcf',
     ]
+    files_with_header_added_in_origin = [
+        'contigs_blast',
+        'scaffolds_blast_tsv',
+
+    ]
     output_file_mapping_by_sample_id_for_md5sum_check = {}
     for sample_id, output_files in output_file_mapping_by_sample_id.items():
         for file_type, paths_by_pipeline in output_files.items():
@@ -262,14 +305,18 @@ def main(args):
     expected_md5sums_match_checks = check_expected_md5sums_match(
         pipeline_outdirs,
         sample_ids,
-        output_file_mapping_by_sample_id_for_md5sum_check
+        output_file_mapping_by_sample_id_for_md5sum_check,
+        files_with_header_added_in_origin
     )
 
+    def get_analysis_stage_number_from_origin_path(origin_path):
+        return origin_path.split(os.sep)[-2]
+
+    expected_md5sums_match_checks_sorted = sorted(expected_md5sums_match_checks, key=lambda x: get_analysis_stage_number_from_origin_path(x['origin_path']))
     expected_md5sums_match_output_path = os.path.join(args.outdir, "check_md5sums_match.csv")
     with open(expected_md5sums_match_output_path, 'w') as f:
         writer = csv.DictWriter(f, fieldnames=expected_md5sums_match_checks[0].keys(), extrasaction='ignore')
         writer.writeheader()
-        for check in expected_md5sums_match_checks:
+        for check in expected_md5sums_match_checks_sorted:
             writer.writerow(check)
 
     all_expected_md5sums_match = all([check['md5sums_match'] for check in expected_md5sums_match_checks])
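
Note: the header-skipping checksum is the heart of the check_outputs.py change above. A minimal, self-contained sketch of the same technique; the helper name and file paths here are hypothetical, not part of the script:

import hashlib

def md5_of_text_file(path, skip_header=False):
    # Hash the file contents, optionally ignoring line 1; this lets an
    # upstream output be compared against an origin output whose only
    # difference is an added header line.
    with open(path, 'r') as f:
        if skip_header:
            f.readline()  # discard the header added in the origin version
        return hashlib.md5(f.read().encode()).hexdigest()

# Two files that differ only by a header line then hash identically:
# md5_of_text_file('upstream.tsv') == md5_of_text_file('origin.tsv', skip_header=True)
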
diff --git a/fluviewer/analysis.py b/fluviewer/analysis.py
index ec854d8..deb7456 100644
--- a/fluviewer/analysis.py
+++ b/fluviewer/analysis.py
@@ -310,25 +310,7 @@ def filter_contig_blast_results(blast_results, outdir, out_name, identity, lengt
     :return: Filtered BLASTn results.
     :rtype: pd.DataFrame
     """
-    log.info('Filtering contig alignments...')
-    total_num_blast_results = len(blast_results)
-
-    filtered_blast_results = blast_results[blast_results['pident'] >= identity]
-    filtered_blast_results = filtered_blast_results[filtered_blast_results['length'] >= length]
-    num_blast_results_after_identity_and_length_filter = len(filtered_blast_results)
-    log.info(f'Found {num_blast_results_after_identity_and_length_filter} matches with at least {identity}% identity and {length} bp alignment length.')
-
-    percent_blast_results_retained = num_blast_results_after_identity_and_length_filter / total_num_blast_results * 100
-    log.info(f'Retained {percent_blast_results_retained:.2f}% matches for further analysis.')
-
-    if len(filtered_blast_results) == 0:
-        log.error(f'No contigs aligned to reference sequences! Aborting analysis.')
-        error_code = 7
-        exit(error_code)
-
-    # Re-naming the filtered blast results to blast_results
-    # For compatibility with the rest of this function
-    blast_results = filtered_blast_results
+    log.info('Filtering contig BLAST alignments...')
 
     #
     # Annotate each ref seq with its segment and subtype.
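
Note: the identity/length filter deleted above is not dropped; the next hunk relocates equivalent logic into blast_contigs, so filtering happens once, right after the BLAST table is read. A standalone sketch of that filter under assumed thresholds; the toy DataFrame is illustrative only, not FluViewer data:

import pandas as pd

blast_results = pd.DataFrame({
    'qseqid': ['contig_1', 'contig_2', 'contig_3'],
    'pident': [99.1, 88.0, 95.5],
    'length': [1500, 1500, 300],
})

identity, length = 90.0, 1000  # assumed thresholds for illustration

# Keep only matches meeting both the percent-identity and alignment-length cutoffs.
filtered = blast_results[blast_results['pident'] >= identity]
filtered = filtered[filtered['length'] >= length]
print(filtered)  # only contig_1 passes both thresholds
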
@@ -592,12 +574,29 @@ def blast_contigs(inputs, outdir, out_name, threads, identity, length):
     error_code = 6
     return_code = run(terminal_command, outdir, out_name, process_name, error_code)
 
-    blast_results = pd.read_csv(blast_output_path, sep='\t')
+    full_blast_results = pd.read_csv(blast_output_path, sep='\t')
+
+    total_num_blast_results = len(full_blast_results)
+
+    if total_num_blast_results == 0:
+        log.error(f'No contigs aligned to reference sequences! Aborting analysis.')
+        error_code = 7
+        exit(error_code)
 
-    total_num_blast_results = len(blast_results)
-    log.info('Contigs aligned to reference sequences.')
+    log.info('Contigs BLASTed against reference sequences.')
     log.info(f'Found {total_num_blast_results} total matches.')
 
+    blast_results = full_blast_results[full_blast_results['pident'] >= identity]
+    blast_results = blast_results[blast_results['length'] >= length]
+
+    identity_and_length_filtered_blast_output_path = os.path.join(outdir, f'{out_name}_contigs_blast_identity_and_length_filtered.tsv')
+    blast_results.to_csv(identity_and_length_filtered_blast_output_path, sep='\t', index=False)
+    num_blast_results_after_identity_and_length_filter = len(blast_results)
+    log.info(f'Found {num_blast_results_after_identity_and_length_filter} matches with at least {identity}% identity and {length} bp alignment length.')
+
+    percent_blast_results_retained = num_blast_results_after_identity_and_length_filter / total_num_blast_results * 100
+    log.info(f'Retained {percent_blast_results_retained:.2f}% matches for further analysis.')
+
     filtered_blast_results = filter_contig_blast_results(blast_results, outdir, out_name, identity, length)
 
     timestamp_analysis_complete = datetime.datetime.now().isoformat()
@@ -617,6 +616,7 @@
     outputs = {
         'all_contig_blast_results': os.path.abspath(blast_output_path),
+        'identity_and_length_filtered_contig_blast_results': os.path.abspath(identity_and_length_filtered_blast_output_path),
         'filtered_contig_blast_results': os.path.abspath(filtered_blast_output_path),
     }
@@ -651,7 +651,7 @@ def filter_scaffold_blast_results(blast_results):
     :return: Filtered BLASTn results.
     :rtype: pd.DataFrame
     """
-    log.info('Filtering scaffold alignments...')
+    log.info('Filtering scaffold BLAST alignments...')
 
     # Remove reversed alignments (they should be artefactual at this point).
     # check number of reversed alignments
@@ -987,7 +987,7 @@ def blast_scaffolds(inputs, outdir, out_name, threads):
             analysis_summary['error_message'] = error_messages_by_code[error_code]
             return analysis_summary
 
-    blast_results = pd.read_csv(blast_output, names=cols, sep='\t')
+    blast_results = pd.read_csv(blast_output, names=cols, sep='\t', na_filter=False)
     num_blast_results = len(blast_results)
     if num_blast_results == 0:
         log.error(f'No scaffolds aligned to reference sequences! '
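
Note: the na_filter=False added above is a likely fix for inconsistent outputs in influenza data specifically, since pandas treats the literal string 'NA' as missing by default, and 'NA' is also the neuraminidase segment name, so it can legitimately appear in BLAST result fields. A small sketch of the difference; the toy TSV is illustrative, not FluViewer data:

import io
import pandas as pd

tsv = 'scaffold_NA\tNA\n'
cols = ['qseqid', 'segment']

# Default parsing coerces the string 'NA' to NaN; na_filter=False keeps it.
with_default = pd.read_csv(io.StringIO(tsv), names=cols, sep='\t')
with_na_filter_off = pd.read_csv(io.StringIO(tsv), names=cols, sep='\t', na_filter=False)

print(with_default['segment'][0])        # nan -- 'NA' silently becomes missing
print(with_na_filter_off['segment'][0])  # NA  -- preserved as a string
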
@@ -999,12 +999,9 @@ def blast_scaffolds(inputs, outdir, out_name, threads):
     log.info('Scaffolds aligned to reference sequences.')
     log.info(f'Found {num_blast_results} total matches.')
 
-    for segment in blast_results['qseqid'].unique():
-        segment_results = blast_results[blast_results['qseqid']==segment]
-        ref_seq = segment_results['sseqid'].values[0]
-        log.info(f'Selected reference sequence for segment {segment}: {ref_seq}')
 
     filtered_blast_results = filter_scaffold_blast_results(blast_results)
+
     num_filtered_blast_results = len(filtered_blast_results)
     log.info(f'Remaining scaffold alignments after filtering: {num_filtered_blast_results}.')
     filtered_blast_output = os.path.join(outdir, f'{out_name}_scaffolds_blast_filtered.tsv')
@@ -1095,6 +1092,7 @@ def make_map_ref(data_frame):
     blast_results = blast_results.groupby(group_cols).apply(make_map_ref)
     blast_results = blast_results.reset_index()
     blast_results.columns = ['sseqid', 'mapping_seq']
+
     # Annotate segment and subtype.
     get_segment = lambda row: row['sseqid'].split('|')[2].split('_')[0]
     blast_results['segment'] = blast_results.apply(get_segment, axis=1)
diff --git a/fluviewer/fluviewer.py b/fluviewer/fluviewer.py
index 27a6f1b..6d40a57 100644
--- a/fluviewer/fluviewer.py
+++ b/fluviewer/fluviewer.py
@@ -191,7 +191,6 @@ def main():
             shutil.copy(src_path, dest_path)
             log.info(f'Published log file: {log_file} -> {dest_path}')
 
-        log.info(f'Analysis stage complete: {current_analysis_stage}')
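
Note: the sorting added to check_outputs.py works because origin paths place each file in a numbered stage directory, so the second-to-last path component sorts lexically by its zero-padded stage prefix. A sketch using the helper exactly as defined in the patch; the example records are illustrative:

import os

def get_analysis_stage_number_from_origin_path(origin_path):
    # The stage directory (e.g. '04_read_mapping') is the parent of the file.
    return origin_path.split(os.sep)[-2]

checks = [
    {'origin_path': os.path.join('MK58361X-H3N2', 'analysis_by_stage', '04_read_mapping', 'MK58361X-H3N2_mapping_refs.fa')},
    {'origin_path': os.path.join('MK58361X-H3N2', 'analysis_by_stage', '00_normalize_depth', 'MK58361X-H3N2-normalized_R1.fastq.gz')},
]

checks_sorted = sorted(checks, key=lambda x: get_analysis_stage_number_from_origin_path(x['origin_path']))
print([get_analysis_stage_number_from_origin_path(c['origin_path']) for c in checks_sorted])
# ['00_normalize_depth', '04_read_mapping']
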