Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to BLAST functionality and refactoring #39

Merged
merged 17 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion varvamp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Tool to design amplicons for highly variable virus genomes"""
_program = "varvamp"
__version__ = "1.1.3"
__version__ = "1.2.0"
58 changes: 38 additions & 20 deletions varvamp/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_

if args.database is not None:
# create blast query
query_path = blast.create_BLAST_query(all_primers, amplicons, data_dir)
query_path = blast.create_BLAST_query(amplicons, data_dir)
# perform primer blast
amplicons, off_target_amplicons = blast.primer_blast(
amplicons = blast.primer_blast(
data_dir,
args.database,
query_path,
Expand All @@ -326,23 +326,21 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_
log_file,
mode="single_tiled"
)
else:
off_target_amplicons = []

return all_primers, amplicons, off_target_amplicons
return all_primers, amplicons


def single_workflow(args, amplicons, all_primers, log_file):
"""
workflow part specific for single mode
"""

amplicon_scheme = scheme.find_single_amplicons(amplicons, all_primers, args.report_n)
amplicon_scheme = scheme.find_single_amplicons(amplicons, args.report_n)
logging.varvamp_progress(
log_file,
progress=0.9,
job="Finding amplicons with low penalties.",
progress_text=f"{len(amplicon_scheme[0])} amplicons."
progress_text=f"{len(amplicon_scheme)} amplicons."
)

return amplicon_scheme
Expand All @@ -359,8 +357,7 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida
# search for amplicon scheme
coverage, amplicon_scheme = scheme.find_best_covering_scheme(
amplicons,
amplicon_graph,
all_primers
amplicon_graph
)

# check for dimers
Expand All @@ -377,12 +374,13 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida
reporting.write_dimers(results_dir, dimers_not_solved)

# evaluate coverage
# ATTENTION: Genome coverage of the scheme might still change slightly through resolution of primer dimers, but this potential, minor inaccuracy is currently accepted.
percent_coverage = round(coverage/len(ambiguous_consensus)*100, 2)
logging.varvamp_progress(
log_file,
progress=0.9,
job="Creating amplicon scheme.",
progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme[0]) + len(amplicon_scheme[1])} amplicons"
progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme)} amplicons"
)
if percent_coverage < 70:
logging.raise_error(
Expand Down Expand Up @@ -450,9 +448,9 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori
# run blast if db is given
if args.database is not None:
# create blast query
query_path = blast.create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir)
query_path = blast.create_BLAST_query(qpcr_scheme_candidates, data_dir, mode="qpcr")
# perform primer blast
amplicons, off_target_amplicons = blast.primer_blast(
qpcr_scheme_candidates = blast.primer_blast(
data_dir,
args.database,
query_path,
Expand All @@ -470,9 +468,6 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori
log_file,
exit=True
)
# report potential blast warnings
if args.database is not None:
blast.write_BLAST_warning(off_target_amplicons, final_schemes, log_file)
logging.varvamp_progress(
log_file,
progress=0.9,
Expand Down Expand Up @@ -506,9 +501,21 @@ def main(sysargs=sys.argv[1:]):
reporting.write_fasta(data_dir, "majority_consensus", majority_consensus)
reporting.write_fasta(results_dir, "ambiguous_consensus", ambiguous_consensus)

# Functions called from here on return lists of amplicons that are refined step-wise into final schemes.
# These lists that are passed between functions and later used for reporting consist of dictionary elements,
# which represent individual amplicons. A minimal amplicon dict could take the form:
# {
# "id": amplicon_name,
# "penalty": amplicon_cost,
# "length": amplicon_length,
# "LEFT": [left primer data],
# "RIGHT": [right primer data]
# }
# to which different functions may add additional information.

# SINGLE/TILED mode
if args.mode == "tiled" or args.mode == "single":
all_primers, amplicons, off_target_amplicons = single_and_tiled_shared_workflow(
all_primers, amplicons = single_and_tiled_shared_workflow(
args,
left_primer_candidates,
right_primer_candidates,
Expand All @@ -533,15 +540,22 @@ def main(sysargs=sys.argv[1:]):
log_file,
results_dir
)
if args.database is not None:
blast.write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file)

# write files

if args.mode == "tiled":
# assign amplicon numbers from 5' to 3' along the genome
amplicon_scheme.sort(key=lambda x: x["LEFT"][1])
else:
# make sure amplicons with no off-target products and with low penalties get the lowest numbers
amplicon_scheme.sort(key=lambda x: (x.get("off_targets", False), x["penalty"]))
reporting.write_all_primers(data_dir, all_primers)
reporting.write_scheme_to_files(
results_dir,
amplicon_scheme,
ambiguous_consensus,
args.mode
args.mode,
log_file
)
reporting.varvamp_plot(
results_dir,
Expand All @@ -564,9 +578,13 @@ def main(sysargs=sys.argv[1:]):
right_primer_candidates,
log_file
)

# write files

# make sure amplicons with no off-target products and with low penalties get the lowest numbers
final_schemes.sort(key=lambda x: (x.get("off_targets", False), x["penalty"]))
reporting.write_regions_to_bed(probe_regions, data_dir, "probe")
reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus)
reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus, log_file)
reporting.varvamp_plot(
results_dir,
alignment_cleaned,
Expand Down
102 changes: 36 additions & 66 deletions varvamp/scripts/blast.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,24 @@ def check_BLAST_installation(log_file):
logging.raise_error("BLASTN is not installed", log_file, exit=True)


def create_BLAST_query(all_primers, amplicons, data_dir):
def create_BLAST_query(amplicons, data_dir, mode="single_tiled"):
"""
create a query for the BLAST search (tiled, single mode)
create a query for the BLAST search
"""
already_written = []

query_path = os.path.join(data_dir, "BLAST_query.fasta")
with open(query_path, "w") as query:
for amp in amplicons:
fw_primer, rv_primer = amplicons[amp][2], amplicons[amp][3]
if fw_primer not in already_written:
print(f">{fw_primer}\n{all_primers['+'][fw_primer][0]}", file=query)
already_written.append(fw_primer)
if rv_primer not in already_written:
print(f">{rv_primer}\n{all_primers['-'][rv_primer][0]}", file=query)
already_written.append(rv_primer)

return query_path


def create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir):
"""
create a query for the BLAST search (qpcr mode)
"""
already_written = []
if mode == "single_tiled":
primer_types = ["LEFT", "RIGHT"]
elif mode == "qpcr":
primer_types = ["PROBE", "LEFT", "RIGHT"]
already_written = set()

query_path = os.path.join(data_dir, "BLAST_query.fasta")
with open(query_path, "w") as query:
for amp in qpcr_scheme_candidates:
for primer_type in ["PROBE", "LEFT", "RIGHT"]:
name = f"{primer_type}_{qpcr_scheme_candidates[amp][primer_type][1]}_{qpcr_scheme_candidates[amp][primer_type][2]}"
if name in already_written:
continue
print(f">{name}\n{qpcr_scheme_candidates[amp][primer_type][0]}", file=query)
already_written.append(name)
for amp in amplicons:
for primer_type in primer_types:
name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}"
if name not in already_written:
print(f">{name}\n{amp[primer_type][0]}", file=query)
already_written.add(name)
return query_path


Expand Down Expand Up @@ -168,44 +151,41 @@ def predict_non_specific_amplicons_worker(amp, blast_df, max_length, mode):
"""
Worker function to predict unspecific targets for a single amplicon.
"""
name, data = amp
# get correct primers
if mode == "single_tiled":
primers = [data[2], data[3]]
primer_types = ["LEFT", "RIGHT"]
elif mode == "qpcr":
primers = []
for primer_type in ["PROBE", "LEFT", "RIGHT"]:
primers.append(f"{primer_type}_{data[primer_type][1]}_{data[primer_type][2]}")
primer_types = ["PROBE", "LEFT", "RIGHT"]
primers = []
for primer_type in primer_types:
primers.append(f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}")
# subset df for primers
df_amp_primers = blast_df[blast_df["query"].isin(primers)]
# sort by reference and ref start
df_amp_primers_sorted = df_amp_primers.sort_values(["ref", "ref_start"])
# check for off-targets for specific primers
if check_off_targets(df_amp_primers_sorted, max_length, primers):
return name
amp["off_targets"] = True
else:
amp["off_targets"] = False
return amp


def predict_non_specific_amplicons(amplicons, blast_df, max_length, mode, n_threads):
"""
Main function to predict unspecific targets within a size range and give
these primers a high penalty. Uses multiprocessing for parallelization.
"""
off_targets = []
# process amplicons concurrently
with multiprocessing.Pool(processes=n_threads) as pool:
amp_items = amplicons.items()
results = pool.starmap(predict_non_specific_amplicons_worker, [(amp, blast_df, max_length, mode) for amp in amp_items])
# check results
for off_target in results:
if off_target is None:
continue
off_targets.append(off_target)
if mode == "single_tiled":
amplicons[off_target][5] = amplicons[off_target][5] + config.BLAST_PENALTY
elif mode == "qpcr":
amplicons[off_target]["penalty"] = amplicons[off_target]["penalty"] + config.BLAST_PENALTY

return off_targets, amplicons
annotated_amps = [
result for result in pool.starmap(
predict_non_specific_amplicons_worker,
[(amp, blast_df, max_length, mode) for amp in amplicons]
) if result is not None
]
n_off_targets = sum(amp["off_targets"] for amp in annotated_amps)
return n_off_targets, annotated_amps


def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log_file, mode):
Expand Down Expand Up @@ -237,14 +217,17 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log

blast_df = parse_and_filter_BLAST_output(blast_out)
print("Predicting non-specific amplicons...")
off_target_amplicons, amplicons = predict_non_specific_amplicons(
n_off_targets, amplicons = predict_non_specific_amplicons(
amplicons,
blast_df,
max_length,
mode,
n_threads
)
success_text = f"varVAMP successfully predicted non-specific amplicons:\n\t> {len(off_target_amplicons)}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> raised their amplicon penalty by {config.BLAST_PENALTY}"
if n_off_targets > 0:
success_text = f"varVAMP predicted non-specific amplicons:\n\t> {n_off_targets}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> will attempt to avoid them in the final list of amplicons"
else:
success_text = f"NO off-target amplicons found with the blast db and a total of {len(amplicons)} amplicons"
print(success_text)
with open(log_file, 'a') as f:
print(
Expand All @@ -253,18 +236,5 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log
)
print("\n#### off-target search finished ####\n")

return amplicons, off_target_amplicons

return amplicons

def write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file):
    """
    for each primer pair that has potential unspecific amplicons
    write warnings to file.
    """
    # Only amplicons that actually made it into the final scheme are worth
    # warning about; everything else was already avoided.
    retained_off_targets = (
        amp for amp in off_target_amplicons if amp in amplicon_scheme
    )
    for amp in retained_off_targets:
        # non-fatal: the scheme is still usable, the user just gets notified
        logging.raise_error(
            f"{amp} could produce off-targets. No better amplicon in this area was found.",
            log_file,
            exit=False,
        )
3 changes: 1 addition & 2 deletions varvamp/scripts/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# List of all known parameters. DO NOT CHANGE!
__all__ = [
'BLAST_MAX_DIFF', 'BLAST_PENALTY', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI',
'BLAST_MAX_DIFF', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI',
'END_OVERLAP',
'PCR_DNA_CONC', 'PCR_DNTP_CONC', 'PCR_DV_CONC', 'PCR_MV_CONC',
'PRIMER_3_PENALTY', 'PRIMER_GC_END', 'PRIMER_GC_PENALTY',
Expand Down Expand Up @@ -74,7 +74,6 @@
}
BLAST_MAX_DIFF = 0.5 # min percent match between primer and BLAST hit (coverage and/or mismatches)
BLAST_SIZE_MULTI = 2 # multiplier for the max_amp size of off targets (in relation to max amp size)
BLAST_PENALTY = 50 # amplicon penalty increase -> considered only if no other possibilities

# nucleotide definitions, do NOT change
NUCS = set("atcg")
Expand Down
7 changes: 0 additions & 7 deletions varvamp/scripts/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ def confirm_config(args, log_file):
(
"BLAST_MAX_DIFF",
"BLAST_SIZE_MULTI",
"BLAST_PENALTY"
)
]

Expand Down Expand Up @@ -384,7 +383,6 @@ def confirm_config(args, log_file):
("qpcr deletion size still considered for deltaG calculation", config.QAMPLICON_DEL_CUTOFF),
("maximum difference between primer and blast db", config.BLAST_MAX_DIFF),
("multiplier of the maximum length for non-specific amplicons", config.BLAST_SIZE_MULTI),
("blast penalty for off targets", config.BLAST_PENALTY)
]
for var_type, var in non_negative_var:
if var < 0:
Expand Down Expand Up @@ -468,11 +466,6 @@ def confirm_config(args, log_file):
log_file,
exit=True
)
if config.BLAST_PENALTY < 10:
raise_error(
"giving a too small penalty could result in the selection of off-target producing amplicons in the final scheme.",
log_file,
)
# confirm proper BLAST settings in dictionary
if not isinstance(config.BLAST_SETTINGS, dict):
raise_error(
Expand Down
4 changes: 2 additions & 2 deletions varvamp/scripts/primers.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,13 @@ def find_best_primers(left_primer_candidates, right_primer_candidates):
primer_candidates.sort(key=lambda x: (x[3], x[1]))
# ini everything with the primer with the lowest penalty
to_retain = [primer_candidates[0]]
primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2]+1))
primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2]))
primer_set = set(primer_ranges)

for primer in primer_candidates:
# get the thirds of the primer, only consider the middle
thirds_len = int((primer[2] - primer[1])/3)
primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len + 1))
primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len))
# check if none of the nucleotides of the next primer
# are already covered by a better primer
if not any(x in primer_positions for x in primer_set):
Expand Down
Loading