Update provenance format (#34)
* Update provenance, and indenting

* Fix provenance output

* Remove versioned_outdir

* Remove versioned_outdir
dfornika authored May 13, 2024
1 parent 786ae2a commit a2820ce
Showing 6 changed files with 305 additions and 225 deletions.
46 changes: 46 additions & 0 deletions bin/get_kraken_db_metadata.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python3

import argparse
import json
import os

import yaml


def main(args):
    # Load the Kraken database metadata, if the database provides a metadata.json file.
    db_metadata_path = os.path.join(args.db, 'metadata.json')
    db_metadata = {}
    if os.path.exists(db_metadata_path):
        with open(db_metadata_path, 'r') as f:
            db_metadata = json.load(f)

    # No metadata available; leave the provenance file untouched.
    if not db_metadata:
        exit()

    provenance = {}
    if args.provenance:
        with open(args.provenance, 'r') as f:
            provenance = yaml.safe_load(f)

    database_provenance = {}
    database_name = db_metadata.get('dbname', None)
    if database_name:
        database_provenance['database_name'] = database_name
    database_version = db_metadata.get('version', None)
    if database_version:
        database_provenance['database_version'] = database_version

    # Attach the database details to the kraken2 process record, then
    # rewrite the provenance file in place.
    for provenance_record in provenance:
        if provenance_record.get('process_name') == 'kraken2':
            provenance_record['databases'] = []
            provenance_record['databases'].append(database_provenance)

    with open(args.provenance, 'w') as f:
        yaml.dump(provenance, f)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Get metadata from Kraken database')
    parser.add_argument('--provenance', help='Path to provenance file')
    parser.add_argument('--db', help='Path to Kraken database')
    args = parser.parse_args()
    main(args)
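For context, a hypothetical invocation of the new script might look like the following (both paths are illustrative, not taken from this commit):

    python3 bin/get_kraken_db_metadata.py \
        --db /data/kraken2/k2_standard \
        --provenance sample01_20240513_provenance.yml

Assuming the database directory contains a metadata.json with dbname and version fields, the script rewrites the provenance file in place, appending a databases entry to the kraken2 process record, roughly:

    - process_name: kraken2
      databases:
        - database_name: k2_standard
          database_version: '2023-06-05'

The database_name and database_version values above are invented for illustration; the keys match what the script writes.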
1 change: 1 addition & 0 deletions environments/environment.yml
@@ -5,6 +5,7 @@ channels:
   - defaults
 dependencies:
   - python=3
+  - pyyaml=6.0.1
   - fastp=0.20.1
   - kraken2=2.1.2
   - bracken=2.6.1
79 changes: 44 additions & 35 deletions main.nf
@@ -18,64 +18,73 @@ include { collect_provenance } from './modules/provenance.nf'

 workflow {

-  ch_start_time = Channel.of(LocalDateTime.now())
-  ch_pipeline_name = Channel.of(workflow.manifest.name)
-  ch_pipeline_version = Channel.of(workflow.manifest.version)
-
-  ch_pipeline_provenance = pipeline_provenance(ch_pipeline_name.combine(ch_pipeline_version).combine(ch_start_time))
+    ch_workflow_metadata = Channel.value([
+        workflow.sessionId,
+        workflow.runName,
+        workflow.manifest.name,
+        workflow.manifest.version,
+        workflow.start,
+    ])

-  if (params.samplesheet_input != 'NO_FILE') {
-    ch_fastq = Channel.fromPath(params.samplesheet_input).splitCsv(header: true).map{ it -> [it['ID'], it['R1'], it['R2']] }
-  } else {
-    ch_fastq = Channel.fromFilePairs( params.fastq_search_path, flat: true ).map{ it -> [it[0].split('_')[0], it[1], it[2]] }.unique{ it -> it[0] }
-  }
+    if (params.samplesheet_input != 'NO_FILE') {
+        ch_fastq = Channel.fromPath(params.samplesheet_input).splitCsv(header: true).map{ it -> [it['ID'], it['R1'], it['R2']] }
+    } else {
+        ch_fastq = Channel.fromFilePairs( params.fastq_search_path, flat: true ).map{ it -> [it[0].split('_')[0], it[1], it[2]] }.unique{ it -> it[0] }
+    }

-  ch_kraken_db = Channel.fromPath( "${params.kraken_db}", type: 'dir')
-  ch_bracken_db = Channel.fromPath( "${params.bracken_db}", type: 'dir')
-  ch_taxonomic_levels = Channel.of(params.taxonomic_level.split(',')).unique()
+    ch_kraken_db = Channel.fromPath( "${params.kraken_db}", type: 'dir')
+    ch_bracken_db = Channel.fromPath( "${params.bracken_db}", type: 'dir')
+    ch_taxonomic_levels = Channel.of(params.taxonomic_level.split(',')).unique()

-  main:
+    main:
     hash_files(ch_fastq.map{ it -> [it[0], [it[1], it[2]]] }.combine(Channel.of("fastq-input")))

     fastp(ch_fastq)

     kraken2(fastp.out.reads.combine(ch_kraken_db))

     if (!params.skip_bracken) {
-      bracken(kraken2.out.report.combine(ch_bracken_db).combine(ch_taxonomic_levels))
-      abundance_top_5(bracken.out.abundances)
-      ch_abundances = bracken.out.abundances
+        bracken(kraken2.out.report.combine(ch_bracken_db).combine(ch_taxonomic_levels))
+        abundance_top_5(bracken.out.abundances)
+        ch_abundances = bracken.out.abundances
     } else {
-      abundance_top_5_kraken(kraken2.out.report.combine(ch_taxonomic_levels))
-      ch_abundances = kraken_abundances(kraken2.out.report.combine(ch_taxonomic_levels))
+        abundance_top_5_kraken(kraken2.out.report.combine(ch_taxonomic_levels))
+        ch_abundances = kraken_abundances(kraken2.out.report.combine(ch_taxonomic_levels))
     }

     if (params.extract_reads) {
-      ch_to_extract = ch_abundances.map{ it -> it[1] }.splitCsv(header: true).filter{ it -> Float.parseFloat(it['fraction_total_reads']) > params.extract_reads_threshold }.map{ it -> [it['sample_id'], it['taxonomy_id']] }
-      extract_reads(ch_fastq.join(kraken2.out.report).combine(ch_to_extract, by: 0))
+        ch_to_extract = ch_abundances.map{ it -> it[1] }.splitCsv(header: true).filter{ it -> Float.parseFloat(it['fraction_total_reads']) > params.extract_reads_threshold }.map{ it -> [it['sample_id'], it['taxonomy_id']] }
+        extract_reads(ch_fastq.join(kraken2.out.report).combine(ch_to_extract, by: 0))
     }

     if (params.collect_outputs) {
-      fastp.out.csv.map{ it -> it[1] }.collectFile(name: params.collected_outputs_prefix + "_fastp.csv", storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] })
-      if (!params.skip_bracken) {
-        ch_abundances.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1].split(',')[0] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_bracken_abundances.csv", it[1]] }
-        abundance_top_5.out.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_bracken_abundances_top_5.csv", it[1]] }
-      } else {
-        ch_abundances.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1].split(',')[0] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_kraken_abundances.csv", it[1]] }
-        abundance_top_5_kraken.out.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_kraken_abundances_top_5.csv", it[1]] }
-      }
+        fastp.out.csv.map{ it -> it[1] }.collectFile(name: params.collected_outputs_prefix + "_fastp.csv", storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] })
+        if (!params.skip_bracken) {
+            ch_abundances.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1].split(',')[0] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_bracken_abundances.csv", it[1]] }
+            abundance_top_5.out.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_bracken_abundances_top_5.csv", it[1]] }
+        } else {
+            ch_abundances.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1].split(',')[0] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_kraken_abundances.csv", it[1]] }
+            abundance_top_5_kraken.out.collectFile(storeDir: params.outdir, keepHeader: true, skip: 1, sort: { it -> it.readLines()[1] }){ it -> [params.collected_outputs_prefix + "_" + it[2] + "_kraken_abundances_top_5.csv", it[1]] }
+        }
     }

-  ch_provenance = fastp.out.provenance
-
-  ch_provenance = ch_provenance.join(kraken2.out.provenance).map{ it -> [it[0], [it[1], it[2]]] }
+    // Collect Provenance
+    // The basic idea is to build up a channel with the following structure:
+    // [sample_id, [provenance_file_1.yml, provenance_file_2.yml, provenance_file_3.yml...]]
+    // At each step, we add another provenance file to the list using the << operator...
+    // ...and then concatenate them all together in the 'collect_provenance' process.
+    ch_sample_ids = ch_fastq.map{ it -> it[0] }
+    ch_provenance = ch_sample_ids
+    ch_pipeline_provenance = pipeline_provenance(ch_workflow_metadata)
+    ch_provenance = ch_provenance.combine(ch_pipeline_provenance).map{ it -> [it[0], [it[1]]] }
+    ch_provenance = ch_provenance.join(hash_files.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
+    ch_provenance = ch_provenance.join(fastp.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
+    ch_provenance = ch_provenance.join(kraken2.out.provenance).map{ it -> [it[0], it[1] << it[2]] }

     if (!params.skip_bracken) {
-      ch_provenance = ch_provenance.join(bracken.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
+        ch_provenance = ch_provenance.join(bracken.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
     }

-  ch_provenance = ch_provenance.join(hash_files.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
-  ch_provenance = ch_provenance.join(ch_fastq.map{ it -> it[0] }.combine(ch_pipeline_provenance)).map{ it -> [it[0], it[1] << it[2]] }
-
     collect_provenance(ch_provenance)
 }
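To make the accumulation pattern concrete: after the joins above, each element of ch_provenance should have the shape described in the comments, along these lines (the sample ID and file names here are illustrative):

    ['sample01', [pipeline_provenance.yml, sample01_fastq-input_provenance.yml, sample01_fastp_provenance.yml, sample01_kraken2_provenance.yml, sample01_bracken_provenance.yml]]

The collect_provenance process then concatenates each sample's list of YAML fragments into a single per-sample provenance file.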
46 changes: 25 additions & 21 deletions modules/provenance.nf
@@ -1,37 +1,41 @@
 process collect_provenance {

-  tag { sample_id }
+    tag { sample_id }

-  executor 'local'
+    executor 'local'

-  publishDir params.versioned_outdir ? "${params.outdir}/${sample_id}/${params.pipeline_short_name}-v${params.pipeline_minor_version}-output" : "${params.outdir}/${sample_id}", pattern: "${sample_id}_*_provenance.yml", mode: 'copy'
+    publishDir "${params.outdir}/${sample_id}", pattern: "${sample_id}_*_provenance.yml", mode: 'copy'

-  input:
-  tuple val(sample_id), path(provenance_files)
+    input:
+    tuple val(sample_id), path(provenance_files)

-  output:
-  tuple val(sample_id), file("${sample_id}_*_provenance.yml")
+    output:
+    tuple val(sample_id), file("${sample_id}_*_provenance.yml")

-  script:
-  """
-  cat ${provenance_files} > ${sample_id}_\$(date +%Y%m%d%H%M%S)_provenance.yml
-  """
+    script:
+    """
+    cat ${provenance_files} > ${sample_id}_\$(date +%Y%m%d%H%M%S)_provenance.yml
+    """
 }

 process pipeline_provenance {

-  tag { pipeline_name + " / " + pipeline_version }
+    tag { pipeline_name + " / " + pipeline_version }

-  executor 'local'
+    executor 'local'

-  input:
-  tuple val(pipeline_name), val(pipeline_version), val(analysis_start)
+    input:
+    tuple val(session_id), val(run_name), val(pipeline_name), val(pipeline_version), val(timestamp_analysis_start)

-  output:
-  file("pipeline_provenance.yml")
+    output:
+    file("pipeline_provenance.yml")

-  script:
-  """
-  printf -- "- pipeline_name: ${pipeline_name}\\n  pipeline_version: ${pipeline_version}\\n- timestamp_analysis_start: ${analysis_start}\\n" > pipeline_provenance.yml
-  """
+    script:
+    """
+    printf -- "- pipeline_name: ${pipeline_name}\\n" >> pipeline_provenance.yml
+    printf -- "  pipeline_version: ${pipeline_version}\\n" >> pipeline_provenance.yml
+    printf -- "  nextflow_session_id: ${session_id}\\n" >> pipeline_provenance.yml
+    printf -- "  nextflow_run_name: ${run_name}\\n" >> pipeline_provenance.yml
+    printf -- "  timestamp_analysis_start: ${timestamp_analysis_start}\\n" >> pipeline_provenance.yml
+    """
 }
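Tracing the printf calls in the updated pipeline_provenance script, the generated pipeline_provenance.yml should look roughly like this (all values are invented placeholders):

    - pipeline_name: example-org/example-pipeline
      pipeline_version: 1.0.0
      nextflow_session_id: 8c2b7f4e-1234-5678-9abc-def012345678
      nextflow_run_name: agitated_mcclintock
      timestamp_analysis_start: 2024-05-13T09:30:00.000-07:00

Because all pipeline metadata now nests under a single list item (the old format split the timestamp into a separate item), the concatenated per-sample provenance file parses as a flat YAML list of records, which is what get_kraken_db_metadata.py iterates over.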