diff --git a/conf/modules.config b/conf/modules.config index 6244108..e77ab87 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -35,15 +35,26 @@ process { withName: SAMTOOLS_COLLATETOFASTA { beforeScript = { "export REF_PATH=spoof"} ext.args = { (params.use_work_dir_as_temp ? "-T." : "") } + ext.prefix = { "${meta.chunk_id}" } + } + + withName: SAMTOOLS_FILTERTOFASTQ { + ext.prefix = { "${meta.chunk_id}" } } withName: BLAST_BLASTN { ext.args = '-task blastn -reward 1 -penalty -5 -gapopen 3 -gapextend 3 -dust yes -soft_masking true -evalue .01 -searchsp 1750000000000 -outfmt 6' + ext.prefix = { "${meta.chunk_id}" } + } + + withName: PACBIO_FILTER { + ext.prefix = { "${meta.chunk_id}" } } withName: SAMTOOLS_CONVERT { beforeScript = { "export REF_PATH=spoof"} - ext.args = "-be '[rq]>=0.99' -x fi -x fp -x ri -x rp --write-index" + ext.args = "--output-fmt bam --write-index" + ext.prefix = { "${meta.chunk_id}" } } withName: CONVERT_CRAM { @@ -100,6 +111,11 @@ process { ext.args4 = { "--write-index -l1" } } + withName: '.*:.*:ALIGN_HIFI:MINIMAP2_ALIGN' { + ext.args = { "-ax map-hifi --cs=short -R ${meta.read_group} -I" + Math.ceil(meta2.genome_size/1e9) + 'G' } + ext.prefix = { "${meta.chunk_id}" } + } + withName: ".*:ALIGN_CLR:.*:CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT" { ext.args = "" ext.args1 = { "-F 0x200 -nt" } diff --git a/modules/local/cram_filter.nf b/modules/local/cram_filter.nf new file mode 100644 index 0000000..faea46f --- /dev/null +++ b/modules/local/cram_filter.nf @@ -0,0 +1,46 @@ +process CRAM_FILTER { + tag "$meta.chunk_id" + label "process_high" + + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://depot.galaxyproject.org/singularity/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' : + 'biocontainers/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' }" + + input: + tuple val(meta), path(cramfile), path(cramindex), val(from), val(to), val(base), val(chunkid), val(rglines), path(reference) + + output: + tuple val(meta), path("*.cram"), emit: cram + path "versions.yml" , emit: versions + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + def prefix = task.ext.prefix ?: "${meta.id}" + def VERSION = "1.15" // Staden_io versions break the pipeline + """ + cram_filter -n ${from}-${to} ${cramfile} ${prefix}_${base}_${chunkid}.cram + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' ) + staden_io: $VERSION + END_VERSIONS + """ + + stub: + def prefix = task.ext.prefix ?: "${meta.id}" + def base = "45022_3#2" + def chunkid = "1" + """ + touch ${prefix}_${base}_${chunkid}_filtered.cram + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' ) + staden_io: $VERSION + END_VERSIONS + """ +} diff --git a/subworkflows/local/align_pacbio.nf b/subworkflows/local/align_pacbio.nf index 59e039c..1b6e409 100644 --- a/subworkflows/local/align_pacbio.nf +++ b/subworkflows/local/align_pacbio.nf @@ -6,10 +6,10 @@ include { FILTER_PACBIO } from '../../subworkflows/local/filter_pacbio' include { SAMTOOLS_ADDREPLACERG } from '../../modules/local/samtools_addreplacerg' include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main' include { GENERATE_CRAM_CSV } from '../../modules/local/generate_cram_csv' -include { MINIMAP2_MAPREDUCE } from '../../subworkflows/local/minimap2_mapreduce' include { SAMTOOLS_SORMADUP as CONVERT_CRAM } from '../../modules/local/samtools_sormadup' include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main' - +include { CREATE_CRAM_FILTER_INPUT } from '../../subworkflows/local/create_cram_filter_input' +include { MINIMAP2_ALIGN } from '../../modules/nf-core/minimap2/align/main' workflow ALIGN_PACBIO { take: @@ -22,54 +22,40 @@ workflow ALIGN_PACBIO { ch_versions = Channel.empty() ch_merged_bam = Channel.empty() - // Filter BAM and output as FASTQ - FILTER_PACBIO ( reads, db ) - ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions ) - - // Convert FASTQ to CRAM - CONVERT_CRAM ( FILTER_PACBIO.out.fastq, fasta ) + // Convert input to CRAM + CONVERT_CRAM ( reads, fasta ) ch_versions = ch_versions.mix ( CONVERT_CRAM.out.versions ) SAMTOOLS_ADDREPLACERG ( CONVERT_CRAM.out.bam ) ch_versions = ch_versions.mix ( SAMTOOLS_ADDREPLACERG.out.versions ) - SAMTOOLS_ADDREPLACERG.out.cram - | set { ch_reads_cram } - // Index the CRAM file - SAMTOOLS_INDEX ( ch_reads_cram ) + SAMTOOLS_INDEX ( SAMTOOLS_ADDREPLACERG.out.cram ) ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions ) - ch_reads_cram + SAMTOOLS_ADDREPLACERG.out.cram | join ( SAMTOOLS_INDEX.out.crai ) - | set { ch_reads_cram_crai } - + | set { ch_reads_cram } - // - // MODULE: generate a CRAM CSV file containing the required parametres for CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT - // - GENERATE_CRAM_CSV( ch_reads_cram_crai ) + GENERATE_CRAM_CSV( ch_reads_cram ) ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions ) - // - // SUBWORKFLOW: mapping pacbio reads using minimap2 - // - MINIMAP2_MAPREDUCE ( - fasta, - GENERATE_CRAM_CSV.out.csv - ) - ch_versions = ch_versions.mix( MINIMAP2_MAPREDUCE.out.versions ) - ch_merged_bam = ch_merged_bam.mix(MINIMAP2_MAPREDUCE.out.mergedbam) - - ch_merged_bam - | combine( ch_reads_cram_crai ) - | map { meta_bam, bam, meta_cram, cram, crai -> [ meta_cram, bam ] } - | set { ch_merged_bam } - - // Collect all BAM output by sample name - ch_merged_bam + CREATE_CRAM_FILTER_INPUT ( GENERATE_CRAM_CSV.out.csv, fasta ) + ch_versions = ch_versions.mix( CREATE_CRAM_FILTER_INPUT.out.versions ) + + // Filter BAM and output as FASTQ + FILTER_PACBIO ( CREATE_CRAM_FILTER_INPUT.out.chunked_cram, db ) + ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions ) + + // Align without map reduce + // Align Fastq to Genome with minimap2. bam_format is set to true, making the output a *sorted* BAM + MINIMAP2_ALIGN ( FILTER_PACBIO.out.fastq, fasta, true, "bai", false, false ) + ch_versions = ch_versions.mix ( MINIMAP2_ALIGN.out.versions.first() ) + + // Collect all alignment output by sample name + MINIMAP2_ALIGN.out.bam | map { meta, bam -> [['id': meta.id.split('_')[0..-2].join('_'), 'datatype': meta.datatype], meta.read_count, bam] } - | groupTuple( by: [0] ) + | groupTuple ( by: [0] ) | map { meta, read_counts, bams -> [meta + [read_count: read_counts.sum()], bams] } | branch { meta, bams -> @@ -81,7 +67,7 @@ workflow ALIGN_PACBIO { // Merge, but only if there is more than 1 file SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) - ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions ) + ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() ) // Convert merged BAM to CRAM and calculate indices and statistics diff --git a/subworkflows/local/create_cram_filter_input.nf b/subworkflows/local/create_cram_filter_input.nf new file mode 100644 index 0000000..f8fc133 --- /dev/null +++ b/subworkflows/local/create_cram_filter_input.nf @@ -0,0 +1,42 @@ +include { CRAM_FILTER } from '../../modules/local/cram_filter' + +workflow CREATE_CRAM_FILTER_INPUT { + take: + csv_ch + fasta + + main: + ch_versions = Channel.empty() + + // Generate input channel for CRAM_FILTER + csv_ch + |splitCsv() + |combine(fasta) + |map { cram_id, cram_info, ref_id, ref_dir -> + tuple([ + id: cram_id.id, + chunk_id: cram_id.id + "_" + cram_info[5], + genome_size: ref_id.genome_size, + read_count: cram_id.read_count, + read_group: cram_id.read_group, + datatype: cram_id.datatype, + ], + file(cram_info[0]), + cram_info[1], + cram_info[2], + cram_info[3], + cram_info[4], + cram_info[5], + cram_info[6], + ref_dir + ) + } + | set { ch_cram_filter_input } + + CRAM_FILTER(ch_cram_filter_input) + ch_versions = ch_versions.mix(CRAM_FILTER.out.versions) + + emit: + chunked_cram = CRAM_FILTER.out.cram + versions = ch_versions +} diff --git a/subworkflows/local/filter_pacbio.nf b/subworkflows/local/filter_pacbio.nf index 5edb338..713ba5f 100644 --- a/subworkflows/local/filter_pacbio.nf +++ b/subworkflows/local/filter_pacbio.nf @@ -9,7 +9,7 @@ include { BLAST_BLASTN } from '../../modules/nf-core/blast/ include { PACBIO_FILTER } from '../../modules/local/pacbio_filter' include { SAMTOOLS_FILTERTOFASTQ } from '../../modules/local/samtools_filtertofastq' include { SEQKIT_FQ2FA } from '../../modules/nf-core/seqkit/fq2fa' -include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname' +include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname' workflow FILTER_PACBIO { @@ -17,55 +17,32 @@ workflow FILTER_PACBIO { reads // channel: [ val(meta), /path/to/datafile ] db // channel: /path/to/vector_db - main: ch_versions = Channel.empty() - - // Check file types and branch + // Convert from PacBio CRAM to Samtools BAM reads - | branch { - meta, reads -> - fastq : reads.findAll { it.getName().toLowerCase() =~ /.*f.*\.gz/ } - bam : true - } - | set { ch_reads } - - - // Convert from PacBio BAM to Samtools BAM - ch_reads.bam - | map { meta, bam -> [ meta, bam, [] ] } + | map { meta, cram -> [ meta, cram, [] ] } | set { ch_pacbio } SAMTOOLS_CONVERT ( ch_pacbio, [ [], [] ], [] ) - ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions.first() ) - + ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions ) // Collate BAM file to create interleaved FASTA SAMTOOLS_COLLATETOFASTA ( SAMTOOLS_CONVERT.out.bam ) - ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions.first() ) - - - // Convert FASTQ to FASTA using SEQKIT_FQ2FA - SEQKIT_FQ2FA ( ch_reads.fastq ) - ch_versions = ch_versions.mix ( SEQKIT_FQ2FA.out.versions.first() ) + ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions ) - - // Combine BAM-derived FASTA with converted FASTQ inputs + // Combine BAM-derived FASTA SAMTOOLS_COLLATETOFASTA.out.fasta - | concat( SEQKIT_FQ2FA.out.fasta ) | set { ch_fasta } - // Nucleotide BLAST BLAST_BLASTN ( ch_fasta, db ) - ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions.first() ) - + ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions ) // Filter BLAST output PACBIO_FILTER ( BLAST_BLASTN.out.txt ) - ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions.first() ) - + ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions ) // Filter the input BAM and output as interleaved FASTA SAMTOOLS_CONVERT.out.bam @@ -78,28 +55,12 @@ workflow FILTER_PACBIO { | set { ch_bam_reads } SAMTOOLS_FILTERTOFASTQ ( ch_bam_reads.bams, ch_bam_reads.lists ) - ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions.first() ) - - - // Filter inputs provided as FASTQ and output as interleaved FASTQ - ch_reads.fastq - | join(PACBIO_FILTER.out.list) - | multiMap { meta, fastq, list -> \ - fastqs: [meta, fastq] - lists: list - } - | set { ch_reads_fastq } - - BBMAP_FILTERBYNAME ( ch_reads_fastq.fastqs, ch_reads_fastq.lists , "fastq", true) - ch_versions = ch_versions.mix ( BBMAP_FILTERBYNAME.out.versions.first() ) - + ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions ) // Merge filtered outputs as ch_output_fastq - BBMAP_FILTERBYNAME.out.reads - | concat ( SAMTOOLS_FILTERTOFASTQ.out.fastq ) + SAMTOOLS_FILTERTOFASTQ.out.fastq | set { ch_filtered_fastq } - emit: fastq = ch_filtered_fastq // channel: [ meta, /path/to/fastq ] versions = ch_versions // channel: [ versions.yml ]