diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala index 5b5cd061f4..360ea5a99c 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala @@ -174,9 +174,9 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans // fold and get the consensus model for this realignment run val consensusGenerator = Option(args.knownIndelsFile) - .fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])( - new ConsensusGeneratorFromKnowns(_, rdd.rdd.context).asInstanceOf[ConsensusGenerator] - ) + .fold(ConsensusGenerator.fromReads)(file => { + ConsensusGenerator.fromKnownIndels(rdd.rdd.context.loadVariants(file)) + }) // run realignment val realignmentRdd = rdd.realignIndels( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala index 961389b4b4..68ae8bc055 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala @@ -21,20 +21,96 @@ import htsjdk.samtools.{ Cigar, CigarOperator } import org.apache.spark.rdd.RDD import org.bdgenomics.adam.models.ReferenceRegion import org.bdgenomics.adam.rdd.read.realignment.IndelRealignmentTarget +import org.bdgenomics.adam.rdd.variant.VariantRDD import org.bdgenomics.adam.rich.RichAlignmentRecord import scala.collection.JavaConversions._ +/** + * Singleton object for creating consensus generators. + * + * Consensus generators are used in INDEL realignment to generate the consensus + * sequences that reads are realigned against. We have three consensus modes: + * + * * From reads: This mode looks at the read alignments for evidence of INDELs. + * Any INDEL that shows up in a read alignment is extracted and used to + * generate a consensus sequence. + * * From reads with Smith-Waterman: This mode discards the original read + * alignments and uses Smith-Waterman to locally realign the read. If this + * realignment leads to a read being aligned with an insertion or deletion + * against the reference, we generate a consensus sequence for the INDEL. + * * From knowns: In this mode, we use a set of provided INDEL variants as the + * INDELs to generate consensus sequences for. + * + * Additionally, we support a union operator, that takes the union of several + * consensus modes. + */ +object ConsensusGenerator { + + /** + * Provides a generator to extract consensuses from aligned reads. + * + * @return A consensus generator that looks directly at aligned reads. Here, + * consensus sequences are extracted by substituting INDELs that are + * present in a single aligned read back into the reference sequence where + * they are aligned. + */ + def fromReads: ConsensusGenerator = { + new ConsensusGeneratorFromReads + } + + /** + * Provides a generator to extract consensuses by realigning reads. + * + * @param wMatch Match weight to use for Smith-Waterman. + * @param wMismatch Mismatch penalty to use for Smith-Waterman. + * @param wInsert Insert penalty to use for Smith-Waterman. + * @param wDelete Deletion penalty to use for Smith-Waterman. + * @return A consensus generator that uses Smith-Waterman to realign reads to + * the reference sequence they overlap. INDELs that are present after this + * realignment stage are then used as targets for a second realignment phase. + */ + def fromReadsWithSmithWaterman(wMatch: Double, + wMismatch: Double, + wInsert: Double, + wDelete: Double): ConsensusGenerator = { + new ConsensusGeneratorFromSmithWaterman(wMatch, + wMismatch, + wInsert, + wDelete) + } + + /** + * Provides a generator to extract consensuses from a known set of INDELs. + * + * @param rdd The previously called INDEL variants. + * @return A consensus generator that looks at previously called INDELs. + */ + def fromKnownIndels(rdd: VariantRDD): ConsensusGenerator = { + new ConsensusGeneratorFromKnowns(rdd.rdd) + } + + /** + * Provides a generator to extract consensuses using several methods. + * + * @return A consensus generator that generates consensuses with several + * methods. + */ + def union(generators: ConsensusGenerator*): ConsensusGenerator = { + UnionConsensusGenerator(generators.toSeq) + } +} + /** * Trait for generating consensus sequences for INDEL realignment. * - * INDEL realignment scores read alinments against the reference genome and + * INDEL realignment scores read alignments against the reference genome and * a set of "consensus" sequences. These consensus sequences represent alternate * alleles/haplotypes, and can be generated via a variety of methods (e.g., seen * in previous projects --> 1kg INDELs, seen in read alignments, etc). This * trait provides an interface that a consensus generation method should * implement to provide it's consensus sequences to the realigner. */ -private[adam] trait ConsensusGenerator extends Serializable { +trait ConsensusGenerator extends Serializable { /** * @param cigar The CIGAR to process. @@ -62,6 +138,8 @@ private[adam] trait ConsensusGenerator extends Serializable { * indel normalization. * * @param reads Reads to preprocess. + * @param reference Reference genome bases covering this target. + * @param region The region covered by this target. * @return Preprocessed reads. */ def preprocessReadsForRealignment( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala index d08fc121f7..d1f7780349 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala @@ -36,10 +36,9 @@ import scala.transient * @param file Path to file containing variants. * @param sc Spark context to use. */ -private[adam] class ConsensusGeneratorFromKnowns(file: String, - sc: SparkContext) extends ConsensusGenerator { +private[adam] class ConsensusGeneratorFromKnowns(rdd: RDD[Variant]) extends ConsensusGenerator { - private val indelTable = sc.broadcast(IndelTable(file, sc)) + private val indelTable = rdd.context.broadcast(IndelTable(rdd)) /** * Generates targets to add to initial set of indel realignment targets, if additional @@ -48,10 +47,11 @@ private[adam] class ConsensusGeneratorFromKnowns(file: String, * @return Returns an option which wraps an RDD of indel realignment targets. */ def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = { - val rdd: RDD[Variant] = sc.loadVariants(file).rdd Some(rdd.filter(v => v.getReferenceAllele.length != v.getAlternateAllele.length) - .map(v => ReferenceRegion(v.getContigName, v.getStart, v.getStart + v.getReferenceAllele.length)) + .map(v => ReferenceRegion(v.getContigName, + v.getStart, + v.getStart + v.getReferenceAllele.length)) .map(r => new IndelRealignmentTarget(Some(r), r))) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/UnionConsensusGenerator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/UnionConsensusGenerator.scala new file mode 100644 index 0000000000..700f9b550f --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/UnionConsensusGenerator.scala @@ -0,0 +1,87 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.algorithms.consensus + +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.models.ReferenceRegion +import org.bdgenomics.adam.rdd.read.realignment.IndelRealignmentTarget +import org.bdgenomics.adam.rich.RichAlignmentRecord + +/** + * A consensus generator that wraps multiple other consensus generators. + * + * @param generator The consensus generator implementations to delegate to. + */ +private[adam] case class UnionConsensusGenerator( + generators: Iterable[ConsensusGenerator]) extends ConsensusGenerator { + + /** + * Adds optional known targets before reads are examined. + * + * Loops over the underlying consensus generators and calls their method for + * generating known targets. If all underlying methods return no targets, then + * we also return no targets. + * + * @return Returns all a priori known targets. + */ + def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = { + val possibleTargets = generators.flatMap(_.targetsToAdd) + + if (possibleTargets.isEmpty) { + None + } else { + Some(possibleTargets.head.context.union(possibleTargets.toSeq)) + } + } + + /** + * Preprocesses reads aligned to a given target. + * + * Loops over the underlying methods and calls their preprocessing algorithms. + * The preprocessed reads are then passed to each new algorithm. I.e., the + * first underlying implementation will run on the unprocessed reads, then the + * output of the first implementation's preprocessing method will be passed + * as input to the second algorithm's preprocessing method. + * + * @param reads Reads to preprocess. + * @param reference Reference genome bases covering this target. + * @param region The region covered by this target. + * @return Preprocessed reads, as treated by all preprocessing methods.. + */ + def preprocessReadsForRealignment( + reads: Iterable[RichAlignmentRecord], + reference: String, + region: ReferenceRegion): Iterable[RichAlignmentRecord] = { + generators.foldRight(reads)( + (g, r) => g.preprocessReadsForRealignment(r, reference, region)) + } + + /** + * For all reads in this region and all methods, generates the consensus sequences. + * + * We do not guarantee that there are no duplicate consensuses in the output + * of this method, since multiple underlying methods may generate a single + * consensus sequence. Instead, we defer deduplication to the INDEL realigner. + * + * @param reads Reads to generate consensus sequences from. + * @return Consensus sequences to use for realignment. + */ + def findConsensus(reads: Iterable[RichAlignmentRecord]): Iterable[Consensus] = { + generators.flatMap(_.findConsensus(reads)) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala index 242195329d..6b19ed77d1 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala @@ -49,18 +49,6 @@ private[adam] class IndelTable(private val table: Map[String, Iterable[Consensus private[adam] object IndelTable { - /** - * Creates an indel table from a file containing known indels. - * - * @param knownIndelsFile Path to file with known indels. - * @param sc SparkContext to use for loading. - * @return Returns a table with the known indels populated. - */ - def apply(knownIndelsFile: String, sc: SparkContext): IndelTable = { - val rdd: RDD[Variant] = sc.loadVariants(knownIndelsFile).rdd - apply(rdd) - } - /** * Creates an indel table from an RDD containing known variants. * @@ -76,11 +64,12 @@ private[adam] object IndelTable { val deletionLength = v.getReferenceAllele.length - v.getAlternateAllele.length val start = v.getStart + v.getAlternateAllele.length - Consensus("", ReferenceRegion(referenceName, start, start + deletionLength)) + Consensus("", ReferenceRegion(referenceName, start, start + deletionLength + 1)) } else { val start = v.getStart + v.getReferenceAllele.length - Consensus(v.getAlternateAllele.drop(v.getReferenceAllele.length), ReferenceRegion(referenceName, start, start + 1)) + Consensus(v.getAlternateAllele.drop(v.getReferenceAllele.length), + ReferenceRegion(referenceName, start, start + 1)) } (referenceName, consensus) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndels.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndels.scala index 4000756833..874d743326 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndels.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndels.scala @@ -21,7 +21,7 @@ import htsjdk.samtools.{ Cigar, CigarElement, CigarOperator } import org.bdgenomics.utils.misc.Logging import org.apache.spark.rdd.MetricsContext._ import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.algorithms.consensus.{ Consensus, ConsensusGenerator, ConsensusGeneratorFromReads } +import org.bdgenomics.adam.algorithms.consensus.{ Consensus, ConsensusGenerator } import org.bdgenomics.adam.models.{ MdTag, ReferencePosition, ReferenceRegion } import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.adam.rich.RichAlignmentRecord._ @@ -43,7 +43,7 @@ private[read] object RealignIndels extends Serializable with Logging { */ def apply( rdd: RDD[AlignmentRecord], - consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads, + consensusModel: ConsensusGenerator = ConsensusGenerator.fromReads, dataIsSorted: Boolean = false, maxIndelSize: Int = 500, maxConsensusNumber: Int = 30, @@ -222,7 +222,7 @@ private[read] object RealignIndels extends Serializable with Logging { import org.bdgenomics.adam.rdd.read.realignment.RealignIndels._ private[read] class RealignIndels( - val consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads, + val consensusModel: ConsensusGenerator = ConsensusGenerator.fromReads, val dataIsSorted: Boolean = false, val maxIndelSize: Int = 500, val maxConsensusNumber: Int = 30, @@ -258,6 +258,8 @@ private[read] class RealignIndels( refRegion ) var consensus = consensusModel.findConsensus(readsToClean) + .toSeq + .distinct // reduce count of consensus sequences if (consensus.size > maxConsensusNumber) { diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala index 46bbd19f67..d5ac6d9e36 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala @@ -126,6 +126,7 @@ class ADAMKryoRegistrator extends KryoRegistrator { // org.bdgenomics.adam.models kryo.register(classOf[org.bdgenomics.adam.models.Coverage]) + kryo.register(classOf[org.bdgenomics.adam.models.IndelTable]) kryo.register(classOf[org.bdgenomics.adam.models.MdTag]) kryo.register(classOf[org.bdgenomics.adam.models.MultiContigNonoverlappingRegions]) kryo.register(classOf[org.bdgenomics.adam.models.NonoverlappingRegions]) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala index 7a050fb1c0..5bf148afb1 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala @@ -18,13 +18,17 @@ package org.bdgenomics.adam.rdd.read.realignment import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.algorithms.consensus.Consensus +import org.bdgenomics.adam.algorithms.consensus.{ + Consensus, + ConsensusGenerator +} import org.bdgenomics.adam.models.ReferencePosition import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.adam.util.ADAMFunSuite -import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig } +import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Variant } class RealignIndelsSuite extends ADAMFunSuite { @@ -37,9 +41,9 @@ class RealignIndelsSuite extends ADAMFunSuite { artificialReadsRdd.rdd } - def artificialRealignedReads: RDD[AlignmentRecord] = { + def artificialRealignedReads(cg: ConsensusGenerator = ConsensusGenerator.fromReads): RDD[AlignmentRecord] = { artificialReadsRdd - .realignIndels() + .realignIndels(consensusModel = cg) .sortReadsByReferencePosition() .rdd } @@ -133,13 +137,80 @@ class RealignIndelsSuite extends ADAMFunSuite { } sparkTest("checking realigned reads for artificial input") { - val artificialRealignedReads_collected = artificialRealignedReads.collect() - val gatkArtificialRealignedReads_collected = gatkArtificialRealignedReads.collect() + val artificialRealignedReadsCollected = artificialRealignedReads() + .collect() + val gatkArtificialRealignedReadsCollected = gatkArtificialRealignedReads + .collect() + + assert(artificialRealignedReadsCollected.size === gatkArtificialRealignedReadsCollected.size) + + val artificialRead4 = artificialRealignedReadsCollected.filter(_.getReadName == "read4") + val gatkRead4 = gatkArtificialRealignedReadsCollected.filter(_.getReadName == "read4") + val result = artificialRead4.zip(gatkRead4) + + assert(result.forall( + pair => pair._1.getReadName == pair._2.getReadName)) + assert(result.forall( + pair => pair._1.getStart == pair._2.getStart)) + assert(result.forall( + pair => pair._1.getCigar == pair._2.getCigar)) + assert(result.forall( + pair => pair._1.getMapq == pair._2.getMapq)) + } - assert(artificialRealignedReads_collected.size === gatkArtificialRealignedReads_collected.size) + sparkTest("checking realigned reads for artificial input using knowns") { + val indel = Variant.newBuilder() + .setContigName("artificial") + .setStart(33) + .setEnd(44) + .setReferenceAllele("AGGGGGGGGGG") + .setAlternateAllele("A") + .build + val variantRdd = VariantRDD(sc.parallelize(Seq(indel)), + artificialReadsRdd.sequences) + val knowns = ConsensusGenerator.fromKnownIndels(variantRdd) + val artificialRealignedReadsCollected = artificialRealignedReads(cg = knowns) + .collect() + val gatkArtificialRealignedReadsCollected = gatkArtificialRealignedReads + .collect() + + assert(artificialRealignedReadsCollected.size === gatkArtificialRealignedReadsCollected.size) + + val artificialRead4 = artificialRealignedReadsCollected.filter(_.getReadName == "read4") + val gatkRead4 = gatkArtificialRealignedReadsCollected.filter(_.getReadName == "read4") + val result = artificialRead4.zip(gatkRead4) - val artificialRead4 = artificialRealignedReads_collected.filter(_.getReadName == "read4") - val gatkRead4 = gatkArtificialRealignedReads_collected.filter(_.getReadName == "read4") + assert(result.forall( + pair => pair._1.getReadName == pair._2.getReadName)) + assert(result.forall( + pair => pair._1.getStart == pair._2.getStart)) + assert(result.forall( + pair => pair._1.getCigar == pair._2.getCigar)) + assert(result.forall( + pair => pair._1.getMapq == pair._2.getMapq)) + } + + sparkTest("checking realigned reads for artificial input using knowns and reads") { + val indel = Variant.newBuilder() + .setContigName("artificial") + .setStart(33) + .setEnd(44) + .setReferenceAllele("AGGGGGGGGGG") + .setAlternateAllele("A") + .build + val variantRdd = VariantRDD(sc.parallelize(Seq(indel)), + artificialReadsRdd.sequences) + val knowns = ConsensusGenerator.fromKnownIndels(variantRdd) + val union = ConsensusGenerator.union(knowns, ConsensusGenerator.fromReads) + val artificialRealignedReadsCollected = artificialRealignedReads(cg = union) + .collect() + val gatkArtificialRealignedReadsCollected = gatkArtificialRealignedReads + .collect() + + assert(artificialRealignedReadsCollected.size === gatkArtificialRealignedReadsCollected.size) + + val artificialRead4 = artificialRealignedReadsCollected.filter(_.getReadName == "read4") + val gatkRead4 = gatkArtificialRealignedReadsCollected.filter(_.getReadName == "read4") val result = artificialRead4.zip(gatkRead4) assert(result.forall( @@ -254,16 +325,18 @@ class RealignIndelsSuite extends ADAMFunSuite { } sparkTest("test OP and OC tags") { - artificialRealignedReads.collect().foreach(realn => { - val readName = realn.getReadName() - val op = realn.getOldPosition() - val oc = realn.getOldCigar() - - Option(op).filter(_ >= 0).foreach(oPos => { - val s = artificialReads.collect().filter(x => (x.getReadName() == readName)) - assert(s.filter(x => (x.getStart() == oPos)).length > 0) - assert(s.filter(x => (x.getCigar() == oc)).length > 0) + artificialRealignedReads() + .collect() + .foreach(realn => { + val readName = realn.getReadName() + val op = realn.getOldPosition() + val oc = realn.getOldCigar() + + Option(op).filter(_ >= 0).foreach(oPos => { + val s = artificialReads.collect().filter(x => (x.getReadName() == readName)) + assert(s.filter(x => (x.getStart() == oPos)).length > 0) + assert(s.filter(x => (x.getCigar() == oc)).length > 0) + }) }) - }) } }