troubleshooting inconsistent outputs
dfornika committed Jun 11, 2024
1 parent 9e35ea3 commit 5f36025
Showing 3 changed files with 80 additions and 36 deletions.
61 changes: 54 additions & 7 deletions .github/scripts/check_outputs.py
@@ -44,7 +44,12 @@ def check_expected_files_exist(output_dirs, sample_ids, output_file_mapping_by_sample_id):
     return expected_file_checks


-def check_expected_md5sums_match(output_dirs, sample_ids, output_file_mapping_by_sample_id):
+def check_expected_md5sums_match(
+    output_dirs,
+    sample_ids,
+    output_file_mapping_by_sample_id,
+    files_with_header_added_in_origin
+):
     """
     Check that the expected md5sums match the actual md5sums in the output directory.
@@ -54,17 +59,30 @@ def check_expected_md5sums_match(output_dirs, sample_ids, output_file_mapping_by_sample_id):
     :type sample_ids: List[str]
     :param output_file_mapping_by_sample_id: Dictionary with keys as sample IDs
                                              and values as dictionaries.
     :type output_file_mapping_by_sample_id: Dict[str, Dict[str, Dict[str, str]]]
+    :param files_with_header_added_in_origin: List of file types for which the header is added in the origin version of the file
+    :type files_with_header_added_in_origin: List[str]
     :return: List of dictionaries with keys ['sample_id', 'file_type', 'upstream_path', 'origin_path', 'upstream_md5sum', 'origin_md5sum', 'md5sums_match']
     """
     expected_md5sum_checks = []
     for sample_id, output_files in output_file_mapping_by_sample_id.items():
         for file_type, paths_by_pipeline in output_files.items():
             upstream_path = os.path.join(output_dirs['upstream'], paths_by_pipeline['upstream'])
             origin_path = os.path.join(output_dirs['origin'], paths_by_pipeline['origin'])
-            with open(upstream_path, 'rb') as f:
-                upstream_md5sum = hashlib.md5(f.read()).hexdigest()
-            with open(origin_path, 'rb') as f:
-                origin_md5sum = hashlib.md5(f.read()).hexdigest()
+            upstream_md5sum = None
+            origin_md5sum = None
+            with open(upstream_path, 'r') as f:
+                upstream_md5sum = hashlib.md5(f.read().encode()).hexdigest()
+            with open(origin_path, 'r') as f:
+                # skip header when calculating checksum for
+                # files where header is added in the origin version
+                if file_type in files_with_header_added_in_origin:
+                    f.readline()
+
+                origin_md5sum = hashlib.md5(f.read().encode()).hexdigest()
+
             expected_md5sum_check = {
                 'sample_id': sample_id,
                 'file_type': file_type,
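In isolation, the header-skipping comparison above amounts to hashing a text file after optionally discarding its first line. A minimal sketch (the helper name and file paths are hypothetical, not part of the script):

import hashlib

def md5_skipping_header(path, skip_header=False):
    # Hash a text file's contents, optionally ignoring the first line.
    with open(path, 'r') as f:
        if skip_header:
            f.readline()  # discard the header line before hashing
        return hashlib.md5(f.read().encode()).hexdigest()

# An upstream TSV with no header should match the origin TSV once its added header is skipped:
# md5_skipping_header('upstream/contigs_blast.tsv') == \
#     md5_skipping_header('origin/contigs_blast.tsv', skip_header=True)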
@@ -88,6 +106,16 @@ def main(args):
     sample_ids = [
         'MK58361X-H3N2'
     ]
+    analysis_stages = [
+        '00_normalize_depth',
+        '01_assemble_contigs',
+        '02_blast_contigs',
+        '03_scaffolding',
+        '04_read_mapping',
+        '05_variant_calling',
+        '06_report_variants',
+        '07_summary_reporting'
+    ]
     output_file_mapping_by_sample_id = {}
     for sample_id in sample_ids:
         output_file_mapping = {
@@ -161,6 +189,11 @@ def main(args):
                                              "analysis_by_stage",
                                              "00_normalize_depth",
                                              f"{sample_id}-normalized_R2.fastq.gz")},
+            'assembly_contigs': {"upstream": os.path.join(sample_id, "spades_output", "contigs.fasta"),
+                                 "origin": os.path.join(sample_id,
+                                                        "analysis_by_stage",
+                                                        "01_assemble_contigs",
+                                                        f"{sample_id}_contigs.fasta")},
             'alignment_sam': {"upstream": os.path.join(sample_id, "alignment.sam"),
                               "origin": os.path.join(sample_id,
                                                      "analysis_by_stage",
@@ -176,6 +209,11 @@ def main(args):
                                            "analysis_by_stage",
                                            "02_blast_contigs",
                                            f"{sample_id}_contigs_blast.tsv")},
+            'mapping_refs': {"upstream": os.path.join(sample_id, f"{sample_id}_mapping_refs.fa"),
+                             "origin": os.path.join(sample_id,
+                                                    "analysis_by_stage",
+                                                    "04_read_mapping",
+                                                    f"{sample_id}_mapping_refs.fa")},
             'depth_of_cov_freebayes': {"upstream": os.path.join(sample_id, "depth_of_cov_freebayes.tsv"),
                                        "origin": os.path.join(sample_id,
                                                               "analysis_by_stage",
@@ -250,6 +288,11 @@ def main(args):
         'alignment_sam',
         'pileup_vcf',
     ]
+    files_with_header_added_in_origin = [
+        'contigs_blast',
+        'scaffolds_blast_tsv',
+
+    ]
     output_file_mapping_by_sample_id_for_md5sum_check = {}
     for sample_id, output_files in output_file_mapping_by_sample_id.items():
         for file_type, paths_by_pipeline in output_files.items():
@@ -262,14 +305,18 @@ def main(args):
     expected_md5sums_match_checks = check_expected_md5sums_match(
         pipeline_outdirs,
         sample_ids,
-        output_file_mapping_by_sample_id_for_md5sum_check
+        output_file_mapping_by_sample_id_for_md5sum_check,
+        files_with_header_added_in_origin
     )
+    def get_analysis_stage_number_from_origin_path(origin_path):
+        return origin_path.split(os.sep)[-2]
+    expected_md5sums_match_checks_sorted = sorted(expected_md5sums_match_checks, key=lambda x: get_analysis_stage_number_from_origin_path(x['origin_path']))

     expected_md5sums_match_output_path = os.path.join(args.outdir, "check_md5sums_match.csv")
     with open(expected_md5sums_match_output_path, 'w') as f:
         writer = csv.DictWriter(f, fieldnames=expected_md5sums_match_checks[0].keys(), extrasaction='ignore')
         writer.writeheader()
-        for check in expected_md5sums_match_checks:
+        for check in expected_md5sums_match_checks_sorted:
             writer.writerow(check)
     all_expected_md5sums_match = all([check['md5sums_match'] for check in expected_md5sums_match_checks])
54 changes: 26 additions & 28 deletions fluviewer/analysis.py
@@ -310,25 +310,7 @@ def filter_contig_blast_results(blast_results, outdir, out_name, identity, length):
     :return: Filtered BLASTn results.
     :rtype: pd.DataFrame
     """
-    log.info('Filtering contig alignments...')
-    total_num_blast_results = len(blast_results)
-
-    filtered_blast_results = blast_results[blast_results['pident'] >= identity]
-    filtered_blast_results = filtered_blast_results[filtered_blast_results['length'] >= length]
-    num_blast_results_after_identity_and_length_filter = len(filtered_blast_results)
-    log.info(f'Found {num_blast_results_after_identity_and_length_filter} matches with at least {identity}% identity and {length} bp alignment length.')
-
-    percent_blast_results_retained = num_blast_results_after_identity_and_length_filter / total_num_blast_results * 100
-    log.info(f'Retained {percent_blast_results_retained:.2f}% matches for further analysis.')
-
-    if len(filtered_blast_results) == 0:
-        log.error(f'No contigs aligned to reference sequences! Aborting analysis.')
-        error_code = 7
-        exit(error_code)
-
-    # Re-naming the filtered blast results to blast_results
-    # For compatibility with the rest of this function
-    blast_results = filtered_blast_results
+    log.info('Filtering contig BLAST alignments...')

     #
     # Annotate each ref seq with its segment and subtype.
@@ -592,12 +574,29 @@ def blast_contigs(inputs, outdir, out_name, threads, identity, length):
     error_code = 6
     return_code = run(terminal_command, outdir, out_name, process_name, error_code)

-    blast_results = pd.read_csv(blast_output_path, sep='\t')
+    full_blast_results = pd.read_csv(blast_output_path, sep='\t')
+
+    total_num_blast_results = len(full_blast_results)
+
+    if total_num_blast_results == 0:
+        log.error(f'No contigs aligned to reference sequences! Aborting analysis.')
+        error_code = 7
+        exit(error_code)

-    total_num_blast_results = len(blast_results)
-    log.info('Contigs aligned to reference sequences.')
+    log.info('Contigs BLASTed against reference sequences.')
     log.info(f'Found {total_num_blast_results} total matches.')
+
+    blast_results = full_blast_results[full_blast_results['pident'] >= identity]
+    blast_results = blast_results[blast_results['length'] >= length]
+
+    identity_and_length_filtered_blast_output_path = os.path.join(outdir, f'{out_name}_contigs_blast_identity_and_length_filtered.tsv')
+    blast_results.to_csv(identity_and_length_filtered_blast_output_path, sep='\t', index=False)
+    num_blast_results_after_identity_and_length_filter = len(blast_results)
+    log.info(f'Found {num_blast_results_after_identity_and_length_filter} matches with at least {identity}% identity and {length} bp alignment length.')
+
+    percent_blast_results_retained = num_blast_results_after_identity_and_length_filter / total_num_blast_results * 100
+    log.info(f'Retained {percent_blast_results_retained:.2f}% matches for further analysis.')

     filtered_blast_results = filter_contig_blast_results(blast_results, outdir, out_name, identity, length)

     timestamp_analysis_complete = datetime.datetime.now().isoformat()
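The identity/length pre-filter now runs in blast_contigs itself, and the intermediate table is written out for inspection. A minimal sketch of the same pandas filtering on toy data (the values and contig names are made up; identity=90 and length=100 stand in for the real thresholds):

import pandas as pd

blast_results = pd.DataFrame({
    'qseqid': ['contig_1', 'contig_2', 'contig_3'],
    'pident': [99.1, 85.0, 92.4],
    'length': [1500, 1200, 80],
})
identity, length = 90, 100
filtered = blast_results[blast_results['pident'] >= identity]
filtered = filtered[filtered['length'] >= length]
print(filtered['qseqid'].tolist())  # ['contig_1'], the only row passing both filters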
@@ -617,6 +616,7 @@ def blast_contigs(inputs, outdir, out_name, threads, identity, length):

     outputs = {
         'all_contig_blast_results': os.path.abspath(blast_output_path),
+        'identity_and_length_filtered_contig_blast_results': os.path.abspath(identity_and_length_filtered_blast_output_path),
         'filtered_contig_blast_results': os.path.abspath(filtered_blast_output_path),
     }

@@ -651,7 +651,7 @@ def filter_scaffold_blast_results(blast_results):
     :return: Filtered BLASTn results.
     :rtype: pd.DataFrame
     """
-    log.info('Filtering scaffold alignments...')
+    log.info('Filtering scaffold BLAST alignments...')

     # Remove reversed alignments (they should be artefactual at this point).
     # check number of reversed alignments
@@ -987,7 +987,7 @@ def blast_scaffolds(inputs, outdir, out_name, threads):
         analysis_summary['error_message'] = error_messages_by_code[error_code]
         return analysis_summary

-    blast_results = pd.read_csv(blast_output, names=cols, sep='\t')
+    blast_results = pd.read_csv(blast_output, names=cols, sep='\t', na_filter=False)
     num_blast_results = len(blast_results)
     if num_blast_results == 0:
         log.error(f'No scaffolds aligned to reference sequences! '
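The na_filter=False change likely matters here because scaffold qseqid values are influenza segment names, and pandas' default NA handling converts the literal string 'NA' (the neuraminidase segment) into NaN. A small demonstration with toy data (the IDs below are invented):

import io
import pandas as pd

tsv = 'NA\tref_NA_N2\nHA\tref_HA_H3\n'
cols = ['qseqid', 'sseqid']

default = pd.read_csv(io.StringIO(tsv), names=cols, sep='\t')
print(default['qseqid'].tolist())   # [nan, 'HA'], 'NA' was parsed as missing

literal = pd.read_csv(io.StringIO(tsv), names=cols, sep='\t', na_filter=False)
print(literal['qseqid'].tolist())   # ['NA', 'HA'], kept as a plain string

A NaN qseqid would silently drop the NA segment from any downstream grouping or filtering, which fits the "inconsistent outputs" this commit is troubleshooting.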
@@ -999,12 +999,9 @@ def blast_scaffolds(inputs, outdir, out_name, threads):

     log.info('Scaffolds aligned to reference sequences.')
     log.info(f'Found {num_blast_results} total matches.')
-    for segment in blast_results['qseqid'].unique():
-        segment_results = blast_results[blast_results['qseqid']==segment]
-        ref_seq = segment_results['sseqid'].values[0]
-        log.info(f'Selected reference sequence for segment {segment}: {ref_seq}')

     filtered_blast_results = filter_scaffold_blast_results(blast_results)
+
     num_filtered_blast_results = len(filtered_blast_results)
     log.info(f'Remaining scaffold alignments after filtering: {num_filtered_blast_results}.')
     filtered_blast_output = os.path.join(outdir, f'{out_name}_scaffolds_blast_filtered.tsv')
@@ -1095,6 +1092,7 @@ def make_map_ref(data_frame):
     blast_results = blast_results.groupby(group_cols).apply(make_map_ref)
     blast_results = blast_results.reset_index()
     blast_results.columns = ['sseqid', 'mapping_seq']
+
     # Annotate segment and subtype.
     get_segment = lambda row: row['sseqid'].split('|')[2].split('_')[0]
     blast_results['segment'] = blast_results.apply(get_segment, axis=1)
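For reference, the get_segment lambda in the context above assumes pipe-delimited reference IDs whose third field is SEGMENT_SUBTYPE; the exact ID layout below is hypothetical:

# hypothetical reference sequence ID in 'strain|accession|SEGMENT_SUBTYPE' form
sseqid = 'A/Darwin/6/2021|EPI123456|HA_H3'
print(sseqid.split('|')[2].split('_')[0])  # HA (the segment; the subtype field is parsed analogously)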
1 change: 0 additions & 1 deletion fluviewer/fluviewer.py
@@ -191,7 +191,6 @@ def main():
         shutil.copy(src_path, dest_path)
         log.info(f'Published log file: {log_file} -> {dest_path}')

-
         log.info(f'Analysis stage complete: {current_analysis_stage}')


Expand Down
