From b1258ff5146f8ba80524f1d4ecbf12b55296a4f7 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Tue, 30 Apr 2024 12:06:55 +0200 Subject: [PATCH 01/17] Some refactoring and fixes Issues addressed here: - Off-target amplicons are now reported as intended in the logs - Off-target amplicons are now always considered last for the final scheme (no penalty is used, but the fact that they had BLAST matches gets recorded) - Amplicon numbering proceeds from 5' to 3' even across pools - In the primer bed file, primers are now ordered according to the amplicon number without taking the pool into account For this, the internal representation of amplicon schemes had to be unified across the different modes and steps of the analysis. --- varvamp/command.py | 21 ++- varvamp/scripts/blast.py | 76 ++++------- varvamp/scripts/qpcr.py | 53 ++++---- varvamp/scripts/reporting.py | 248 ++++++++++++++++++++--------------- varvamp/scripts/scheme.py | 145 ++++++++++---------- 5 files changed, 271 insertions(+), 272 deletions(-) diff --git a/varvamp/command.py b/varvamp/command.py index ee9159b..5dbda5f 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -342,7 +342,7 @@ def single_workflow(args, amplicons, all_primers, log_file): log_file, progress=0.9, job="Finding amplicons with low penalties.", - progress_text=f"{len(amplicon_scheme[0])} amplicons." + progress_text=f"{len(amplicon_scheme)} amplicons." ) return amplicon_scheme @@ -359,8 +359,7 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida # search for amplicon scheme coverage, amplicon_scheme = scheme.find_best_covering_scheme( amplicons, - amplicon_graph, - all_primers + amplicon_graph ) # check for dimers @@ -382,7 +381,7 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida log_file, progress=0.9, job="Creating amplicon scheme.", - progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme[0]) + len(amplicon_scheme[1])} amplicons" + progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme)} amplicons" ) if percent_coverage < 70: logging.raise_error( @@ -452,7 +451,7 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori # create blast query query_path = blast.create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir) # perform primer blast - amplicons, off_target_amplicons = blast.primer_blast( + qpcr_scheme_candidates, off_target_amplicons = blast.primer_blast( data_dir, args.database, query_path, @@ -470,9 +469,6 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori log_file, exit=True ) - # report potential blast warnings - if args.database is not None: - blast.write_BLAST_warning(off_target_amplicons, final_schemes, log_file) logging.varvamp_progress( log_file, progress=0.9, @@ -533,15 +529,15 @@ def main(sysargs=sys.argv[1:]): log_file, results_dir ) - if args.database is not None: - blast.write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file) # write files + amplicon_scheme.sort(key=lambda x: x["LEFT"][1]) reporting.write_all_primers(data_dir, all_primers) reporting.write_scheme_to_files( results_dir, amplicon_scheme, ambiguous_consensus, - args.mode + args.mode, + log_file ) reporting.varvamp_plot( results_dir, @@ -565,8 +561,9 @@ def main(sysargs=sys.argv[1:]): log_file ) # write files + final_schemes.sort(key=lambda x: x["LEFT"][1]) reporting.write_regions_to_bed(probe_regions, data_dir, "probe") - reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus) + reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus, log_file) reporting.varvamp_plot( results_dir, alignment_cleaned, diff --git a/varvamp/scripts/blast.py b/varvamp/scripts/blast.py index 943ca5d..08a54b4 100644 --- a/varvamp/scripts/blast.py +++ b/varvamp/scripts/blast.py @@ -33,19 +33,16 @@ def create_BLAST_query(all_primers, amplicons, data_dir): """ create a query for the BLAST search (tiled, single mode) """ - already_written = [] + already_written = set() query_path = os.path.join(data_dir, "BLAST_query.fasta") with open(query_path, "w") as query: for amp in amplicons: - fw_primer, rv_primer = amplicons[amp][2], amplicons[amp][3] - if fw_primer not in already_written: - print(f">{fw_primer}\n{all_primers['+'][fw_primer][0]}", file=query) - already_written.append(fw_primer) - if rv_primer not in already_written: - print(f">{rv_primer}\n{all_primers['-'][rv_primer][0]}", file=query) - already_written.append(rv_primer) - + for primer_type in ["LEFT", "RIGHT"]: + name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}" + if name not in already_written: + print(f">{name}\n{amp[primer_type][0]}", file=query) + already_written.add(name) return query_path @@ -53,17 +50,16 @@ def create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir): """ create a query for the BLAST search (qpcr mode) """ - already_written = [] + already_written = set() query_path = os.path.join(data_dir, "BLAST_query.fasta") with open(query_path, "w") as query: for amp in qpcr_scheme_candidates: for primer_type in ["PROBE", "LEFT", "RIGHT"]: - name = f"{primer_type}_{qpcr_scheme_candidates[amp][primer_type][1]}_{qpcr_scheme_candidates[amp][primer_type][2]}" - if name in already_written: - continue - print(f">{name}\n{qpcr_scheme_candidates[amp][primer_type][0]}", file=query) - already_written.append(name) + name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}" + if name not in already_written: + print(f">{name}\n{amp[primer_type][0]}", file=query) + already_written.add(name) return query_path @@ -168,21 +164,24 @@ def predict_non_specific_amplicons_worker(amp, blast_df, max_length, mode): """ Worker function to predict unspecific targets for a single amplicon. """ - name, data = amp # get correct primers if mode == "single_tiled": - primers = [data[2], data[3]] + primer_types = ["LEFT", "RIGHT"] elif mode == "qpcr": - primers = [] - for primer_type in ["PROBE", "LEFT", "RIGHT"]: - primers.append(f"{primer_type}_{data[primer_type][1]}_{data[primer_type][2]}") + primer_types = ["PROBE", "LEFT", "RIGHT"] + primers = [] + for primer_type in primer_types: + primers.append(f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}") # subset df for primers df_amp_primers = blast_df[blast_df["query"].isin(primers)] # sort by reference and ref start df_amp_primers_sorted = df_amp_primers.sort_values(["ref", "ref_start"]) # check for off-targets for specific primers if check_off_targets(df_amp_primers_sorted, max_length, primers): - return name + amp["off_targets"] = True + else: + amp["off_targets"] = False + return amp def predict_non_specific_amplicons(amplicons, blast_df, max_length, mode, n_threads): @@ -190,22 +189,16 @@ def predict_non_specific_amplicons(amplicons, blast_df, max_length, mode, n_thre Main function to predict unspecific targets within a size range and give these primers a high penalty. Uses multiprocessing for parallelization. """ - off_targets = [] # process amplicons concurrently with multiprocessing.Pool(processes=n_threads) as pool: - amp_items = amplicons.items() - results = pool.starmap(predict_non_specific_amplicons_worker, [(amp, blast_df, max_length, mode) for amp in amp_items]) - # check results - for off_target in results: - if off_target is None: - continue - off_targets.append(off_target) - if mode == "single_tiled": - amplicons[off_target][5] = amplicons[off_target][5] + config.BLAST_PENALTY - elif mode == "qpcr": - amplicons[off_target]["penalty"] = amplicons[off_target]["penalty"] + config.BLAST_PENALTY - - return off_targets, amplicons + annotated_amps = [ + result for result in pool.starmap( + predict_non_specific_amplicons_worker, + [(amp, blast_df, max_length, mode) for amp in amplicons] + ) if result is not None + ] + off_targets = {amp["id"] for amp in annotated_amps if amp["off_targets"]} + return off_targets, annotated_amps def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log_file, mode): @@ -255,16 +248,3 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log return amplicons, off_target_amplicons - -def write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file): - """ - for each primer pair that has potential unspecific amplicons - write warnings to file. - """ - for amp in off_target_amplicons: - if amp in amplicon_scheme: - logging.raise_error( - f"{amp} could produce off-targets. No better amplicon in this area was found.", - log_file, - exit=False, - ) diff --git a/varvamp/scripts/qpcr.py b/varvamp/scripts/qpcr.py index 424136c..858765f 100644 --- a/varvamp/scripts/qpcr.py +++ b/varvamp/scripts/qpcr.py @@ -258,7 +258,7 @@ def find_qcr_schemes(qpcr_probes, left_primer_candidates, right_primer_candidate there is no need to consider this primer probe combination. """ - qpcr_scheme_candidates = {} + qpcr_scheme_candidates = [] found_amplicons = [] amplicon_nr = -1 @@ -279,15 +279,16 @@ def find_qcr_schemes(qpcr_probes, left_primer_candidates, right_primer_candidate # populate the primer dictionary: amplicon_nr += 1 found_amplicons.append(primer_combination) - qpcr_scheme_candidates[f"AMPLICON_{amplicon_nr}"] = { - "penalty": qpcr_probes[probe][3] + primer_combination[0][3] + primer_combination[1][3], - "PROBE": qpcr_probes[probe], - "LEFT": primer_combination[0], - "RIGHT": primer_combination[1] - } + qpcr_scheme_candidates.append( + { + "id": f"AMPLICON_{amplicon_nr}", + "penalty": qpcr_probes[probe][3] + primer_combination[0][3] + primer_combination[1][3], + "PROBE": qpcr_probes[probe], + "LEFT": primer_combination[0], + "RIGHT": primer_combination[1] + } + ) # and again sort by total penalty (left + right + probe) - qpcr_scheme_candidates = dict(sorted(qpcr_scheme_candidates.items(), key=lambda x: x[1]["penalty"])) - return qpcr_scheme_candidates @@ -296,21 +297,20 @@ def process_single_amplicon_deltaG(amplicon, majority_consensus): Process a single amplicon to test its deltaG and apply filtering. This function will be called concurrently by multiple threads. """ - name, data = amplicon - start = data["LEFT"][1] - stop = data["RIGHT"][2] + start = amplicon["LEFT"][1] + stop = amplicon["RIGHT"][2] seq = majority_consensus[start:stop] seq = seq.replace("N", "") seq = seq.replace("n", "") amp_positions = list(range(start, stop + 1)) # check if the amplicon overlaps with an amplicon that was previously # found and had a high enough deltaG - min_temp = min((primers.calc_temp(data["LEFT"][0]), - primers.calc_temp(data["RIGHT"][0]))) + min_temp = min((primers.calc_temp(amplicon["LEFT"][0]), + primers.calc_temp(amplicon["RIGHT"][0]))) # calculate deltaG at the minimal primer temp - deltaG = seqfold.dg(seq, min_temp) + amplicon["deltaG"] = seqfold.dg(seq, min_temp) - return deltaG, amp_positions, name + return amp_positions, amplicon def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n_to_test, deltaG_cutoff, n_threads): @@ -319,29 +319,30 @@ def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n and filters if they fall below the cutoff. Multiple processes are used for processing amplicons in parallel. """ - final_schemes = {} - passed_counter = 0 # counter for re-naming amplicons that passed deltaG cutoff + final_amplicons = [] amplicon_set = set() # Create a pool of processes to handle the concurrent processing with multiprocessing.Pool(processes=n_threads) as pool: # Create a list of the first n amplicon tuples for processing - amplicons = itertools.islice(qpcr_schemes_candidates.items(), n_to_test) + # The list is sorted first on whether offset targets were predicted for the amplicon, + # then by penalty. This ensures that amplicons with offset targets are always considered last + amplicons = itertools.islice( + sorted(qpcr_schemes_candidates, key=lambda x: (x.get("offset_targets", False), x["penalty"])), + n_to_test + ) # process amplicons concurrently results = pool.starmap(process_single_amplicon_deltaG, [(amp, majority_consensus) for amp in amplicons]) # Process the results - for deltaG, amp_positions, amp_name in results: + for amp_positions, amp in results: # check if the amplicon overlaps with an amplicon that was previously # found and had a high enough deltaG if any(x in amp_positions for x in amplicon_set): continue # and if this passes cutoff make a dict entry and do not allow further # amplicons in that region (they will have a lower penalty) - if deltaG > deltaG_cutoff: - new_name = f"QPCR_SCHEME_{passed_counter}" - final_schemes[new_name] = qpcr_schemes_candidates[amp_name] - final_schemes[new_name]["deltaG"] = deltaG + if amp["deltaG"] > deltaG_cutoff: + final_amplicons.append(amp) amplicon_set.update(amp_positions) - passed_counter += 1 - return final_schemes + return final_amplicons diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index d379be6..7034ee3 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -16,6 +16,7 @@ # varVAMP from varvamp.scripts import primers from varvamp.scripts import config +from varvamp.scripts import logging def write_fasta(path, seq_id, seq): @@ -48,10 +49,9 @@ def write_regions_to_bed(primer_regions, path, mode=None): outfile = f"{path}probe_regions.bed" else: outfile = f"{path}primer_regions.bed" - counter = 0 with open(outfile, 'w') as o: - for region in primer_regions: + for counter, region in enumerate(primer_regions): print( "ambiguous_consensus", region[0], @@ -60,7 +60,6 @@ def write_regions_to_bed(primer_regions, path, mode=None): sep="\t", file=o ) - counter += 1 def write_primers_to_bed(outfile, primer_name, primer_properties, direction): @@ -121,7 +120,7 @@ def calc_mean_stats(permutations): return round(gc/len(permutations), 1), round(temp/len(permutations), 1) -def write_qpcr_to_files(path, final_schemes, ambiguous_consensus): +def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): """ write all relevant bed files and tsv file for the qPCR design """ @@ -138,28 +137,38 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus): file=tsv2 ) print( - "qpcr_scheme\tpenalty\tdeltaG\tlength\tstart\tstop\tseq", + "qpcr_scheme\toff_target\tpenalty\tdeltaG\tlength\tstart\tstop\tseq", file=tsv ) - for scheme in final_schemes: + for n, amp in enumerate(final_schemes): + amp_name = f"QPCR_SCHEME_{n}" # write bed amplicon file print( "ambiguous_consensus", - final_schemes[scheme]["LEFT"][1], - final_schemes[scheme]["RIGHT"][2], - scheme, - round(final_schemes[scheme]["penalty"], 1), + amp["LEFT"][1], + amp["RIGHT"][2], + amp_name, + round(amp["penalty"], 1), sep="\t", file=bed ) # write tsv - amplicon_start = final_schemes[scheme]["LEFT"][1] - amplicon_stop = final_schemes[scheme]["RIGHT"][2] + amplicon_start = amp["LEFT"][1] + amplicon_stop = amp["RIGHT"][2] + if "off_targets" in amp: + if amp["off_targets"]: + amplicon_has_off_target = "Yes" + write_BLAST_warning(amp_name, log_file) + else: + amplicon_has_off_target = "No" + else: + amplicon_has_off_target = "NA" amplicon_seq = ambiguous_consensus[amplicon_start:amplicon_stop] print( - scheme, - round(final_schemes[scheme]["penalty"], 1), - final_schemes[scheme]["deltaG"], + amp_name, + amplicon_has_off_target, + round(amp["penalty"], 1), + amp["deltaG"], len(amplicon_seq), amplicon_start + 1, amplicon_stop, @@ -168,11 +177,11 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus): file=tsv ) # write tsv2 - for oligo_type in final_schemes[scheme]: + for oligo_type in ["PROBE", "LEFT", "RIGHT"]: if oligo_type == "penalty" or oligo_type == "deltaG": continue - seq = ambiguous_consensus[final_schemes[scheme][oligo_type][1]:final_schemes[scheme][oligo_type][2]] - if oligo_type == "RIGHT" or all([oligo_type == "PROBE", final_schemes[scheme]["PROBE"][5] == "-"]): + seq = ambiguous_consensus[amp[oligo_type][1]:amp[oligo_type][2]] + if oligo_type == "RIGHT" or (oligo_type == "PROBE" and amp["PROBE"][5] == "-"): seq = primers.rev_complement(seq) direction = "-" else: @@ -182,32 +191,32 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus): gc, temp = calc_mean_stats(permutations) print( - scheme, + amp_name, oligo_type, - final_schemes[scheme][oligo_type][1] + 1, - final_schemes[scheme][oligo_type][2], + amp[oligo_type][1] + 1, + amp[oligo_type][2], seq.upper(), len(seq), - round(primers.calc_gc(final_schemes[scheme][oligo_type][0]), 1), - round(primers.calc_temp(final_schemes[scheme][oligo_type][0]), 1), + round(primers.calc_gc(amp[oligo_type][0]), 1), + round(primers.calc_temp(amp[oligo_type][0]), 1), gc, temp, - round(final_schemes[scheme][oligo_type][3], 1), + round(amp[oligo_type][3], 1), sep="\t", file=tsv2 ) # write primer bed file write_primers_to_bed( primer_bed_file, - f"{scheme}_{oligo_type}", - final_schemes[scheme][oligo_type], + f"{amp_name}_{oligo_type}", + amp[oligo_type], direction ) # write fasta - print(f">{scheme}_{oligo_type}\n{seq.upper()}", file=fasta) + print(f">{amp_name}_{oligo_type}\n{seq.upper()}", file=fasta) -def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode): +def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_file): """ write all relevant bed files and a tsv file with all primer stats """ @@ -216,8 +225,6 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode): amplicon_bed_file = os.path.join(path, "amplicons.bed") tabular_file = os.path.join(path, "primer_to_amplicon_assignment.tabular") - counter = 0 - # open files to write with open(tsv_file, "w") as tsv, open(amplicon_bed_file, "w") as bed, open(tabular_file, "w") as tabular: # write header for primer tsv @@ -225,46 +232,47 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode): "amlicon_name\tamplicon_length\tprimer_name\talternate_primer_name\tpool\tstart\tstop\tseq\tsize\tgc_best\ttemp_best\tmean_gc\tmean_temp\tpenalty", file=tsv ) - - for pool in amplicon_scheme: + amplicon_bed_records = [] + primer_bed_records = [] + primer_assignment_records = [] + pools = {amp.get("pool", 0) for amp in amplicon_scheme} + for pool in pools: if mode == "single": primer_fasta_file = os.path.join(path, "primers.fasta") else: primer_fasta_file = os.path.join(path, f"primers_pool_{pool}.fasta") with open(primer_fasta_file, "w") as primer_fasta: - for amp in amplicon_scheme[pool]: + for counter, amp in enumerate(amplicon_scheme[pool::len(pools)]): # give a new amplicon name - new_name = f"AMPLICON_{str(counter)}" - counter += 1 + amplicon_index = counter*len(pools) + pool + new_name = f"AMPLICON_{amplicon_index}" # get left and right primers and their names - primer_names = list(amplicon_scheme[pool][amp].keys()) - left = (primer_names[0], amplicon_scheme[pool][amp][primer_names[0]]) - right = (primer_names[1], amplicon_scheme[pool][amp][primer_names[1]]) - amp_length = right[1][2] - left[1][1] + amp_length = amp["RIGHT"][2] - amp["LEFT"][1] + if amp.get("off_targets"): + write_BLAST_warning(new_name, log_file) # write amplicon bed if mode == "tiled": bed_score = pool elif mode == "single": - bed_score = round(left[1][3] + right[1][3], 1) - print( - "ambiguous_consensus", - left[1][1], - right[1][2], - new_name, - bed_score, - sep="\t", - file=bed + bed_score = round(amp["LEFT"][3] + amp["RIGHT"][3], 1) + amplicon_bed_records.append( + ( + amp["LEFT"][1], + amp["RIGHT"][2], + new_name, + bed_score + ) ) - # write primer assignments tabular file - print( - f"{new_name}_LEFT", - f"{new_name}_RIGHT", - sep="\t", - file=tabular + primer_assignment_records.append( + ( + # will need amplicon_index for sorting + amplicon_index, + (f"{new_name}_LEFT", f"{new_name}_RIGHT") + ) ) # write primer tsv and primer bed - for direction, primer in [("+", left), ("-", right)]: - seq = ambiguous_consensus[primer[1][1]:primer[1][2]] + for direction, primer in [("+", amp["LEFT"]), ("-", amp["RIGHT"])]: + seq = ambiguous_consensus[primer[1]:primer[2]] if direction == "-": seq = primers.rev_complement(seq) primer_name = f"{new_name}_RIGHT" @@ -280,27 +288,48 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode): new_name, amp_length, primer_name, - primer[0], + primer[-1], pool, - primer[1][1] + 1, - primer[1][2], + primer[1] + 1, + primer[2], seq.upper(), - len(primer[1][0]), - round(primers.calc_gc(primer[1][0]), 1), - round(primers.calc_temp(primer[1][0]), 1), + len(primer[0]), + round(primers.calc_gc(primer[0]), 1), + round(primers.calc_temp(primer[0]), 1), gc, temp, - round(primer[1][3], 1), + round(primer[3], 1), sep="\t", file=tsv ) - # write primer bed file - write_primers_to_bed( - primer_bed_file, - primer_name, - primer[1], - direction + primer_bed_records.append( + ( + # will need amplicon_index for sorting + amplicon_index, + (primer_name, primer, direction) + ) ) + # write amplicon bed with amplicons sorted by start position + for record in sorted(amplicon_bed_records, key=lambda x: x[0]): + print( + "ambiguous_consensus", + *record, + sep="\t", + file=bed + ) + # use sorting by amplicon index for primer assignment file + for record in sorted(primer_assignment_records): + print( + *record[1], + sep="\t", + file=tabular + ) + # same for primer bed + for record in sorted(primer_bed_records): + write_primers_to_bed( + primer_bed_file, + *record[1] + ) def write_dimers(path, primer_dimers): @@ -418,26 +447,23 @@ def amplicon_subplot(ax, amplicon_scheme): """ creates the amplicon subplot """ - counter = 0 - for pool in amplicon_scheme: - for amp in amplicon_scheme[pool]: - if pool == 0: - position_amp = 0.7 - position_text = 0.6 - elif pool == 1: - position_amp = 0.6 - position_text = 0.65 - primer_names = [i for i in amplicon_scheme[pool][amp]] - left = amplicon_scheme[pool][amp][primer_names[0]] - right = amplicon_scheme[pool][amp][primer_names[1]] - # amplicons - ax[1].hlines(position_amp, left[1], right[2], linewidth=5) - # text - ax[1].text(right[2] - (right[2]-left[1])/2, position_text, str(counter), fontsize=8) - # primers - ax[1].hlines(position_amp, left[1], left[2], linewidth=5, color="red") - ax[1].hlines(position_amp, right[1], right[2], linewidth=5, color="red") - counter += 1 + for counter, amp in enumerate(amplicon_scheme): + pool = amp.get("pool", 0) + if pool == 0: + position_amp = 0.7 + position_text = 0.6 + elif pool == 1: + position_amp = 0.6 + position_text = 0.65 + left = amp["LEFT"] + right = amp["RIGHT"] + # amplicons + ax[1].hlines(position_amp, left[1], right[2], linewidth=5) + # text + ax[1].text(right[2] - (right[2]-left[1])/2, position_text, str(counter), fontsize=8) + # primers + ax[1].hlines(position_amp, left[1], left[2], linewidth=5, color="red") + ax[1].hlines(position_amp, right[1], right[2], linewidth=5, color="red") # legends ax[1].hlines(position_amp, left[1]+config.PRIMER_SIZES[1], right[2]-config.PRIMER_SIZES[1], linewidth=5, label="amplicons") ax[1].hlines(position_amp, left[1], left[2], linewidth=5, color="red", label="primers") @@ -447,12 +473,10 @@ def qpcr_subplot(ax, amplicon_scheme): """ creates the qpcr subplot """ - counter = 0 - - for scheme in amplicon_scheme: - left = amplicon_scheme[scheme]["LEFT"] - right = amplicon_scheme[scheme]["RIGHT"] - probe = amplicon_scheme[scheme]["PROBE"] + for counter, amp in enumerate(amplicon_scheme): + left = amp["LEFT"] + right = amp["RIGHT"] + probe = amp["PROBE"] # amplicons ax[1].hlines(0.8, left[1], right[2], linewidth=5) # text @@ -463,7 +487,6 @@ def qpcr_subplot(ax, amplicon_scheme): # probe ax[1].hlines(0.75, probe[1], probe[2], linewidth=5, color="darkgrey") - counter += 1 # legends ax[1].hlines(0.8, left[1]+config.PRIMER_SIZES[1], right[2]-config.PRIMER_SIZES[1], linewidth=5, label="amplicons") ax[1].hlines(0.8, left[1], left[2], linewidth=5, color="red", label="primers") @@ -515,13 +538,10 @@ def get_SINGLE_TILED_primers_for_plot(amplicon_scheme): """ amplicon_primers = [] - for pool in amplicon_scheme: - for amp in amplicon_scheme[pool]: - primer_names = [i for i in amplicon_scheme[pool][amp]] - left = amplicon_scheme[pool][amp][primer_names[0]] - right = amplicon_scheme[pool][amp][primer_names[1]] - amplicon_primers.append((primer_names[0], left)) - amplicon_primers.append((primer_names[1], right)) + for counter, amp in enumerate(amplicon_scheme): + for type in ["LEFT", "RIGHT"]: + primer_name = f"AMPLICON_{counter}_{type}" + amplicon_primers.append((primer_name, amp[type])) return amplicon_primers @@ -532,12 +552,10 @@ def get_QPCR_primers_for_plot(amplicon_schemes): """ amplicon_primers = [] - for scheme in amplicon_schemes: - for type in amplicon_schemes[scheme]: - if type == "penalty" or type == "deltaG": - continue - primer_name = f"{scheme}_{type}" - amplicon_primers.append((primer_name, amplicon_schemes[scheme][type])) + for counter, amp in enumerate(amplicon_schemes): + for type in ["PROBE", "LEFT", "RIGHT"]: + primer_name = f"QPCR_SCHEME_{counter}_{type}" + amplicon_primers.append((primer_name, amp[type])) return amplicon_primers @@ -581,3 +599,15 @@ def per_base_mismatch_plot(path, amplicon_scheme, threshold, mode="SINGLE/TILED" # - to pdf pdf.savefig(fig, bbox_inches='tight') plt.close() + + +def write_BLAST_warning(amplicon_name, log_file): + """ + for each primer pair that has potential unspecific amplicons + write warnings to file. + """ + logging.raise_error( + f"{amplicon_name} could produce off-targets. No better amplicon in this area was found.", + log_file, + exit=False, + ) diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index fb831ad..40c6ef1 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -67,7 +67,7 @@ def find_amplicons(all_primers, opt_len, max_len): finds all possible amplicons, creates a dictionary """ amplicon_number = 0 - amplicon_dict = {} + amplicons = [] for left_name in all_primers["+"]: left_primer = all_primers["+"][left_name] @@ -82,17 +82,17 @@ def find_amplicons(all_primers, opt_len, max_len): # penalty multiplied by the e^(fold length of the optimal length). amplicon_costs = (right_primer[3] + left_primer[3])*math.exp(amplicon_length/opt_len) amplicon_name = "AMPLICON_"+str(amplicon_number) - amplicon_dict[amplicon_name] = [ - left_primer[1], # start - right_primer[2], # stop - left_name, # name left primer - right_name, # name right primer - amplicon_length, # amplicon length - amplicon_costs # costs - ] + amplicons.append( + { + "id": amplicon_name, + "penalty": amplicon_costs, + "length": amplicon_length, + "LEFT": left_primer + [left_name], + "RIGHT": right_primer + [right_name], + } + ) amplicon_number += 1 - - return amplicon_dict + return amplicons def create_amplicon_graph(amplicons, min_overlap): @@ -107,24 +107,22 @@ def create_amplicon_graph(amplicons, min_overlap): # before the min overlap min_overlap = min_overlap + config.PRIMER_SIZES[2] - for current in amplicons: + for current_amplicon in amplicons: # remember all vertices - nodes.append(current) - current_amplicon = amplicons[current] - start = current_amplicon[0] + current_amplicon[4]/2 - stop = current_amplicon[1] - min_overlap - for next_amp in amplicons: - next_amplicon = amplicons[next_amp] + amplicon_id = current_amplicon["id"] + nodes.append(amplicon_id) + start = current_amplicon["LEFT"][1] + current_amplicon["length"]/2 + stop = current_amplicon["RIGHT"][2] - min_overlap + for next_amplicon in amplicons: # check if the next amplicon lies within the start/stop range of # the current amplicon and if its non-overlapping part is large # enough to ensure space for a primer and the min overlap of the # following amplicon. - if not all([start <= next_amplicon[0] <= stop, next_amplicon[1] > current_amplicon[1] + next_amplicon[4]/2]): - continue - if current not in amplicon_graph: - amplicon_graph[current] = {next_amp: next_amplicon[5]} - else: - amplicon_graph[current][next_amp] = next_amplicon[5] + if start <= next_amplicon["LEFT"][1] <= stop and next_amplicon["RIGHT"][2] > current_amplicon["RIGHT"][2] + next_amplicon["length"]/2: + if amplicon_id not in amplicon_graph: + amplicon_graph[amplicon_id] = {next_amplicon["id"]: next_amplicon["penalty"]} + else: + amplicon_graph[amplicon_id][next_amplicon["id"]] = next_amplicon["penalty"] # return a graph object return Graph(nodes, amplicon_graph) @@ -167,11 +165,12 @@ def get_end_node(previous_nodes, shortest_path, amplicons): for node in previous_nodes.keys(): # check if a node has a larger stop -> empty dict and set new # best stop nucleotide - if amplicons[node][1] > stop_nucleotide: + amplicon_stop = amplicons[node]["RIGHT"][2] + if amplicon_stop > stop_nucleotide: possible_end_nodes = {node: shortest_path[node]} - stop_nucleotide = amplicons[node][1] + stop_nucleotide = amplicon_stop # if nodes have the same stop nucleotide, add to dictionary - elif amplicons[node][1] == stop_nucleotide: + elif amplicon_stop == stop_nucleotide: possible_end_nodes[node] = shortest_path[node] # return the end node with the lowest penalty costs @@ -196,49 +195,44 @@ def get_min_path(previous_nodes, start_node, target_node): return path[::-1] -def create_scheme_dic(amplicon_scheme, amplicons, all_primers): +def create_scheme_dic(amplicon_path, amplicons_by_id): """ creates the final scheme dictionary """ - - scheme_dictionary = { - 0: {}, - 1: {} - } + amplicon_scheme = [] for pool in (0, 1): - for amp in amplicon_scheme[pool::2]: - scheme_dictionary[pool][amp] = {} - primer_pair = [amplicons[amp][2], amplicons[amp][3]] - scheme_dictionary[pool][amp][primer_pair[0]] = all_primers["+"][primer_pair[0]] - scheme_dictionary[pool][amp][primer_pair[1]] = all_primers["-"][primer_pair[1]] + for amp_id in amplicon_path[pool::2]: + amplicons_by_id[amp_id]["pool"] = pool + amplicon_scheme.append(amplicons_by_id[amp_id]) - return scheme_dictionary + return amplicon_scheme -def find_best_covering_scheme(amplicons, amplicon_graph, all_primers): +def find_best_covering_scheme(amplicons, amplicon_graph): """ this brute forces the amplicon scheme search until the largest coverage with the minimal costs is achieved. """ # ini best_coverage = 0 - max_stop = max(amplicons.items(), key=lambda x: x[1])[1][1] + max_stop = max(amplicons, key=lambda x: x["RIGHT"][2])["RIGHT"][2] lowest_costs = float('infinity') - - for start_node in amplicons: + # a dit for fast access to amplicons by their ID + amps_by_id = {amp["id"]: amp for amp in amplicons} + for amplicon in amplicons: # if the currently best coverage + start nucleotide of the currently tested amplicon # is smaller than the maximal stop nucleotide there might be a better amplicon # scheme that covers more of the genome - if amplicons[start_node][0] + best_coverage <= max_stop: - previous_nodes, shortest_path = dijkstra_algorithm(amplicon_graph, start_node) + if amplicon["LEFT"][1] + best_coverage <= max_stop: + previous_nodes, shortest_path = dijkstra_algorithm(amplicon_graph, amplicon["id"]) # only continue if there are previous_nodes if previous_nodes: - target_node, costs = get_end_node(previous_nodes, shortest_path, amplicons) - coverage = amplicons[target_node][1] - amplicons[start_node][0] + target_node, costs = get_end_node(previous_nodes, shortest_path, amps_by_id) + coverage = amps_by_id[target_node]["RIGHT"][2] - amplicon["LEFT"][1] # if the new coverage is larger, go for the larger coverage if coverage > best_coverage: - best_start_node = start_node + best_start_node = amplicon["id"] best_target_node = target_node best_previous_nodes = previous_nodes lowest_costs = costs @@ -246,18 +240,18 @@ def find_best_covering_scheme(amplicons, amplicon_graph, all_primers): # if the coverages are identical, go for the lowest costs elif coverage == best_coverage: if costs < lowest_costs: - best_start_node = start_node + best_start_node = amplicon["id"] best_target_node = target_node best_previous_nodes = previous_nodes lowest_costs = costs best_coverage = coverage else: # check if the single amplicon has the largest coverage so far - coverage = amplicons[start_node][1] - amplicons[start_node][0] + coverage = amplicon["length"] if coverage > best_coverage: - best_start_node = start_node + best_start_node = amplicon["id"] best_previous_nodes = previous_nodes - lowest_costs = amplicons[start_node][5] + lowest_costs = amplicon["penalty"] best_coverage = coverage # no need to check more, the best covering amplicon scheme was found and # has the minimal costs compared to the schemes with the same coverage @@ -265,13 +259,13 @@ def find_best_covering_scheme(amplicons, amplicon_graph, all_primers): break if best_previous_nodes: - amplicon_scheme = get_min_path(best_previous_nodes, best_start_node, best_target_node) + amplicon_path = get_min_path(best_previous_nodes, best_start_node, best_target_node) else: # if no previous nodes are found but the single amplicon results in the largest # coverage - return as the best scheme - amplicon_scheme = [best_start_node] + amplicon_path = [best_start_node] - return best_coverage, create_scheme_dic(amplicon_scheme, amplicons, all_primers) + return best_coverage, create_scheme_dic(amplicon_path, amps_by_id) def test_scheme_for_dimers(amplicon_scheme): @@ -281,13 +275,13 @@ def test_scheme_for_dimers(amplicon_scheme): primer_dimers = [] - for pool in amplicon_scheme: + for pool in [0, 1]: # test the primer dimers only within the respective pools tested_primers = [] - for amp in amplicon_scheme[pool]: - for primer in amplicon_scheme[pool][amp]: + for amp in amplicon_scheme[pool::2]: + for primer in ["LEFT", "RIGHT"]: # remember where the currrent primer was in the scheme - current_primer = (pool, amp, primer, amplicon_scheme[pool][amp][primer]) + current_primer = (pool, amp, primer, amp[primer]) current_seq = current_primer[3][0] for tested in tested_primers: tested_seq = tested[3][0] @@ -387,26 +381,23 @@ def find_single_amplicons(amplicons, all_primers, n): from all found amplicons. only for the SINGLE mode. """ # sort amplicons - sorted_amplicons = sorted(amplicons.items(), key=lambda x: x[1][5]) - to_retain = [sorted_amplicons[0]] - amplicon_range = list(range(sorted_amplicons[0][1][0], sorted_amplicons[0][1][1]+1)) - amplicon_set = set(amplicon_range) + sorted_amplicons = sorted(amplicons, key=lambda x: (x.get("offset_targets"), x["penalty"])) + to_retain = [] + retained_ranges = [] # find lowest non-overlapping for amp in sorted_amplicons: - amp_positions = list(range(amp[1][0], amp[1][1]+1)) - if not any(x in amp_positions for x in amplicon_set): - amplicon_set.update(amp_positions) + overlaps_retained = False + amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]+1) + for r in retained_ranges: + if amp_range.start in r or amp_range.stop in r or r.start in amp_range or r.stop in amp_range: + overlaps_retained = True + break + if not overlaps_retained: + retained_ranges.append( + range(amp["LEFT"][1], amp["RIGHT"][2]+1) + ) to_retain.append(amp) - # build the final dictionary - scheme_dictionary = {0: {}} - counter = 1 - for amp in to_retain: - scheme_dictionary[0][amp[0]] = {} - scheme_dictionary[0][amp[0]][amp[1][2]] = all_primers["+"][amp[1][2]] - scheme_dictionary[0][amp[0]][amp[1][3]] = all_primers["-"][amp[1][3]] - if n != float("inf"): - if counter == n: + if len(to_retain) == n: break - counter += 1 - return scheme_dictionary + return to_retain From 8d8b1ca28630df4fd89900e3a32783563dc06e70 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Wed, 1 May 2024 18:09:24 +0200 Subject: [PATCH 02/17] Fix amplicon sorting This ensures that the best amplicons get the lowest ID numbers in the written scheme. --- varvamp/command.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/varvamp/command.py b/varvamp/command.py index 5dbda5f..bc6cc28 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -529,8 +529,15 @@ def main(sysargs=sys.argv[1:]): log_file, results_dir ) + # write files - amplicon_scheme.sort(key=lambda x: x["LEFT"][1]) + + if args.mode == "tiled": + # assign amplicon numbers from 5' to 3' along the genome + amplicon_scheme.sort(key=lambda x: x["LEFT"][1]) + else: + # make sure amplicons with no off-target products and with low penalties get the lowest numbers + amplicon_scheme.sort(key=lambda x: (x["off_targets"], x["penalty"])) reporting.write_all_primers(data_dir, all_primers) reporting.write_scheme_to_files( results_dir, @@ -560,8 +567,11 @@ def main(sysargs=sys.argv[1:]): right_primer_candidates, log_file ) + # write files - final_schemes.sort(key=lambda x: x["LEFT"][1]) + + # make sure amplicons with no off-target products and with low penalties get the lowest numbers + final_schemes.sort(key=lambda x: (x["off_targets"], x["penalty"])) reporting.write_regions_to_bed(probe_regions, data_dir, "probe") reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus, log_file) reporting.varvamp_plot( From cf92c964782445289f39b39b73f9135c88a0b865 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Wed, 1 May 2024 18:19:53 +0200 Subject: [PATCH 03/17] Document scheme structure as comment --- varvamp/command.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/varvamp/command.py b/varvamp/command.py index bc6cc28..1304441 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -502,6 +502,18 @@ def main(sysargs=sys.argv[1:]): reporting.write_fasta(data_dir, "majority_consensus", majority_consensus) reporting.write_fasta(results_dir, "ambiguous_consensus", ambiguous_consensus) + # Functions called from here on return lists of amplicons that are refined step-wise into final schemes. + # These lists that are passed between functions and later used for reporting consist of dictionary elemnts, + # which represent individual amplicons. A minimal amplicon dict could take the form: + # { + # "id": amplicon_name, + # "penalty": amplicon_cost, + # "length": amplicon_length, + # "LEFT": [left primer data], + # "RIGHT": [right primer data] + # } + # to which different functions may add additional information. + # SINGLE/TILED mode if args.mode == "tiled" or args.mode == "single": all_primers, amplicons, off_target_amplicons = single_and_tiled_shared_workflow( From 14352a3035687642a98b31212792a0d435cbf966 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Thu, 2 May 2024 13:56:04 +0200 Subject: [PATCH 04/17] Weigh amplicon graphs by (offset_targets, costs) --- varvamp/scripts/scheme.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index 40c6ef1..98abd5b 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -25,7 +25,7 @@ def construct_graph(nodes, init_graph): for node, neighbors in graph.items(): for neighbor in neighbors.keys(): if graph[neighbor].get(node, False) is False: - graph[neighbor][node] = float("infinity") + graph[neighbor][node] = (float("infinity"), 0) return graph @@ -120,9 +120,17 @@ def create_amplicon_graph(amplicons, min_overlap): # following amplicon. if start <= next_amplicon["LEFT"][1] <= stop and next_amplicon["RIGHT"][2] > current_amplicon["RIGHT"][2] + next_amplicon["length"]/2: if amplicon_id not in amplicon_graph: - amplicon_graph[amplicon_id] = {next_amplicon["id"]: next_amplicon["penalty"]} + amplicon_graph[amplicon_id] = { + next_amplicon["id"]: ( + next_amplicon.get("off_targets", False), + next_amplicon["penalty"] + ) + } else: - amplicon_graph[amplicon_id][next_amplicon["id"]] = next_amplicon["penalty"] + amplicon_graph[amplicon_id][next_amplicon["id"]] = ( + next_amplicon.get("off_targets", False), + next_amplicon["penalty"] + ) # return a graph object return Graph(nodes, amplicon_graph) @@ -134,17 +142,21 @@ def dijkstra_algorithm(graph, start_node): """ previous_nodes = {} - shortest_path = {node: float('infinity') for node in graph.get_nodes()} - shortest_path[start_node] = 0 + shortest_path = {node: (float('infinity'), 0) for node in graph.get_nodes()} + shortest_path[start_node] = (0, 0) - nodes_to_test = [(0, start_node)] + nodes_to_test = [((0, 0), start_node)] while nodes_to_test: current_distance, current_node = heapq.heappop(nodes_to_test) if current_distance > shortest_path[current_node]: continue for neighbor in graph.get_neighbors(current_node): - distance = current_distance + graph.value(current_node, neighbor) + off_targets, base_penalty = graph.value(current_node, neighbor) + distance = ( + current_distance[0] + off_targets, + current_distance[1] + base_penalty + ) # Only consider this new path if it's a better path if not distance < shortest_path[neighbor]: continue @@ -217,8 +229,8 @@ def find_best_covering_scheme(amplicons, amplicon_graph): # ini best_coverage = 0 max_stop = max(amplicons, key=lambda x: x["RIGHT"][2])["RIGHT"][2] - lowest_costs = float('infinity') - # a dit for fast access to amplicons by their ID + lowest_costs = (float('infinity'),) + # a dict for fast access to amplicons by their ID amps_by_id = {amp["id"]: amp for amp in amplicons} for amplicon in amplicons: # if the currently best coverage + start nucleotide of the currently tested amplicon @@ -251,7 +263,8 @@ def find_best_covering_scheme(amplicons, amplicon_graph): if coverage > best_coverage: best_start_node = amplicon["id"] best_previous_nodes = previous_nodes - lowest_costs = amplicon["penalty"] + lowest_costs = ( + amplicon.get("off_targets", False), amplicon["penalty"]) best_coverage = coverage # no need to check more, the best covering amplicon scheme was found and # has the minimal costs compared to the schemes with the same coverage From 9ec31b031176cadbebd26dfc9741d676b6c82dbd Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 10:18:19 +0200 Subject: [PATCH 05/17] Fix sorting without blast results --- varvamp/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/varvamp/command.py b/varvamp/command.py index 1304441..ffd0b25 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -549,7 +549,7 @@ def main(sysargs=sys.argv[1:]): amplicon_scheme.sort(key=lambda x: x["LEFT"][1]) else: # make sure amplicons with no off-target products and with low penalties get the lowest numbers - amplicon_scheme.sort(key=lambda x: (x["off_targets"], x["penalty"])) + amplicon_scheme.sort(key=lambda x: (x.get("off_targets", False), x["penalty"])) reporting.write_all_primers(data_dir, all_primers) reporting.write_scheme_to_files( results_dir, From 7eebc82a90a389d777fa5ee861f67d3d6ba0a555 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 10:42:23 +0200 Subject: [PATCH 06/17] Fix same error for qPCR mode --- varvamp/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/varvamp/command.py b/varvamp/command.py index ffd0b25..5f11eb2 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -583,7 +583,7 @@ def main(sysargs=sys.argv[1:]): # write files # make sure amplicons with no off-target products and with low penalties get the lowest numbers - final_schemes.sort(key=lambda x: (x["off_targets"], x["penalty"])) + final_schemes.sort(key=lambda x: (x.get("off_targets", False), x["penalty"])) reporting.write_regions_to_bed(probe_regions, data_dir, "probe") reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus, log_file) reporting.varvamp_plot( From bba4ad19bc6957d81e874033fa0fc0768b3ceea6 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 11:02:27 +0200 Subject: [PATCH 07/17] Fix typo which messed up sorting of amplicons in single mode --- varvamp/scripts/scheme.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index 98abd5b..36d7de5 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -21,12 +21,12 @@ def construct_graph(nodes, init_graph): graph[node] = {} graph.update(init_graph) - + print(graph) + return graph for node, neighbors in graph.items(): for neighbor in neighbors.keys(): if graph[neighbor].get(node, False) is False: graph[neighbor][node] = (float("infinity"), 0) - return graph @@ -277,7 +277,6 @@ def find_best_covering_scheme(amplicons, amplicon_graph): # if no previous nodes are found but the single amplicon results in the largest # coverage - return as the best scheme amplicon_path = [best_start_node] - return best_coverage, create_scheme_dic(amplicon_path, amps_by_id) @@ -394,7 +393,7 @@ def find_single_amplicons(amplicons, all_primers, n): from all found amplicons. only for the SINGLE mode. """ # sort amplicons - sorted_amplicons = sorted(amplicons, key=lambda x: (x.get("offset_targets"), x["penalty"])) + sorted_amplicons = sorted(amplicons, key=lambda x: (x.get("off_targets", False), x["penalty"])) to_retain = [] retained_ranges = [] # find lowest non-overlapping From e8d04cc82a403401c356ba09c07407ff70c7de5c Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 15:31:20 +0200 Subject: [PATCH 08/17] Optimize amplicon overlap calculations and fix off-by-one errors ... in all interval clculations --- varvamp/scripts/primers.py | 4 ++-- varvamp/scripts/qpcr.py | 29 +++++++++++++++-------------- varvamp/scripts/scheme.py | 12 +++++------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/varvamp/scripts/primers.py b/varvamp/scripts/primers.py index c541a99..f4ce59d 100644 --- a/varvamp/scripts/primers.py +++ b/varvamp/scripts/primers.py @@ -386,13 +386,13 @@ def find_best_primers(left_primer_candidates, right_primer_candidates): primer_candidates.sort(key=lambda x: (x[3], x[1])) # ini everything with the primer with the lowest penalty to_retain = [primer_candidates[0]] - primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2]+1)) + primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2])) primer_set = set(primer_ranges) for primer in primer_candidates: # get the thirds of the primer, only consider the middle thirds_len = int((primer[2] - primer[1])/3) - primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len + 1)) + primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len)) # check if none of the nucleotides of the next primer # are already covered by a better primer if not any(x in primer_positions for x in primer_set): diff --git a/varvamp/scripts/qpcr.py b/varvamp/scripts/qpcr.py index 858765f..f2d89e9 100644 --- a/varvamp/scripts/qpcr.py +++ b/varvamp/scripts/qpcr.py @@ -211,13 +211,13 @@ def assess_amplicons(left_subset, right_subset, qpcr_probes, probe, majority_con if "LEFT" in probe: if not qpcr_probes[probe][1] in range( left_primer[2] + config.QPROBE_DISTANCE[0], - left_primer[2] + config.QPROBE_DISTANCE[1] + 1 + left_primer[2] + config.QPROBE_DISTANCE[1] ): continue elif "RIGHT" in probe: if not right_primer[1] in range( qpcr_probes[probe][2] + config.QPROBE_DISTANCE[0], - qpcr_probes[probe][2] + config.QPROBE_DISTANCE[1] + 1 + qpcr_probes[probe][2] + config.QPROBE_DISTANCE[1] ): continue @@ -297,12 +297,9 @@ def process_single_amplicon_deltaG(amplicon, majority_consensus): Process a single amplicon to test its deltaG and apply filtering. This function will be called concurrently by multiple threads. """ - start = amplicon["LEFT"][1] - stop = amplicon["RIGHT"][2] - seq = majority_consensus[start:stop] + seq = majority_consensus[amplicon["LEFT"][1]:amplicon["RIGHT"][2]] seq = seq.replace("N", "") seq = seq.replace("n", "") - amp_positions = list(range(start, stop + 1)) # check if the amplicon overlaps with an amplicon that was previously # found and had a high enough deltaG min_temp = min((primers.calc_temp(amplicon["LEFT"][0]), @@ -310,7 +307,7 @@ def process_single_amplicon_deltaG(amplicon, majority_consensus): # calculate deltaG at the minimal primer temp amplicon["deltaG"] = seqfold.dg(seq, min_temp) - return amp_positions, amplicon + return amplicon def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n_to_test, deltaG_cutoff, n_threads): @@ -320,7 +317,6 @@ def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n for processing amplicons in parallel. """ final_amplicons = [] - amplicon_set = set() # Create a pool of processes to handle the concurrent processing with multiprocessing.Pool(processes=n_threads) as pool: @@ -334,15 +330,20 @@ def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n # process amplicons concurrently results = pool.starmap(process_single_amplicon_deltaG, [(amp, majority_consensus) for amp in amplicons]) # Process the results - for amp_positions, amp in results: + retained_ranges = [] + for amp in results: # check if the amplicon overlaps with an amplicon that was previously # found and had a high enough deltaG - if any(x in amp_positions for x in amplicon_set): + if amp["deltaG"] <= deltaG_cutoff: continue - # and if this passes cutoff make a dict entry and do not allow further - # amplicons in that region (they will have a lower penalty) - if amp["deltaG"] > deltaG_cutoff: + amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]) + overlaps_retained = False + for r in retained_ranges: + if amp_range.start < r.stop and r.start < amp_range.stop: + overlaps_retained = True + break + if not overlaps_retained: final_amplicons.append(amp) - amplicon_set.update(amp_positions) + retained_ranges.append(amp_range) return final_amplicons diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index 36d7de5..c07650d 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -319,7 +319,7 @@ def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidat overlapping_primers_temp = [] thirds_len = int((primer[3][2] - primer[3][1]) / 3) # get the middle third of the primer (here are the previously excluded primers) - overlap_set = set(range(primer[3][1] + thirds_len, primer[3][2] - thirds_len + 1)) + overlap_set = set(range(primer[3][1] + thirds_len, primer[3][2] - thirds_len)) # check in which list to look for them if "RIGHT" in primer[2]: primers_to_test = right_primer_candidates @@ -327,7 +327,7 @@ def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidat primers_to_test = left_primer_candidates # and check this list for all primers that overlap for potential_new in primers_to_test: - primer_positions = list(range(potential_new[1], potential_new[2]+1)) + primer_positions = list(range(potential_new[1], potential_new[2])) if not any(x in primer_positions for x in overlap_set): continue overlapping_primers_temp.append((primer[0], primer[1], primer[2], potential_new)) @@ -399,15 +399,13 @@ def find_single_amplicons(amplicons, all_primers, n): # find lowest non-overlapping for amp in sorted_amplicons: overlaps_retained = False - amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]+1) + amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]) for r in retained_ranges: - if amp_range.start in r or amp_range.stop in r or r.start in amp_range or r.stop in amp_range: + if amp_range.start < r.stop and r.start < amp_range.stop: overlaps_retained = True break if not overlaps_retained: - retained_ranges.append( - range(amp["LEFT"][1], amp["RIGHT"][2]+1) - ) + retained_ranges.append(amp_range) to_retain.append(amp) if len(to_retain) == n: break From 677ddb5d1b210ae15602feaeddda8813a0fe771f Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 16:25:30 +0200 Subject: [PATCH 09/17] Remove BLAST_PENALTY and clean up blast-related code --- varvamp/command.py | 14 +++++------ varvamp/scripts/blast.py | 42 ++++++++++++------------------- varvamp/scripts/default_config.py | 3 +-- varvamp/scripts/logging.py | 7 ------ 4 files changed, 23 insertions(+), 43 deletions(-) diff --git a/varvamp/command.py b/varvamp/command.py index 5f11eb2..14a1eae 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -314,9 +314,9 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_ if args.database is not None: # create blast query - query_path = blast.create_BLAST_query(all_primers, amplicons, data_dir) + query_path = blast.create_BLAST_query(amplicons, data_dir) # perform primer blast - amplicons, off_target_amplicons = blast.primer_blast( + amplicons = blast.primer_blast( data_dir, args.database, query_path, @@ -326,10 +326,8 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_ log_file, mode="single_tiled" ) - else: - off_target_amplicons = [] - return all_primers, amplicons, off_target_amplicons + return all_primers, amplicons def single_workflow(args, amplicons, all_primers, log_file): @@ -449,9 +447,9 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori # run blast if db is given if args.database is not None: # create blast query - query_path = blast.create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir) + query_path = blast.create_BLAST_query(qpcr_scheme_candidates, data_dir, mode="qpcr") # perform primer blast - qpcr_scheme_candidates, off_target_amplicons = blast.primer_blast( + qpcr_scheme_candidates = blast.primer_blast( data_dir, args.database, query_path, @@ -516,7 +514,7 @@ def main(sysargs=sys.argv[1:]): # SINGLE/TILED mode if args.mode == "tiled" or args.mode == "single": - all_primers, amplicons, off_target_amplicons = single_and_tiled_shared_workflow( + all_primers, amplicons = single_and_tiled_shared_workflow( args, left_primer_candidates, right_primer_candidates, diff --git a/varvamp/scripts/blast.py b/varvamp/scripts/blast.py index 08a54b4..d9c0bf7 100644 --- a/varvamp/scripts/blast.py +++ b/varvamp/scripts/blast.py @@ -29,33 +29,20 @@ def check_BLAST_installation(log_file): logging.raise_error("BLASTN is not installed", log_file, exit=True) -def create_BLAST_query(all_primers, amplicons, data_dir): +def create_BLAST_query(amplicons, data_dir, mode="single_tiled"): """ - create a query for the BLAST search (tiled, single mode) + create a query for the BLAST search """ - already_written = set() - query_path = os.path.join(data_dir, "BLAST_query.fasta") - with open(query_path, "w") as query: - for amp in amplicons: - for primer_type in ["LEFT", "RIGHT"]: - name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}" - if name not in already_written: - print(f">{name}\n{amp[primer_type][0]}", file=query) - already_written.add(name) - return query_path - - -def create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir): - """ - create a query for the BLAST search (qpcr mode) - """ + if mode == "single_tiled": + primer_types = ["LEFT", "RIGHT"] + elif mode == "qpcr": + primer_types = ["PROBE", "LEFT", "RIGHT"] already_written = set() - query_path = os.path.join(data_dir, "BLAST_query.fasta") with open(query_path, "w") as query: - for amp in qpcr_scheme_candidates: - for primer_type in ["PROBE", "LEFT", "RIGHT"]: + for amp in amplicons: + for primer_type in primer_types: name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}" if name not in already_written: print(f">{name}\n{amp[primer_type][0]}", file=query) @@ -197,8 +184,8 @@ def predict_non_specific_amplicons(amplicons, blast_df, max_length, mode, n_thre [(amp, blast_df, max_length, mode) for amp in amplicons] ) if result is not None ] - off_targets = {amp["id"] for amp in annotated_amps if amp["off_targets"]} - return off_targets, annotated_amps + n_off_targets = sum(amp["off_targets"] for amp in annotated_amps) + return n_off_targets, annotated_amps def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log_file, mode): @@ -230,14 +217,17 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log blast_df = parse_and_filter_BLAST_output(blast_out) print("Predicting non-specific amplicons...") - off_target_amplicons, amplicons = predict_non_specific_amplicons( + n_off_targets, amplicons = predict_non_specific_amplicons( amplicons, blast_df, max_length, mode, n_threads ) - success_text = f"varVAMP successfully predicted non-specific amplicons:\n\t> {len(off_target_amplicons)}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> raised their amplicon penalty by {config.BLAST_PENALTY}" + if n_off_targets > 0: + success_text = f"varVAMP predicted non-specific amplicons:\n\t> {n_off_targets}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> will attempt to avoid them in the final list of amplicons" + else: + success_text = f"NO off-target amplicons found with the blast db and a total of {len(amplicons)} amplicons" print(success_text) with open(log_file, 'a') as f: print( @@ -246,5 +236,5 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log ) print("\n#### off-target search finished ####\n") - return amplicons, off_target_amplicons + return amplicons diff --git a/varvamp/scripts/default_config.py b/varvamp/scripts/default_config.py index b4c5fff..6c26967 100644 --- a/varvamp/scripts/default_config.py +++ b/varvamp/scripts/default_config.py @@ -4,7 +4,7 @@ # List of all known parameters. DO NOT CHANGE! __all__ = [ - 'BLAST_MAX_DIFF', 'BLAST_PENALTY', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI', + 'BLAST_MAX_DIFF', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI', 'END_OVERLAP', 'PCR_DNA_CONC', 'PCR_DNTP_CONC', 'PCR_DV_CONC', 'PCR_MV_CONC', 'PRIMER_3_PENALTY', 'PRIMER_GC_END', 'PRIMER_GC_PENALTY', @@ -74,7 +74,6 @@ } BLAST_MAX_DIFF = 0.5 # min percent match between primer and BLAST hit (coverage and/or mismatches) BLAST_SIZE_MULTI = 2 # multiplier for the max_amp size of off targets (in relation to max amp size) -BLAST_PENALTY = 50 # amplicon penalty increase -> considered only if no other possibilities # nucleotide definitions, do NOT change NUCS = set("atcg") diff --git a/varvamp/scripts/logging.py b/varvamp/scripts/logging.py index a789746..bdfbe03 100644 --- a/varvamp/scripts/logging.py +++ b/varvamp/scripts/logging.py @@ -291,7 +291,6 @@ def confirm_config(args, log_file): ( "BLAST_MAX_DIFF", "BLAST_SIZE_MULTI", - "BLAST_PENALTY" ) ] @@ -384,7 +383,6 @@ def confirm_config(args, log_file): ("qpcr deletion size still considered for deltaG calculation", config.QAMPLICON_DEL_CUTOFF), ("maximum difference between primer and blast db", config.BLAST_MAX_DIFF), ("multiplier of the maximum length for non-specific amplicons", config.BLAST_SIZE_MULTI), - ("blast penalty for off targets", config.BLAST_PENALTY) ] for var_type, var in non_negative_var: if var < 0: @@ -468,11 +466,6 @@ def confirm_config(args, log_file): log_file, exit=True ) - if config.BLAST_PENALTY < 10: - raise_error( - "giving a too small penalty could result in the selection of off-target producing amplicons in the final scheme.", - log_file, - ) # confirm proper BLAST settings in dictionary if not isinstance(config.BLAST_SETTINGS, dict): raise_error( From a7751078d21ef8546e0a20c0f941cc9f2ef20920 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 16:27:07 +0200 Subject: [PATCH 10/17] Remove debug print and early return --- varvamp/scripts/scheme.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index c07650d..59b7d81 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -21,8 +21,6 @@ def construct_graph(nodes, init_graph): graph[node] = {} graph.update(init_graph) - print(graph) - return graph for node, neighbors in graph.items(): for neighbor in neighbors.keys(): if graph[neighbor].get(node, False) is False: From 486ed094d1ceccb4bd3f1f297075423812984144 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 16:42:40 +0200 Subject: [PATCH 11/17] Sort by position within qpcr primer sets and clean up --- varvamp/scripts/reporting.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 7034ee3..12a90a5 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -177,9 +177,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): file=tsv ) # write tsv2 - for oligo_type in ["PROBE", "LEFT", "RIGHT"]: - if oligo_type == "penalty" or oligo_type == "deltaG": - continue + for oligo_type in ["LEFT", "PROBE", "RIGHT"]: seq = ambiguous_consensus[amp[oligo_type][1]:amp[oligo_type][2]] if oligo_type == "RIGHT" or (oligo_type == "PROBE" and amp["PROBE"][5] == "-"): seq = primers.rev_complement(seq) From 2de1a4c2b2714f4c7f257eb40f29741fcda865fd Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 16:51:09 +0200 Subject: [PATCH 12/17] Write amplicons as proper bed6 --- varvamp/scripts/reporting.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 12a90a5..84b3cb3 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -149,6 +149,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): amp["RIGHT"][2], amp_name, round(amp["penalty"], 1), + "+", sep="\t", file=bed ) @@ -312,6 +313,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_ print( "ambiguous_consensus", *record, + "+", sep="\t", file=bed ) From 81fcce5fbcf0cd69f11504bc816d391bef86d145 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 16:59:23 +0200 Subject: [PATCH 13/17] Indicate minor version --- varvamp/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/varvamp/__init__.py b/varvamp/__init__.py index 8a9f97f..686b47b 100644 --- a/varvamp/__init__.py +++ b/varvamp/__init__.py @@ -1,3 +1,3 @@ """Tool to design amplicons for highly variable virusgenomes""" _program = "varvamp" -__version__ = "1.1.3" +__version__ = "1.2.0" From 800781e62ab80065a0b66de8e5d710cf66d5bb61 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Fri, 3 May 2024 17:03:21 +0200 Subject: [PATCH 14/17] Better use . instead of + for amplicon strand Amplicons do not really sit on any single strand. --- varvamp/scripts/reporting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 84b3cb3..3016ea3 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -149,7 +149,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): amp["RIGHT"][2], amp_name, round(amp["penalty"], 1), - "+", + ".", sep="\t", file=bed ) @@ -313,7 +313,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_ print( "ambiguous_consensus", *record, - "+", + ".", sep="\t", file=bed ) From ec1891f1eb6c5513851d2bec640fe7e0422dc1ec Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Mon, 6 May 2024 16:54:07 +0200 Subject: [PATCH 15/17] Include off-target information in primer tsv files --- varvamp/scripts/reporting.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 3016ea3..3719ee3 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -133,11 +133,11 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): with open(tsv_file, "w") as tsv, open(tsv_file_2, "w") as tsv2, open(amplicon_bed_file, "w") as bed, open(primer_fasta_file, "w") as fasta: print( - "qpcr_scheme\toligo_type\tstart\tstop\tseq\tsize\tgc_best\ttemp_best\tmean_gc\tmean_temp\tpenalty", + "qpcr_scheme\toligo_type\tstart\tstop\tseq\tsize\tgc_best\ttemp_best\tmean_gc\tmean_temp\tpenalty\toff_target_amplicons", file=tsv2 ) print( - "qpcr_scheme\toff_target\tpenalty\tdeltaG\tlength\tstart\tstop\tseq", + "qpcr_scheme\toff_target_amplicons\tpenalty\tdeltaG\tlength\tstart\tstop\tseq", file=tsv ) for n, amp in enumerate(final_schemes): @@ -163,7 +163,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): else: amplicon_has_off_target = "No" else: - amplicon_has_off_target = "NA" + amplicon_has_off_target = "n.d." amplicon_seq = ambiguous_consensus[amplicon_start:amplicon_stop] print( amp_name, @@ -201,6 +201,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, log_file): gc, temp, round(amp[oligo_type][3], 1), + amplicon_has_off_target, sep="\t", file=tsv2 ) @@ -228,7 +229,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_ with open(tsv_file, "w") as tsv, open(amplicon_bed_file, "w") as bed, open(tabular_file, "w") as tabular: # write header for primer tsv print( - "amlicon_name\tamplicon_length\tprimer_name\talternate_primer_name\tpool\tstart\tstop\tseq\tsize\tgc_best\ttemp_best\tmean_gc\tmean_temp\tpenalty", + "amlicon_name\tamplicon_length\tprimer_name\talternate_primer_name\tpool\tstart\tstop\tseq\tsize\tgc_best\ttemp_best\tmean_gc\tmean_temp\tpenalty\toff_target_amplicons", file=tsv ) amplicon_bed_records = [] @@ -247,8 +248,14 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_ new_name = f"AMPLICON_{amplicon_index}" # get left and right primers and their names amp_length = amp["RIGHT"][2] - amp["LEFT"][1] - if amp.get("off_targets"): - write_BLAST_warning(new_name, log_file) + if "off_targets" in amp: + if amp["off_targets"]: + amplicon_has_off_target = "Yes" + write_BLAST_warning(amp_name, log_file) + else: + amplicon_has_off_target = "No" + else: + amplicon_has_off_target = "n.d." # write amplicon bed if mode == "tiled": bed_score = pool @@ -298,6 +305,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, mode, log_ gc, temp, round(primer[3], 1), + amplicon_has_off_target, sep="\t", file=tsv ) From 7bf23ca6782c0953f230dcfe733274db5539e3ed Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Tue, 7 May 2024 10:20:52 +0200 Subject: [PATCH 16/17] Fix primer dimer handling broken by previous changes --- varvamp/command.py | 1 + varvamp/scripts/reporting.py | 10 +++---- varvamp/scripts/scheme.py | 57 +++++++++++++++++++++--------------- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/varvamp/command.py b/varvamp/command.py index 14a1eae..7abab93 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -374,6 +374,7 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida reporting.write_dimers(results_dir, dimers_not_solved) # evaluate coverage + # ATTENTION: Genome coverage of the scheme might still change slightly through resolution of primer dimers, but this potential, minor inaccuracy is currently accepted. percent_coverage = round(coverage/len(ambiguous_consensus)*100, 2) logging.varvamp_progress( log_file, diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 3719ee3..0f7398a 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -350,12 +350,12 @@ def write_dimers(path, primer_dimers): "pool\tprimer_name_1\tprimer_name_2\tdimer melting temp", file=tsv ) - for dimers in primer_dimers: + for pool, primer1, primer2 in primer_dimers: print( - dimers[0][0], - dimers[0][2], - dimers[1][2], - round(primers.calc_dimer(dimers[0][3][0], dimers[1][3][0]).tm, 1), + pool, + primer1[1], + primer2[1], + round(primers.calc_dimer(primer1[2][0], primer2[2][0]).tm, 1), sep="\t", file=tsv ) diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index 59b7d81..2c50a1c 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -59,7 +59,6 @@ def value(self, node1, node2): """ return self.graph[node1][node2] - def find_amplicons(all_primers, opt_len, max_len): """ finds all possible amplicons, creates a dictionary @@ -205,9 +204,9 @@ def get_min_path(previous_nodes, start_node, target_node): return path[::-1] -def create_scheme_dic(amplicon_path, amplicons_by_id): +def create_scheme(amplicon_path, amplicons_by_id): """ - creates the final scheme dictionary + creates the final tiled-amplicon scheme """ amplicon_scheme = [] @@ -275,7 +274,7 @@ def find_best_covering_scheme(amplicons, amplicon_graph): # if no previous nodes are found but the single amplicon results in the largest # coverage - return as the best scheme amplicon_path = [best_start_node] - return best_coverage, create_scheme_dic(amplicon_path, amps_by_id) + return best_coverage, create_scheme(amplicon_path, amps_by_id) def test_scheme_for_dimers(amplicon_scheme): @@ -284,17 +283,20 @@ def test_scheme_for_dimers(amplicon_scheme): """ primer_dimers = [] - - for pool in [0, 1]: + pools = {amp["pool"] for amp in amplicon_scheme} + for pool in pools: # test the primer dimers only within the respective pools tested_primers = [] - for amp in amplicon_scheme[pool::2]: + for amp_index, amp in enumerate(amplicon_scheme): + if amp["pool"] != pool: + continue for primer in ["LEFT", "RIGHT"]: # remember where the currrent primer was in the scheme - current_primer = (pool, amp, primer, amp[primer]) - current_seq = current_primer[3][0] + # store the amplicon's index in the scheme, the current primer's original name and its details + current_primer = (amp_index, amp[primer][-1], amp[primer][:-1]) + current_seq = current_primer[2][0] for tested in tested_primers: - tested_seq = tested[3][0] + tested_seq = tested[2][0] if primers.calc_dimer(current_seq, tested_seq).tm <= config.PRIMER_MAX_DIMER_TMP: continue primer_dimers.append((current_primer, tested)) @@ -313,13 +315,13 @@ def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidat overlapping_primers = [] # test each primer in dimer - for primer in dimer: + for amp_index, primer_name, primer in dimer: overlapping_primers_temp = [] - thirds_len = int((primer[3][2] - primer[3][1]) / 3) + thirds_len = int((primer[2] - primer[1]) / 3) # get the middle third of the primer (here are the previously excluded primers) - overlap_set = set(range(primer[3][1] + thirds_len, primer[3][2] - thirds_len)) + overlap_set = set(range(primer[1] + thirds_len, primer[2] - thirds_len)) # check in which list to look for them - if "RIGHT" in primer[2]: + if "RIGHT" in primer_name: primers_to_test = right_primer_candidates else: primers_to_test = left_primer_candidates @@ -328,7 +330,7 @@ def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidat primer_positions = list(range(potential_new[1], potential_new[2])) if not any(x in primer_positions for x in overlap_set): continue - overlapping_primers_temp.append((primer[0], primer[1], primer[2], potential_new)) + overlapping_primers_temp.append((amp_index, primer_name, potential_new)) overlapping_primers.append(overlapping_primers_temp) @@ -343,7 +345,7 @@ def test_overlaps_for_dimers(overlapping_primers): for second_overlap in overlapping_primers[1]: # return the first match. primers are sorted by penalty. # first pair that makes it has the lowest penalty - if primers.calc_dimer(first_overlap[3][0], second_overlap[3][0]).tm <= config.PRIMER_MAX_DIMER_TMP: + if primers.calc_dimer(first_overlap[2][0], second_overlap[2][0]).tm <= config.PRIMER_MAX_DIMER_TMP: return [first_overlap, second_overlap] @@ -364,23 +366,30 @@ def check_and_solve_heterodimers(amplicon_scheme, left_primer_candidates, right_ return [] for dimer in primer_dimers: - # get overlapping primers that have not been considere + # get overlapping primers that have not been considered overlapping_primers = get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidates) # test all possible primers against each other for dimers new_primers = test_overlaps_for_dimers(overlapping_primers) # now change these primers in the scheme if new_primers: - for new in new_primers: + for amp_index, primer_name, primer in new_primers: # overwrite in final scheme - amplicon_scheme[new[0]][new[1]][new[2]] = new[3] - # and in all primers - if "LEFT" in new[2]: + # ATTENTION: doesn't update the amplicon penalty currently + # This is ok only because that value isn't used after and doesn't get reported anywhere. + if "LEFT" in primer_name: strand = "+" + amplicon_scheme[amp_index]["LEFT"] = primer + [primer_name] else: strand = "-" - all_primers[strand][new[2]] = new[3] - - primer_dimers = test_scheme_for_dimers(amplicon_scheme) + amplicon_scheme[amp_index]["RIGHT"] = primer + [primer_name] + amplicon_scheme[amp_index]["length"] = amplicon_scheme[amp_index]["RIGHT"][2] - amplicon_scheme[amp_index]["LEFT"][1] + # and in all primers + all_primers[strand][primer_name] = primer + # get remaining dimers in the revised scheme and add pool identifier for reporting + primer_dimers = [ + (amplicon_scheme[primer1[0]]["pool"], primer1, primer2) + for primer1, primer2 in test_scheme_for_dimers(amplicon_scheme) + ] return primer_dimers From d4fc973de8768bda66d5d80280e384ef87561663 Mon Sep 17 00:00:00 2001 From: Wolfgang Maier Date: Tue, 7 May 2024 11:29:16 +0200 Subject: [PATCH 17/17] Remove unused function param / arg pair --- varvamp/command.py | 2 +- varvamp/scripts/scheme.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/varvamp/command.py b/varvamp/command.py index 7abab93..bd9d173 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -335,7 +335,7 @@ def single_workflow(args, amplicons, all_primers, log_file): workflow part specific for single mode """ - amplicon_scheme = scheme.find_single_amplicons(amplicons, all_primers, args.report_n) + amplicon_scheme = scheme.find_single_amplicons(amplicons, args.report_n) logging.varvamp_progress( log_file, progress=0.9, diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index 2c50a1c..ba0006e 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -394,7 +394,7 @@ def check_and_solve_heterodimers(amplicon_scheme, left_primer_candidates, right_ return primer_dimers -def find_single_amplicons(amplicons, all_primers, n): +def find_single_amplicons(amplicons, n): """ find non-overlapping amplicons with low penalties from all found amplicons. only for the SINGLE mode.