Skip to content

Commit

Permalink
[ADAM-1352] Clean up consensus model usage.
Browse files Browse the repository at this point in the history
Resolves bigdatagenomics#1352:

* Fixed issues with consensus model instantiation.
* Added union consensus model (to support multiple models).
* Cleaned up consensus model documentation.
* Call distinct in IndelRealigner on consensus sequences to eliminate dupes.
* Added test for known indel model. Fixed off by one bug.
* Add test for union model.
  • Loading branch information
fnothaft committed Jan 19, 2017
1 parent 3fd805d commit 3258b7c
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

// fold and get the consensus model for this realignment run
val consensusGenerator = Option(args.knownIndelsFile)
.fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])(
new ConsensusGeneratorFromKnowns(_, rdd.rdd.context).asInstanceOf[ConsensusGenerator]
)
.fold(ConsensusGenerator.fromReads)(file => {
ConsensusGenerator.fromKnownIndels(rdd.rdd.context.loadVariants(file))
})

// run realignment
val realignmentRdd = rdd.realignIndels(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,96 @@ import htsjdk.samtools.{ Cigar, CigarOperator }
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.read.realignment.IndelRealignmentTarget
import org.bdgenomics.adam.rdd.variant.VariantRDD
import org.bdgenomics.adam.rich.RichAlignmentRecord
import scala.collection.JavaConversions._

/**
* Singleton object for creating consensus generators.
*
* Consensus generators are used in INDEL realignment to generate the consensus
* sequences that reads are realigned against. We have three consensus modes:
*
* * From reads: This mode looks at the read alignments for evidence of INDELs.
* Any INDEL that shows up in a read alignment is extracted and used to
* generate a consensus sequence.
* * From reads with Smith-Waterman: This mode discards the original read
* alignments and uses Smith-Waterman to locally realign the read. If this
* realignment leads to a read being aligned with an insertion or deletion
* against the reference, we generate a consensus sequence for the INDEL.
* * From knowns: In this mode, we use a set of provided INDEL variants as the
* INDELs to generate consensus sequences for.
*
* Additionally, we support a union operator, that takes the union of several
* consensus modes.
*/
object ConsensusGenerator {

/**
* Provides a generator to extract consensuses from aligned reads.
*
* @return A consensus generator that looks directly at aligned reads. Here,
* consensus sequences are extracted by substituting INDELs that are
* present in a single aligned read back into the reference sequence where
* they are aligned.
*/
def fromReads: ConsensusGenerator = {
new ConsensusGeneratorFromReads
}

/**
* Provides a generator to extract consensuses by realigning reads.
*
* @param wMatch Match weight to use for Smith-Waterman.
* @param wMismatch Mismatch penalty to use for Smith-Waterman.
* @param wInsert Insert penalty to use for Smith-Waterman.
* @param wDelete Deletion penalty to use for Smith-Waterman.
* @return A consensus generator that uses Smith-Waterman to realign reads to
* the reference sequence they overlap. INDELs that are present after this
* realignment stage are then used as targets for a second realignment phase.
*/
def fromReadsWithSmithWaterman(wMatch: Double,
wMismatch: Double,
wInsert: Double,
wDelete: Double): ConsensusGenerator = {
new ConsensusGeneratorFromSmithWaterman(wMatch,
wMismatch,
wInsert,
wDelete)
}

/**
* Provides a generator to extract consensuses from a known set of INDELs.
*
* @param rdd The previously called INDEL variants.
* @return A consensus generator that looks at previously called INDELs.
*/
def fromKnownIndels(rdd: VariantRDD): ConsensusGenerator = {
new ConsensusGeneratorFromKnowns(rdd.rdd)
}

/**
* Provides a generator to extract consensuses using several methods.
*
* @return A consensus generator that generates consensuses with several
* methods.
*/
def union(generators: ConsensusGenerator*): ConsensusGenerator = {
UnionConsensusGenerator(generators.toSeq)
}
}

/**
* Trait for generating consensus sequences for INDEL realignment.
*
* INDEL realignment scores read alinments against the reference genome and
* INDEL realignment scores read alignments against the reference genome and
* a set of "consensus" sequences. These consensus sequences represent alternate
* alleles/haplotypes, and can be generated via a variety of methods (e.g., seen
* in previous projects --> 1kg INDELs, seen in read alignments, etc). This
* trait provides an interface that a consensus generation method should
* implement to provide it's consensus sequences to the realigner.
*/
private[adam] trait ConsensusGenerator extends Serializable {
trait ConsensusGenerator extends Serializable {

/**
* @param cigar The CIGAR to process.
Expand Down Expand Up @@ -62,6 +138,8 @@ private[adam] trait ConsensusGenerator extends Serializable {
* indel normalization.
*
* @param reads Reads to preprocess.
* @param reference Reference genome bases covering this target.
* @param region The region covered by this target.
* @return Preprocessed reads.
*/
def preprocessReadsForRealignment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ import scala.transient
* @param file Path to file containing variants.
* @param sc Spark context to use.
*/
private[adam] class ConsensusGeneratorFromKnowns(file: String,
sc: SparkContext) extends ConsensusGenerator {
private[adam] class ConsensusGeneratorFromKnowns(rdd: RDD[Variant]) extends ConsensusGenerator {

private val indelTable = sc.broadcast(IndelTable(file, sc))
private val indelTable = rdd.context.broadcast(IndelTable(rdd))

/**
* Generates targets to add to initial set of indel realignment targets, if additional
Expand All @@ -48,10 +47,11 @@ private[adam] class ConsensusGeneratorFromKnowns(file: String,
* @return Returns an option which wraps an RDD of indel realignment targets.
*/
def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = {
val rdd: RDD[Variant] = sc.loadVariants(file).rdd

Some(rdd.filter(v => v.getReferenceAllele.length != v.getAlternateAllele.length)
.map(v => ReferenceRegion(v.getContigName, v.getStart, v.getStart + v.getReferenceAllele.length))
.map(v => ReferenceRegion(v.getContigName,
v.getStart,
v.getStart + v.getReferenceAllele.length))
.map(r => new IndelRealignmentTarget(Some(r), r)))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.algorithms.consensus

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.read.realignment.IndelRealignmentTarget
import org.bdgenomics.adam.rich.RichAlignmentRecord

/**
* A consensus generator that wraps multiple other consensus generators.
*
* @param generator The consensus generator implementations to delegate to.
*/
private[adam] case class UnionConsensusGenerator(
generators: Iterable[ConsensusGenerator]) extends ConsensusGenerator {

/**
* Adds optional known targets before reads are examined.
*
* Loops over the underlying consensus generators and calls their method for
* generating known targets. If all underlying methods return no targets, then
* we also return no targets.
*
* @return Returns all a priori known targets.
*/
def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = {
val possibleTargets = generators.flatMap(_.targetsToAdd)

if (possibleTargets.isEmpty) {
None
} else {
Some(possibleTargets.head.context.union(possibleTargets.toSeq))
}
}

/**
* Preprocesses reads aligned to a given target.
*
* Loops over the underlying methods and calls their preprocessing algorithms.
* The preprocessed reads are then passed to each new algorithm. I.e., the
* first underlying implementation will run on the unprocessed reads, then the
* output of the first implementation's preprocessing method will be passed
* as input to the second algorithm's preprocessing method.
*
* @param reads Reads to preprocess.
* @param reference Reference genome bases covering this target.
* @param region The region covered by this target.
* @return Preprocessed reads, as treated by all preprocessing methods..
*/
def preprocessReadsForRealignment(
reads: Iterable[RichAlignmentRecord],
reference: String,
region: ReferenceRegion): Iterable[RichAlignmentRecord] = {
generators.foldRight(reads)(
(g, r) => g.preprocessReadsForRealignment(r, reference, region))
}

/**
* For all reads in this region and all methods, generates the consensus sequences.
*
* We do not guarantee that there are no duplicate consensuses in the output
* of this method, since multiple underlying methods may generate a single
* consensus sequence. Instead, we defer deduplication to the INDEL realigner.
*
* @param reads Reads to generate consensus sequences from.
* @return Consensus sequences to use for realignment.
*/
def findConsensus(reads: Iterable[RichAlignmentRecord]): Iterable[Consensus] = {
generators.flatMap(_.findConsensus(reads))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,6 @@ private[adam] class IndelTable(private val table: Map[String, Iterable[Consensus

private[adam] object IndelTable {

/**
* Creates an indel table from a file containing known indels.
*
* @param knownIndelsFile Path to file with known indels.
* @param sc SparkContext to use for loading.
* @return Returns a table with the known indels populated.
*/
def apply(knownIndelsFile: String, sc: SparkContext): IndelTable = {
val rdd: RDD[Variant] = sc.loadVariants(knownIndelsFile).rdd
apply(rdd)
}

/**
* Creates an indel table from an RDD containing known variants.
*
Expand All @@ -76,11 +64,12 @@ private[adam] object IndelTable {
val deletionLength = v.getReferenceAllele.length - v.getAlternateAllele.length
val start = v.getStart + v.getAlternateAllele.length

Consensus("", ReferenceRegion(referenceName, start, start + deletionLength))
Consensus("", ReferenceRegion(referenceName, start, start + deletionLength + 1))
} else {
val start = v.getStart + v.getReferenceAllele.length

Consensus(v.getAlternateAllele.drop(v.getReferenceAllele.length), ReferenceRegion(referenceName, start, start + 1))
Consensus(v.getAlternateAllele.drop(v.getReferenceAllele.length),
ReferenceRegion(referenceName, start, start + 1))
}

(referenceName, consensus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import htsjdk.samtools.{ Cigar, CigarElement, CigarOperator }
import org.bdgenomics.utils.misc.Logging
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.algorithms.consensus.{ Consensus, ConsensusGenerator, ConsensusGeneratorFromReads }
import org.bdgenomics.adam.algorithms.consensus.{ Consensus, ConsensusGenerator }
import org.bdgenomics.adam.models.{ MdTag, ReferencePosition, ReferenceRegion }
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.rich.RichAlignmentRecord._
Expand All @@ -43,7 +43,7 @@ private[read] object RealignIndels extends Serializable with Logging {
*/
def apply(
rdd: RDD[AlignmentRecord],
consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads,
consensusModel: ConsensusGenerator = ConsensusGenerator.fromReads,
dataIsSorted: Boolean = false,
maxIndelSize: Int = 500,
maxConsensusNumber: Int = 30,
Expand Down Expand Up @@ -222,7 +222,7 @@ private[read] object RealignIndels extends Serializable with Logging {
import org.bdgenomics.adam.rdd.read.realignment.RealignIndels._

private[read] class RealignIndels(
val consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads,
val consensusModel: ConsensusGenerator = ConsensusGenerator.fromReads,
val dataIsSorted: Boolean = false,
val maxIndelSize: Int = 500,
val maxConsensusNumber: Int = 30,
Expand Down Expand Up @@ -258,6 +258,8 @@ private[read] class RealignIndels(
refRegion
)
var consensus = consensusModel.findConsensus(readsToClean)
.toSeq
.distinct

// reduce count of consensus sequences
if (consensus.size > maxConsensusNumber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ADAMKryoRegistrator extends KryoRegistrator {

// org.bdgenomics.adam.models
kryo.register(classOf[org.bdgenomics.adam.models.Coverage])
kryo.register(classOf[org.bdgenomics.adam.models.IndelTable])
kryo.register(classOf[org.bdgenomics.adam.models.MdTag])
kryo.register(classOf[org.bdgenomics.adam.models.MultiContigNonoverlappingRegions])
kryo.register(classOf[org.bdgenomics.adam.models.NonoverlappingRegions])
Expand Down
Loading

0 comments on commit 3258b7c

Please sign in to comment.