diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index e9bf1f0b..2d67eeee 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -35,7 +35,7 @@ '6VV2': None, '80GC': None, '94GU': None, - '98BT': None, + '98BT': 'PipelineTeam98BT', '9Q6R': None, '9T8E': None, '9U7M': None, diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 653593fc..3cc0675f 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -1,788 +1,926 @@ -# pylint: skip-file -from nipype.interfaces.spm import (Coregister, Smooth, OneSampleTTestDesign, EstimateModel, EstimateContrast, - Level1Design, TwoSampleTTestDesign, RealignUnwarp, NewSegment, SliceTiming, - DARTEL, DARTELNorm2MNI, FieldMap) -from nipype.interfaces.spm import Threshold -from nipype.interfaces.fsl import ApplyMask, ExtractROI - -import niflow.nipype1.workflows.fmri.spm as spm_wf # spm -from nipype.algorithms.modelgen import SpecifySPMModel -from nipype.interfaces.utility import IdentityInterface, Function, Rename +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team 98BT using Nipype """ +from os.path import join +from itertools import product + +from nipype import Workflow, Node, MapNode, JoinNode +from nipype.interfaces.utility import IdentityInterface, Function, Rename, Merge from nipype.interfaces.io import SelectFiles, DataSink from nipype.algorithms.misc import Gunzip -from nipype import Workflow, Node, MapNode, JoinNode -from nipype.interfaces.base import Bunch - -from os.path import join as opj -import os -import json - -def get_dartel_template_wf(exp_dir, result_dir, working_dir, output_dir, subject_list): - - - infosource_dartel = Node(IdentityInterface(fields=['subject_id']), name="infosource_dartel") - infosource_dartel.iterables = ('subject_id', subject_list) - - # Templates to select files node - anat_file = opj('sub-{subject_id}', 'anat', - 'sub-{subject_id}_T1w.nii.gz') - - template = {'anat' : anat_file} - - # SelectFiles node - to select necessary files - selectfiles_dartel = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles_dartel') - - # GUNZIP NODE : SPM do not use .nii.gz files - gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') - - def get_dartel_input(structural_files): - print(structural_files) - return structural_files - - dartel_input = JoinNode(Function(input_names = ['structural_files'], - output_names = ['structural_files'], - function = get_dartel_input), name = 'dartel_input', - joinsource="infosource_dartel", joinfield="structural_files") - - rename_dartel = MapNode(Rename(format_string="subject_id_%(subject_id)s_struct"), - iterfield=['in_file', 'subject_id'], - name='rename_dartel') - - rename_dartel.inputs.subject_id = subject_list - rename_dartel.inputs.keep_ext = True - - dartel_workflow = spm_wf.create_DARTEL_template(name='dartel_workflow') - dartel_workflow.inputs.inputspec.template_prefix = "template" - - # DataSink Node - store the wanted results in the wanted repository - datasink_dartel = Node(DataSink(base_directory=result_dir, container=output_dir), - name='datasink_dartel') - - dartel = Workflow(base_dir = opj(result_dir, working_dir), name = "dartel") - - dartel.connect([(infosource_dartel, selectfiles_dartel, [('subject_id', 'subject_id')]), - (selectfiles_dartel, gunzip_anat, [('anat', 'in_file')]), - (gunzip_anat, dartel_input, [('out_file', 'structural_files')]), - (dartel_input, rename_dartel, [('structural_files', 'in_file')]), - (rename_dartel, dartel_workflow, [('out_file', 'inputspec.structural_files')]), - (dartel_workflow, datasink_dartel, - [('outputspec.template_file', 'dartel_template.@template_file'), - ('outputspec.flow_fields', 'dartel_template.@flow_fields')])]) - - return dartel - - -def get_fieldmap_infos(info_fmap, magnitude): - """ - Function to get information necessary to compute the fieldmap. - - Parameters: - - info_fmap: str, file with fieldmap information - - magnitude: list of str, list of magnitude files - - Returns: - - TE: float, echo time obtained from fieldmap information file - - magnitude_file: str, necessary file to compute fieldmap - """ - import json - - with open(info_fmap, 'rt') as fp: - fmap_info = json.load(fp) - - short_TE = min(float(fmap_info['EchoTime1']), float(fmap_info['EchoTime2'])) - long_TE = max(float(fmap_info['EchoTime1']), float(fmap_info['EchoTime2'])) - if short_TE == float(fmap_info['EchoTime1']): - magnitude_file = magnitude[0] - elif short_TE == float(fmap_info['EchoTime2']): - magnitude_file = magnitude[1] - - TE = (short_TE, long_TE) - - return TE, magnitude_file - -def rm_field_files(files, subject_id, run_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - preproc_dir = opj(result_dir, working_dir, 'preprocessing', f"run_id_{run_id}_subject_id_{subject_id}") - - dir_to_rm = ['gunzip_func', 'gunzip_phasediff', 'fieldmap_infos', 'gunzip_magnitude', 'fieldmap', - 'slice_timing'] - for dirs in dir_to_rm: - try: - shutil.rmtree(opj(preproc_dir, dirs)) - except OSError as e: - print(e) - else: - print(f"The directory {dirs} is deleted successfully") - - return files - -def get_preprocessing(exp_dir, result_dir, working_dir, output_dir, subject_list, run_list, fwhm, N, ST, TA, TR, total_readout_time): - - infosource = Node(IdentityInterface(fields=['subject_id', 'run_id']), name="infosource") - - infosource.iterables = [('subject_id', subject_list), ('run_id', run_list)] - - # Templates to select files node - anat_file = opj('sub-{subject_id}', 'anat', - 'sub-{subject_id}_T1w.nii.gz') - - func_file = opj('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz') - - magnitude_file = opj('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude*.nii.gz') - - phasediff_file = opj('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz') - - info_fieldmap_file = opj('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.json') - - dartel_flowfields_file = opj(result_dir, output_dir, 'dartel_template', - 'u_rc1subject_id_{subject_id}_struct_template.nii') - - dartel_template_file = opj(result_dir, output_dir, 'dartel_template', 'template_6.nii') - - template = {'anat' : anat_file, 'func' : func_file, 'magnitude' : magnitude_file, 'phasediff' : phasediff_file, - 'info_fmap' : info_fieldmap_file, 'dartel_template' : dartel_template_file, - 'dartel_flow_field' : dartel_flowfields_file} - - # SelectFiles node - to select necessary files - selectfiles_preproc = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles_preproc') - - # GUNZIP NODE : SPM do not use .nii.gz files - gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') - - gunzip_func = Node(Gunzip(), name = 'gunzip_func') - - gunzip_magnitude = Node(Gunzip(), name = 'gunzip_magnitude') - - gunzip_phasediff = Node(Gunzip(), name = 'gunzip_phasediff') - - - fieldmap_infos = Node(Function(input_names = ['info_fmap', 'magnitude'], - output_names = ['TE', 'magnitude_file'], - function = get_fieldmap_infos), name = 'fieldmap_infos') - - fieldmap = Node(FieldMap(blip_direction = -1), name = 'fieldmap') - - fieldmap.inputs.total_readout_time = total_readout_time - - """ - - **Segmentation** : SPM Segment function via custom scripts (defaults left in place) - """ - - tissue1 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 1), 1, (True,False), (True, False)] - tissue2 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 2), 1, (True,False), (True, False)] - tissue3 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 3), 2, (True,False), (False, False)] - tissue4 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 4), 3, (True,False), (False, False)] - tissue5 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 5), 4, (True,False), (False, False)] - tissue6 = [('/opt/spm12-r7771/spm12_mcr/spm12/tpm/TPM.nii', 6), 2, (False,False), (False, False)] - - tissue_list = [tissue1, tissue2, tissue3, tissue4, tissue5, tissue6] - - segmentation = Node(NewSegment(write_deformation_fields = [True, True], tissues = tissue_list, - channel_info = (0.0001, 60, (True, True))), name = "segmentation") - - """ - - **Slice timing** : SPM slice time correction with default parameters - (shift via sinc interpolation and Fourier transform), with ref =2. Slice time correction performed - as first step of preprocessing, prior to motion correction. - """ - - slice_timing = Node(SliceTiming(num_slices = N, ref_slice = 2, slice_order = ST, - time_acquisition = TA, time_repetition = TR), name = "slice_timing") - - #in_files = func - """ - - **Motion correction** : SPM realign and unwarp, incorporating field maps into motion correction for unwarping. - Most parameters as SPM default, but 4th degree bslines interpolation was used for estimation. - Images were rereferenced to the first scan in each run. - """ - - motion_correction = Node(RealignUnwarp(interp = 4), name = "motion_correction") - - #in_files = timecorrected_files - - """ - - **Intrasubject coregistration** : Each EPI scan was coregistered to the structural scan via SPM's coreg - function, using normalized mutual information. - """ - extract_first = Node(ExtractROI(t_min = 1, t_size=1, output_type = 'NIFTI'), name = 'extract_first') - - coregistration = Node(Coregister(cost_function = 'nmi', jobtype = 'estimate'), name = "coregistration") - - #target = anat - #source = realigned_unwarped_files - - dartel_norm_func = Node(DARTELNorm2MNI(fwhm = fwhm, modulate = False, voxel_size = (2.3, 2.3, 2.15)), name = "dartel_norm_func") - #apply_to_files = coregistered_source - - dartel_norm_anat = Node(DARTELNorm2MNI(fwhm = fwhm, voxel_size = (1, 1, 1)), name = "dartel_norm_anat") - - remove_field_files = Node(Function(input_names = ['files', 'subject_id', 'run_id', 'result_dir', 'working_dir'], - output_names = ['files'], - function = rm_field_files), name = 'remove_field_files') - - remove_field_files.inputs.result_dir = result_dir - remove_field_files.inputs.working_dir = working_dir - - # DataSink Node - store the wanted results in the wanted repository - datasink_preproc = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_preproc') - - #dartel = get_dartel_template_wf(exp_dir, result_dir, working_dir, output_dir, subject_list) - - preprocessing = Workflow(base_dir = opj(result_dir, working_dir), name = "preprocessing") - - preprocessing.connect([(infosource, selectfiles_preproc, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (infosource, remove_field_files, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles_preproc, gunzip_anat, [('anat', 'in_file')]), - (selectfiles_preproc, gunzip_func, [('func', 'in_file')]), - (selectfiles_preproc, gunzip_phasediff, [('phasediff', 'in_file')]), - (selectfiles_preproc, fieldmap_infos, [('info_fmap', 'info_fmap'), - ('magnitude', 'magnitude')]), - (fieldmap_infos, gunzip_magnitude, [('magnitude_file', 'in_file')]), - (fieldmap_infos, fieldmap, [('TE', 'echo_times')]), - (gunzip_magnitude, fieldmap, [('out_file', 'magnitude_file')]), - (gunzip_phasediff, fieldmap, [('out_file', 'phase_file')]), - (gunzip_func, fieldmap, [('out_file', 'epi_file')]), - (fieldmap, motion_correction, [('vdm', 'phase_map')]), - (gunzip_anat, segmentation, [('out_file', 'channel_files')]), - (gunzip_func, slice_timing, [('out_file', 'in_files')]), - (slice_timing, motion_correction, [('timecorrected_files', 'in_files')]), - (motion_correction, remove_field_files, [('realigned_unwarped_files', 'files')]), - (remove_field_files, coregistration, [('files', 'apply_to_files')]), - (gunzip_anat, coregistration, [('out_file', 'target')]), - (remove_field_files, extract_first, [('files', 'in_file')]), - (extract_first, coregistration, [('roi_file', 'source')]), - (selectfiles_preproc, dartel_norm_func, [('dartel_flow_field', 'flowfield_files'), - ('dartel_template', 'template_file')]), - (selectfiles_preproc, dartel_norm_anat, [('dartel_flow_field', 'flowfield_files'), - ('dartel_template', 'template_file')]), - (gunzip_anat, dartel_norm_anat, [('out_file', 'apply_to_files')]), - (coregistration, dartel_norm_func, [('coregistered_files', 'apply_to_files')]), - (dartel_norm_func, datasink_preproc, - [('normalized_files', 'preprocessing.@normalized_files')]), - #(dartel_norm_anat, datasink_preproc, - # [('normalized_files', 'preprocessing.@normalized_anat')]), - (motion_correction, datasink_preproc, - [('realigned_unwarped_files', 'preprocessing.@motion_corrected'), - ('realignment_parameters', 'preprocessing.@param')]), - (segmentation, datasink_preproc, [('normalized_class_images', 'preprocessing.@seg')]) - ]) - - return preprocessing - -def compute_parameters(parameters_files, wc2_file, motion_corrected_files, subject_id, result_dir, working_dir): - import pandas as pd - from os.path import join as opj - import os - import nibabel as nib - import numpy as np - from nilearn import image - from nilearn.image import resample_to_img, resample_img - # import warnings filter - from warnings import simplefilter - # ignore all future warnings - simplefilter(action='ignore', category=FutureWarning) - simplefilter(action='ignore', category=UserWarning) - simplefilter(action='ignore', category=RuntimeWarning) - - mean_wm = [[] for i in range(len(motion_corrected_files))] - - wc2 = nib.load(wc2_file) - wc2_mask = wc2.get_fdata() > 0.6 - wc2_mask = wc2_mask.astype(int) - - - for i, functional_file in enumerate(sorted(motion_corrected_files)): - functional = nib.load(functional_file) - - - for slices in image.iter_img(functional): - slice_img = resample_to_img(slices, wc2, interpolation='nearest', clip = True) - - slice_data = slice_img.get_fdata() - masked_slice = slice_data * wc2_mask - mean_wm[i].append(np.mean(masked_slice)) - - new_parameters_files = [] - for i, file in enumerate(sorted(parameters_files)): - df = pd.read_table(file, sep = ' ', header = None) - - df['Mean_WM'] = mean_wm[i] - - new_path =opj(result_dir, working_dir, 'parameters_file', - f"parameters_file_sub-{subject_id}_run{'0' + str(i+1)}.tsv") - if not os.path.isdir(opj(result_dir, working_dir)): - os.mkdir(opj(result_dir, working_dir)) - if not os.path.isdir(opj(result_dir, working_dir, 'parameters_file')): - os.mkdir(opj(result_dir, working_dir, 'parameters_file')) - writer = open(new_path, "w") - writer.write(df.to_csv(sep = '\t', index = False, header = False, na_rep = '0.0')) - writer.close() - - new_parameters_files.append(new_path) - - return new_parameters_files - -""" -An event related design was used. Events were entered as a single events with a 4 second duration. -Paramteric modualtors were added for each condition for gain, loss, and for accept or reject -(in that order; note that SPM performs serial orthogonalization of modulators). - -Canonical HRFs were used along with the derivative and dispersion functions. -SPM defaults were used for other analysis parameters (Volterra=1, 1st order autocorrelation, high pass filter=128). -The six motion parameters, the absolute value of the 1st derivative of the six parameters, -and the mean WM signal were entered as covariates into the model. -Mean WM signal was calculated for each individual by taking their SPM segmentation WM mask, -thresholded at 0.6, as a mask to the motion corrected BOLD data, taking the mean timeseries of -all voxels determined to be within the white matter. - -All four scans were entered into a single fixed effects model for each individual, as separate sessions. -For each individual, a contrast map was calculated for the parametric effects of gains and the parametric -effects of losses, with both positive and negative contrasts run, collapsing across runs. -Contrast beta maps for the parametric effects of losses or gains were entered into the second level analysis. -Mass univariate modelling was used. A group analysis (radom effects) model was run for the equaldifference group, -equalrange group, and the difference between groups (as separate GLMs) -The main effects of the parametric modulators for gains and losses were tested. The whole brain was used for -calculating cluster statistics. -The data was thresholded such that only clusters above p<0.05 FDR corrected were kept, and data was then masked -to answer the specific hypotheses. Cluster extent, using SPM's FDR corrected cluster extent statistic, -and a threshold of p<0.001 uncorrected. -""" - -def get_subject_infos(event_files, runs): - ''' - Create Bunchs for specifySPMModel. - - Parameters : - - event_files: list of str, list of events files (one per run) for the subject - - runs: list of str, list of runs to use - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - - from nipype.interfaces.base import Bunch - - cond_names = ['gamble'] - onset = {} - duration = {} - weights_gain = {} - weights_loss = {} - answers = {} - - for r in range(len(runs)): # Loop over number of runs. - onset.update({s + '_run' + str(r+1) : [] for s in cond_names}) # creates dictionary items with empty lists - duration.update({s + '_run' + str(r+1) : [] for s in cond_names}) - weights_gain.update({'gain_run' + str(r+1) : []}) - weights_loss.update({'loss_run' + str(r+1) : []}) - answers.update({'answers_run' + str(r+1) : []}) - - for r, run in enumerate(runs): - - f_events = event_files[r] - - with open(f_events, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - for cond in cond_names: - val = cond + '_run' + str(r+1) # trial_run1 - val_gain = 'gain_run' + str(r+1) # gain_run1 - val_loss = 'loss_run' + str(r+1) # loss_run1 - val_answer = 'answers_run' + str(r+1) - onset[val].append(float(info[0])) # onsets for trial_run1 - duration[val].append(float(4)) - weights_gain[val_gain].append(float(info[2])) # weights gain for trial_run1 - weights_loss[val_loss].append(float(info[3])) # weights loss for trial_run1 - if "accept" in str(info[5]): - answers[val_answer].append(1) - else: - answers[val_answer].append(0) - - # Bunching is done per run, i.e. trial_run1, trial_run2, etc. - # But names must not have '_run1' etc because we concatenate runs - subject_info = [] - for r in range(len(runs)): - - cond = [c + '_run' + str(r+1) for c in cond_names] - gain = 'gain_run' + str(r+1) - loss = 'loss_run' + str(r+1) - answer = 'answers_run' + str(r+1) - - subject_info.insert(r, - Bunch(conditions=cond, - onsets=[onset[k] for k in cond], - durations=[duration[k] for k in cond], - amplitudes=None, - tmod=None, - pmod=[Bunch(name=[gain, loss, answer], - poly=[1, 1, 1], - param=[weights_gain[gain], - weights_loss[loss], - answers[answer]])], - regressor_names=None, - regressors=None)) - - return subject_info - -def get_contrasts(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - runs = 4 - - gamble = [] - gain = [] - loss = [] - for ir in range(runs): - ir += 1 - gamble.append('gamble_run%i' % ir) - gain.append('gamble_run%ixgain_run%i^1' % (ir, ir)) - loss.append('gamble_run%ixloss_run%i^1' % (ir, ir)) - - pos_1 = [1] * runs - neg_1 = [-1] * runs - - - pos_gain = ( - 'pos_gain', 'T', - gain, pos_1) - - pos_loss = ( - 'pos_loss', 'T', - loss, pos_1) - - neg_gain = ( - 'neg_gain', 'T', - gain, neg_1) - - neg_loss = ( - 'neg_loss', 'T', - loss, neg_1) - - contrasts = [pos_gain, pos_loss, neg_gain, neg_loss] - - return contrasts - -def get_l1_analysis(subject_list, TR, run_list, exp_dir, result_dir, working_dir, output_dir): - """ - Returns the first level analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the analysis - - run_list: list of str, list of runs for which you want to do the analysis - - TR: float, time repetition used during acquisition - - Returns: - - l1_analysis : Nipype WorkFlow - """ - # Infosource Node - To iterate on subjects - infosource = Node(IdentityInterface(fields = ['subject_id', 'run_list'], - run_list = run_list), - name = 'infosource') - - infosource.iterables = [('subject_id', subject_list)] - - - func_file = opj(result_dir, output_dir, 'preprocessing', '_run_id_*_subject_id_{subject_id}', - 'swuasub-{subject_id}_task-MGT_run-*_bold.nii') - - motion_corrected_file = opj(result_dir, output_dir, 'preprocessing', '_run_id_*_subject_id_{subject_id}', - 'uasub-{subject_id}_task-MGT_run-*_bold.nii') - - parameter_file = opj(result_dir, output_dir, 'preprocessing', '_run_id_*_subject_id_{subject_id}', - 'rp_asub-{subject_id}_task-MGT_run-*_bold.txt') - - wc2_file = opj(result_dir, output_dir, 'preprocessing', '_run_id_01_subject_id_{subject_id}', - 'wc2sub-{subject_id}_T1w.nii') - - event_file = opj('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-*_events.tsv') - - template = {'func' : func_file, 'param' : parameter_file, 'event' : event_file, 'wc2' : wc2_file, - 'motion_correction': motion_corrected_file} - - # SelectFiles node - to select necessary files - selectfiles = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles') - - # DataSink Node - store the wanted results in the wanted repository - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - - # Get Subject Info - get subject specific condition information - subject_infos = Node(Function(input_names=['event_files', 'runs'], - output_names=['subject_info'], - function=get_subject_infos), - name='subject_infos') - - # Get parameters - # compute_parameters(parameters_files, wc2_file, functional_files, subject_id, result_dir, working_dir) - - parameters = Node(Function(input_names = ['parameters_files', 'wc2_file', 'motion_corrected_files', 'subject_id', - 'result_dir', 'working_dir'], - output_names = ['new_parameters_files'], - function = compute_parameters), name = 'parameters') - - parameters.inputs.working_dir = working_dir - parameters.inputs.result_dir = result_dir - - - # SpecifyModel - Generates SPM-specific Model - specify_model = Node(SpecifySPMModel(concatenate_runs = False, input_units = 'secs', output_units = 'secs', - time_repetition = TR, high_pass_filter_cutoff = 128), name='specify_model') - - # Level1Design - Generates an SPM design matrix - l1_design = Node(Level1Design(bases = {'hrf': {'derivs': [1, 1]}}, timing_units = 'secs', - interscan_interval = TR), name='l1_design') - - # EstimateModel - estimate the parameters of the model - l1_estimate = Node(EstimateModel(estimation_method={'Classical': 1}), - name="l1_estimate") - - # Node contrasts to get contrasts - contrasts = Node(Function(function=get_contrasts, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts') - - - # EstimateContrast - estimates contrasts - contrast_estimate = Node(EstimateContrast(), name="contrast_estimate") - - - # Create l1 analysis workflow and connect its nodes - l1_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l1_analysis") - - l1_analysis.connect([(infosource, selectfiles, [('subject_id', 'subject_id')]), - (infosource, subject_infos, [('run_list', 'runs')]), - (infosource, contrasts, [('subject_id', 'subject_id')]), - (infosource, parameters, [('subject_id', 'subject_id')]), - (subject_infos, specify_model, [('subject_info', 'subject_info')]), - (contrasts, contrast_estimate, [('contrasts', 'contrasts')]), - (selectfiles, subject_infos, [('event', 'event_files')]), - (selectfiles, parameters, [('motion_correction', 'motion_corrected_files'), - ('param', 'parameters_files'), - ('wc2', 'wc2_file')]), - (selectfiles, specify_model, [('func', 'functional_runs')]), - (parameters, specify_model, [('new_parameters_files', - 'realignment_parameters')]), - (specify_model, l1_design, [('session_info', 'session_info')]), - (l1_design, l1_estimate, [('spm_mat_file', 'spm_mat_file')]), - (l1_estimate, contrast_estimate, [('spm_mat_file', 'spm_mat_file'), - ('beta_images', 'beta_images'), - ('residual_image', 'residual_image')]), - (contrast_estimate, datasink, [('con_images', 'l1_analysis.@con_images'), - ('spmT_images', 'l1_analysis.@spmT_images'), - ('spm_mat_file', 'l1_analysis.@spm_mat_file')]) - ]) - - return l1_analysis - - -def get_subset_contrasts(file_list, method, subject_list, participants_file): - ''' - Parameters : - - file_list : original file list selected by selectfiles node - - subject_list : list of subject IDs that are in the wanted group for the analysis - - participants_file: str, file containing participants characteristics - - method: str, one of "equalRange", "equalIndifference" or "groupComp" - - This function return the file list containing only the files belonging to subject in the wanted group. - ''' - equalIndifference_id = [] - equalRange_id = [] - equalIndifference_files = [] - equalRange_files = [] - - with open(participants_file, 'rt') as f: - next(f) # skip the header - - for line in f: +from nipype.interfaces.spm import ( + Coregister, OneSampleTTestDesign, + EstimateModel, EstimateContrast, Level1Design, + TwoSampleTTestDesign, RealignUnwarp, NewSegment, SliceTiming, + DARTELNorm2MNI, FieldMap, Threshold + ) +from nipype.interfaces.spm.base import Info as SPMInfo +from nipype.interfaces.fsl import ExtractROI +from nipype.algorithms.modelgen import SpecifySPMModel +from niflow.nipype1.workflows.fmri.spm import create_DARTEL_template + +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import ( + remove_parent_directory, list_intersection, elements_in_string, clean_list + ) +from narps_open.utils.configuration import Configuration + +class PipelineTeam98BT(Pipeline): + """ A class that defines the pipeline of team 98BT. """ + + def __init__(self): + super().__init__() + self.fwhm = 8.0 + self.team_id = '98BT' + self.contrast_list = ['0001', '0002', '0003', '0004'] + + # Define contrasts + gain_conditions = [f'gamble_run{r}xgain_run{r}^1' for r in range(1,len(self.run_list) + 1)] + loss_conditions = [f'gamble_run{r}xloss_run{r}^1' for r in range(1,len(self.run_list) + 1)] + self.subject_level_contrasts = [ + ('pos_gain', 'T', gain_conditions, [1, 1, 1, 1]), + ('pos_loss', 'T', loss_conditions, [1, 1, 1, 1]), + ('neg_gain', 'T', gain_conditions, [-1, -1, -1, -1]), + ('neg_loss', 'T', loss_conditions, [-1, -1, -1, -1]) + ] + + def get_dartel_template_sub_workflow(self): + """ + Create a dartel workflow, as first part of the preprocessing. + + DARTEL allows to create a study-specific template in 3D volume space. + This study template can then be used for normalizating each subject’s + scans to the MNI space. + + Returns: + - dartel : nipype.WorkFlow + """ + # Infosource Node - To iterate on subjects + information_source = Node(IdentityInterface( + fields = ['subject_id']), + name = 'information_source') + information_source.iterables = ('subject_id', self.subject_list) + + # SelectFiles Node - to select necessary files + template = { + 'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz') + } + select_files = Node(SelectFiles(template), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # Gunzip node - SPM do not use .nii.gz files + gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') + + def get_dartel_input(structural_files): + print(structural_files) + return structural_files + + dartel_input = JoinNode(Function( + function = get_dartel_input, + input_names = ['structural_files'], + output_names = ['structural_files']), + name = 'dartel_input', + joinsource = 'information_source', + joinfield = 'structural_files') + + rename_dartel = MapNode(Rename(format_string = 'subject_id_%(subject_id)s_struct'), + iterfield = ['in_file', 'subject_id'], + name = 'rename_dartel') + rename_dartel.inputs.subject_id = self.subject_list + rename_dartel.inputs.keep_ext = True + + dartel_sub_workflow = create_DARTEL_template(name = 'dartel_sub_workflow') + dartel_sub_workflow.inputs.inputspec.template_prefix = 'template' + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Create dartel workflow and connect its nodes + dartel_workflow = Workflow( + base_dir = self.directories.working_dir, name = 'dartel_workflow') + dartel_workflow.connect([ + (information_source, select_files, [('subject_id', 'subject_id')]), + (select_files, gunzip_anat, [('anat', 'in_file')]), + (gunzip_anat, dartel_input, [('out_file', 'structural_files')]), + (dartel_input, rename_dartel, [('structural_files', 'in_file')]), + (rename_dartel, dartel_sub_workflow, [('out_file', 'inputspec.structural_files')]), + (dartel_sub_workflow, data_sink, [ + ('outputspec.template_file', 'dartel_template.@template_file'), + ('outputspec.flow_fields', 'dartel_template.@flow_fields')]) + ]) + + # Remove large files, if requested + if Configuration()['pipelines']['remove_unused_data']: + + # Function Nodes remove_parent_directory - Remove gunziped files + remove_gunzip = Node(Function( + function = remove_parent_directory, + input_names = ['_', 'file_name'], + output_names = [] + ), name = 'remove_gunzip') + + # Add connections + dartel_workflow.connect([ + (gunzip_anat, remove_gunzip, [('out_file', 'file_name')]), + (data_sink, remove_gunzip, [('out_file', '_')]) + ]) + + return dartel_workflow + + def get_fieldmap_info(fieldmap_info_file, magnitude_files): + """ + Function to get information necessary to compute the fieldmap. + + Parameters: + - fieldmap_info_file: str, file with fieldmap information + - magnitude_files: list of str, list of magnitude files + + Returns: + - echo_times: tuple of floats, echo time obtained from fieldmap information file + - magnitude_file: str, necessary file to compute fieldmap + """ + from json import load + + with open(fieldmap_info_file, 'rt') as file: + fieldmap_info = load(file) + + short_echo_time = min(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) + long_echo_time = max(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) + + if short_echo_time == float(fieldmap_info['EchoTime1']): + magnitude_file = magnitude_files[0] + elif short_echo_time == float(fieldmap_info['EchoTime2']): + magnitude_file = magnitude_files[1] + + return (short_echo_time, long_echo_time), magnitude_file + + def get_preprocessing_sub_workflow(self): + """ + Create the second part of the preprocessing workflow. + + Returns: + - preprocessing : nipype.WorkFlow + """ + # Infosource Node - To iterate on subjects + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), ('run_id', self.run_list) + ] + + # SelectFiles Node - Select necessary files + templates = { + 'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz'), + 'func' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz'), + 'magnitude' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude*.nii.gz'), + 'phasediff' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz'), + 'info_fmap' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.json'), + 'dartel_flow_field' : join(self.directories.output_dir, 'dartel_template', + 'u_rc1subject_id_{subject_id}_struct_template.nii'), + 'dartel_template' :join( + self.directories.output_dir, 'dartel_template', 'template_6.nii') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # Gunzip nodes - gunzip files because SPM do not use .nii.gz files + gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') + gunzip_func = Node(Gunzip(), name = 'gunzip_func') + gunzip_magnitude = Node(Gunzip(), name = 'gunzip_magnitude') + gunzip_phasediff = Node(Gunzip(), name = 'gunzip_phasediff') + + # Function Node get_fieldmap_info - + fieldmap_info = Node(Function( + function = self.get_fieldmap_info, + input_names = ['fieldmap_info_file', 'magnitude_files'], + output_names = ['echo_times', 'magnitude_file']), + name = 'fieldmap_info') + + # FieldMap Node - + fieldmap = Node(FieldMap(), name = 'fieldmap') + fieldmap.inputs.blip_direction = -1 + fieldmap.inputs.total_readout_time = TaskInformation()['TotalReadoutTime'] + + # Get SPM Tissue Probability Maps file + spm_tissues_file = join(SPMInfo.getinfo()['path'], 'tpm', 'TPM.nii') + + # Segmentation Node - SPM Segment function via custom scripts (defaults left in place) + segmentation = Node(NewSegment(), name = 'segmentation') + segmentation.inputs.write_deformation_fields = [True, True] + segmentation.inputs.channel_info = (0.0001, 60, (True, True)) + segmentation.inputs.tissues = [ + [(spm_tissues_file, 1), 1, (True,False), (True, False)], + [(spm_tissues_file, 2), 1, (True,False), (True, False)], + [(spm_tissues_file, 3), 2, (True,False), (False, False)], + [(spm_tissues_file, 4), 3, (True,False), (False, False)], + [(spm_tissues_file, 5), 4, (True,False), (False, False)], + [(spm_tissues_file, 6), 2, (False,False), (False, False)] + ] + + # Slice timing - SPM slice time correction with default parameters + slice_timing = Node(SliceTiming(), name = 'slice_timing') + slice_timing.inputs.num_slices = TaskInformation()['NumberOfSlices'] + slice_timing.inputs.ref_slice = 2 + slice_timing.inputs.slice_order = TaskInformation()['SliceTiming'] + slice_timing.inputs.time_acquisition = TaskInformation()['AcquisitionTime'] + slice_timing.inputs.time_repetition = TaskInformation()['RepetitionTime'] + + # Motion correction - SPM realign and unwarp + motion_correction = Node(RealignUnwarp(), name = 'motion_correction') + motion_correction.inputs.interp = 4 + + # Intrasubject coregistration + extract_first = Node(ExtractROI(), name = 'extract_first') + extract_first.inputs.t_min = 1 + extract_first.inputs.t_size = 1 + extract_first.inputs.output_type = 'NIFTI' + + coregistration = Node(Coregister(), name = 'coregistration') + coregistration.inputs.cost_function = 'nmi' + coregistration.inputs.jobtype = 'estimate' + + dartel_norm_func = Node(DARTELNorm2MNI(), name = 'dartel_norm_func') + dartel_norm_func.inputs.fwhm = self.fwhm + dartel_norm_func.inputs.modulate = False + dartel_norm_func.inputs.voxel_size = (2.3, 2.3, 2.15) + + dartel_norm_anat = Node(DARTELNorm2MNI(), name = 'dartel_norm_anat') + dartel_norm_anat.inputs.fwhm = self.fwhm + dartel_norm_anat.inputs.voxel_size = (1, 1, 1) + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Create preprocessing workflow and connect its nodes + preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing') + preprocessing.connect([ + (information_source, select_files, [ + ('subject_id', 'subject_id'), + ('run_id', 'run_id')]), + (select_files, gunzip_anat, [('anat', 'in_file')]), + (select_files, gunzip_func, [('func', 'in_file')]), + (select_files, gunzip_phasediff, [('phasediff', 'in_file')]), + (select_files, fieldmap_info, [ + ('info_fmap', 'fieldmap_info_file'), + ('magnitude', 'magnitude_files')]), + (fieldmap_info, gunzip_magnitude, [('magnitude_file', 'in_file')]), + (fieldmap_info, fieldmap, [('echo_times', 'echo_times')]), + (gunzip_magnitude, fieldmap, [('out_file', 'magnitude_file')]), + (gunzip_phasediff, fieldmap, [('out_file', 'phase_file')]), + (gunzip_func, fieldmap, [('out_file', 'epi_file')]), + (fieldmap, motion_correction, [('vdm', 'phase_map')]), + (gunzip_anat, segmentation, [('out_file', 'channel_files')]), + (gunzip_func, slice_timing, [('out_file', 'in_files')]), + (slice_timing, motion_correction, [('timecorrected_files', 'in_files')]), + (motion_correction, coregistration, [('realigned_unwarped_files', 'apply_to_files')]), + (gunzip_anat, coregistration, [('out_file', 'target')]), + (motion_correction, extract_first, [('realigned_unwarped_files', 'in_file')]), + (extract_first, coregistration, [('roi_file', 'source')]), + (select_files, dartel_norm_func, [ + ('dartel_flow_field', 'flowfield_files'), + ('dartel_template', 'template_file')]), + (select_files, dartel_norm_anat, [ + ('dartel_flow_field', 'flowfield_files'), + ('dartel_template', 'template_file')]), + (gunzip_anat, dartel_norm_anat, [('out_file', 'apply_to_files')]), + (coregistration, dartel_norm_func, [('coregistered_files', 'apply_to_files')]), + (dartel_norm_func, data_sink, [ + ('normalized_files', 'preprocessing.@normalized_files')]), + (motion_correction, data_sink, [ + ('realigned_unwarped_files', 'preprocessing.@motion_corrected'), + ('realignment_parameters', 'preprocessing.@param')]), + (segmentation, data_sink, [('normalized_class_images', 'preprocessing.@seg')]), + ]) + + # Remove large files, if requested + if Configuration()['pipelines']['remove_unused_data']: + + # Merge Node - Merge file names to be removed after datasink node is performed + merge_removable_files = Node(Merge(5), name = 'merge_removable_files') + merge_removable_files.inputs.ravel_inputs = True + + # Function Nodes remove_files - Remove sizeable files once they aren't needed + remove_after_datasink = MapNode(Function( + function = remove_parent_directory, + input_names = ['_', 'file_name'], + output_names = [] + ), name = 'remove_after_datasink', iterfield = 'file_name') + + # Add connections + preprocessing.connect([ + (gunzip_func, merge_removable_files, [('out_file', 'in1')]), + (gunzip_phasediff, merge_removable_files, [('out_file', 'in2')]), + (gunzip_magnitude, merge_removable_files, [('out_file', 'in3')]), + (fieldmap, merge_removable_files, [('vdm', 'in4')]), + (slice_timing, merge_removable_files, [('timecorrected_files', 'in5')]), + (merge_removable_files, remove_after_datasink, [('out', 'file_name')]), + (data_sink, remove_after_datasink, [('out_file', '_')]) + ]) + + return preprocessing + + def get_preprocessing(self): + """ + Create the full preprocessing workflow. + + Returns: a list of nipype.WorkFlow + """ + return [ + self.get_dartel_template_sub_workflow(), + self.get_preprocessing_sub_workflow() + ] + + def get_preprocessing_outputs(self): + """ Return the names of the files the preprocessing is supposed to generate. """ + + # Outputs from dartel workflow + return_list = [join(self.directories.output_dir, 'dartel_template', 'template_6.nii')] + return_list += [join(self.directories.output_dir, 'dartel_template', + f'u_rc1subject_id_{subject_id}_struct_template.nii')\ + for subject_id in self.subject_list] + + # Outputs from preprocessing + parameters = { + 'subject_id': self.subject_list, + 'run_id': self.run_list, + } + parameter_sets = product(*parameters.values()) + output_dir = join( + self.directories.output_dir, + 'preprocessing', + '_run_id_{run_id}_subject_id_{subject_id}' + ) + templates = [ + # Realignment parameters + join(output_dir, 'rp_asub-{subject_id}_task-MGT_run-{run_id}_bold.txt'), + # Realigned unwarped files + join(output_dir, 'uasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'), + # Normalized_files + join(output_dir, 'swuasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'), + # Normalized class images + join(output_dir, 'wc2sub-{subject_id}_T1w.nii'), + join(output_dir, 'wc1sub-{subject_id}_T1w.nii') + ] + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + return return_list + + def get_run_level_analysis(self): + """ No run level analysis has been done by team 98BT """ + return None + + def get_parameters_file( + parameters_file: str, + wc2_file: str, + func_file: str, subject_id: str, run_id: str, working_dir: str): + """ + Create a new tsv file, by adding the mean signal in white matter to other parameters + in parameters_file. + + Parameters : + - parameters_file : path to subject parameters file (i.e. one per run) + - wc2_file : path to the segmented white matter file + - func_file : path to the functional data + - run_id : run for which the analysis is made + - subject_id : subject for whom the analysis is made + - working_dir : directory where to store the output files + + Return : + - path to new file containing only desired parameters. + """ + from os import makedirs + from os.path import join + from nibabel import load + from numpy import mean + from pandas import read_table + from nilearn.image import iter_img, resample_to_img + + # Ignore all future warnings + from warnings import simplefilter + simplefilter(action = 'ignore', category = FutureWarning) + simplefilter(action = 'ignore', category = UserWarning) + simplefilter(action = 'ignore', category = RuntimeWarning) + + # Load wc2 file and create a mask out of it + wc2 = load(wc2_file) + wc2_mask = wc2.get_fdata() > 0.6 + wc2_mask = wc2_mask.astype(int) + + # Compute the mean signal in white matter, for each slice of the functional data + mean_wm = [] + for current_slice in iter_img(load(func_file)): + slice_data = resample_to_img( + current_slice, wc2, interpolation = 'nearest', clip = True).get_fdata() + # Append mean value of masked data + mean_wm.append(mean(slice_data * wc2_mask)) + + # Create new parameters file + data_frame = read_table(parameters_file, sep = ' ', header = None) + data_frame['Mean_WM'] = mean_wm + + new_parameters_file = join(working_dir, 'parameters_files', + f'parameters_file_sub-{subject_id}_run-{run_id}.tsv') + + makedirs(join(working_dir, 'parameters_files'), exist_ok = True) + + with open(new_parameters_file, 'w') as writer: + writer.write(data_frame.to_csv( + sep = '\t', index = False, header = False, na_rep = '0.0')) + + return new_parameters_file + + def get_subject_information(event_file: str, short_run_id: int): + """ + Create Bunch for specifySPMModel. + + Parameters : + - event_file: str, events file for a run of a subject + - short_run_id: str, an identifier for the run corresponding to the event_file + must be '1' for the first run, '2' for the second run, etc. + + Returns : + - subject_info : Bunch corresponding to the event file + """ + from nipype.interfaces.base import Bunch + + # Create empty lists + onsets = [] + durations = [] + weights_gain = [] + weights_loss = [] + answers = [] + + # Parse event file + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: info = line.strip().split() - - if info[0][-3:] in subject_list and info[1] == "equalIndifference": - equalIndifference_id.append(info[0][-3:]) - elif info[0][-3:] in subject_list and info[1] == "equalRange": - equalRange_id.append(info[0][-3:]) - - for file in file_list: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - equalIndifference_files.append(file) - elif sub_id[-2][-3:] in equalRange_id: - equalRange_files.append(file) - - return equalIndifference_id, equalRange_id, equalIndifference_files, equalRange_files - - -def get_l2_analysis(subject_list, n_sub, contrast_list, method, exp_dir, result_dir, working_dir, output_dir): - """ - Returns the 2nd level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - method: one of "equalRange", "equalIndifference" or "groupComp" - - n_sub: int, number of subject - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource - a function free node to iterate over the list of subject names - infosource_groupanalysis = Node(IdentityInterface(fields=['contrast_id', 'subjects'], - subjects = subject_list), - name="infosource_groupanalysis") - - infosource_groupanalysis.iterables = [('contrast_id', contrast_list)] - - # SelectFiles - contrast_file = opj(result_dir, output_dir, 'l1_analysis', '_subject_id_*', "con_00{contrast_id}.nii") - - participants_file = opj(exp_dir, 'participants.tsv') - - templates = {'contrast' : contrast_file, 'participants' : participants_file} - - selectfiles_groupanalysis = Node(SelectFiles(templates, base_directory=result_dir, force_list= True), - name="selectfiles_groupanalysis") - - # Datasink node : to save important files - datasink_groupanalysis = Node(DataSink(base_directory = result_dir, container = output_dir), - name = 'datasink_groupanalysis') - - # Node to select subset of contrasts - sub_contrasts = Node(Function(input_names = ['file_list', 'method', 'subject_list', 'participants_file'], - output_names = ['equalIndifference_id', 'equalRange_id', - 'equalIndifference_files', 'equalRange_files'], - function = get_subset_contrasts), - name = 'sub_contrasts') - - sub_contrasts.inputs.method = method - - ## Estimate model - estimate_model = Node(EstimateModel(estimation_method={'Classical':1}), name = "estimate_model") - - ## Estimate contrasts - estimate_contrast = Node(EstimateContrast(group_contrast=True), - name = "estimate_contrast") - - ## Create thresholded maps - threshold = MapNode(Threshold(use_fwe_correction=False, height_threshold = 0.01, - extent_fdr_p_threshold = 0.05, use_topo_fdr = False, force_activation = True), name = "threshold", - iterfield = ["stat_image", "contrast_index"]) - - - l2_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = f"l2_analysis_{method}_nsub_{n_sub}") - - l2_analysis.connect([(infosource_groupanalysis, selectfiles_groupanalysis, [('contrast_id', 'contrast_id')]), - (infosource_groupanalysis, sub_contrasts, [('subjects', 'subject_list')]), - (selectfiles_groupanalysis, sub_contrasts, [('contrast', 'file_list'), - ('participants', 'participants_file')]), - (estimate_model, estimate_contrast, [('spm_mat_file', 'spm_mat_file'), - ('residual_image', 'residual_image'), - ('beta_images', 'beta_images')]), - (estimate_contrast, threshold, [('spm_mat_file', 'spm_mat_file'),('spmT_images', 'stat_image')]), - (threshold, datasink_groupanalysis, [('thresholded_map', f"l2_analysis_{method}_nsub_{n_sub}.@thresh")]), - (estimate_model, datasink_groupanalysis, [('mask_image', f"l2_analysis_{method}_nsub_{n_sub}.@mask")]), - (estimate_contrast, datasink_groupanalysis, [('spm_mat_file', f"l2_analysis_{method}_nsub_{n_sub}.@spm_mat"), - ('spmT_images', f"l2_analysis_{method}_nsub_{n_sub}.@T"), - ('con_images', f"l2_analysis_{method}_nsub_{n_sub}.@con")])]) - - - if method=='equalRange' or method=='equalIndifference': - contrasts = [('Group', 'T', ['mean'], [1]), ('Group', 'T', ['mean'], [-1])] - ## Specify design matrix - one_sample_t_test_design = Node(OneSampleTTestDesign(), name = "one_sample_t_test_design") - - l2_analysis.connect([(sub_contrasts, one_sample_t_test_design, [(f"{method}_files", 'in_files')]), - (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')])]) - - threshold.inputs.contrast_index = [1, 2] - threshold.synchronize = True - elif method == 'groupComp': - contrasts = [('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1])] - # Node for the design matrix - two_sample_t_test_design = Node(TwoSampleTTestDesign(), - name = 'two_sample_t_test_design') - - l2_analysis.connect([(sub_contrasts, two_sample_t_test_design, [('equalRange_files', "group1_files"), - ('equalIndifference_files', 'group2_files')]), - (two_sample_t_test_design, estimate_model, [("spm_mat_file", "spm_mat_file")])]) - - threshold.inputs.contrast_index = [1] + onsets.append(float(info[0])) + durations.append(4.0) + weights_gain.append(float(info[2])) + weights_loss.append(float(info[3])) + if 'accept' in str(info[5]): + answers.append(1) + else: + answers.append(0) + + # Create Bunch + return Bunch( + conditions = [f'gamble_run{short_run_id}'], + onsets = [onsets], + durations = [durations], + amplitudes = None, + tmod = None, + pmod = [ + Bunch( + name = [ + f'gain_run{short_run_id}', + f'loss_run{short_run_id}', + f'answers_run{short_run_id}'], + poly = [1, 1, 1], + param = [weights_gain, weights_loss, answers] + ) + ], + regressor_names = None, + regressors = None + ) + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Infosource Node - To iterate on subjects + information_source = Node(IdentityInterface( + fields = ['subject_id']), + name = 'information_source') + information_source.iterables = [('subject_id', self.subject_list)] + + # SelectFiles Node - to select necessary files + templates = { + 'func' : join(self.directories.output_dir, + 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'swuasub-{subject_id}_task-MGT_run-*_bold.nii'), + 'motion_correction': join(self.directories.output_dir, + 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'uasub-{subject_id}_task-MGT_run-*_bold.nii'), + 'param' : join(self.directories.output_dir, + 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'rp_asub-{subject_id}_task-MGT_run-*_bold.txt'), + 'wc2' : join(self.directories.output_dir, + 'preprocessing', '_run_id_01_subject_id_{subject_id}', + 'wc2sub-{subject_id}_T1w.nii'), + 'events' : join(self.directories.dataset_dir, + 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-*_events.tsv') + } + select_files = Node(SelectFiles(templates),name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Get Subject Info - get subject specific condition information + subject_information = MapNode(Function( + function = self.get_subject_information, + input_names = ['event_file', 'short_run_id'], + output_names = ['subject_info']), + name = 'subject_information', iterfield = ['event_file', 'short_run_id']) + subject_information.inputs.short_run_id = list(range(1, len(self.run_list) + 1)) + + # Get parameters + parameters = MapNode(Function( + function = self.get_parameters_file, + input_names = [ + 'parameters_file', + 'wc2_file', + 'func_file', + 'subject_id', + 'run_id', + 'working_dir' + ], + output_names = ['new_parameters_file']), + iterfield = ['parameters_file', 'func_file'], + name = 'parameters') + parameters.inputs.run_id = self.run_list + parameters.inputs.working_dir = self.directories.working_dir + + # SpecifyModel - Generates SPM-specific Model + specify_model = Node(SpecifySPMModel(), name = 'specify_model') + specify_model.inputs.concatenate_runs = False + specify_model.inputs.input_units = 'secs' + specify_model.inputs.output_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + specify_model.inputs.high_pass_filter_cutoff = 128 + + # Level1Design - Generates an SPM design matrix + model_design = Node(Level1Design(), name = 'model_design') + model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}} + model_design.inputs.timing_units = 'secs' + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + + # EstimateModel - estimate the parameters of the model + model_estimate = Node(EstimateModel(), name = 'model_estimate') + model_estimate.inputs.estimation_method = {'Classical': 1} + + # EstimateContrast - estimates contrasts + contrast_estimate = Node(EstimateContrast(), name = 'contrast_estimate') + contrast_estimate.inputs.contrasts = self.subject_level_contrasts + + # Create l1 analysis workflow and connect its nodes + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis' + ) + subject_level_analysis.connect([ + (information_source, select_files, [('subject_id', 'subject_id')]), + (information_source, parameters, [('subject_id', 'subject_id')]), + (select_files, subject_information, [('events', 'event_file')]), + (select_files, parameters, [ + ('motion_correction', 'func_file'), + ('param', 'parameters_file'), + ('wc2', 'wc2_file')]), + (select_files, specify_model, [('func', 'functional_runs')]), + (subject_information, specify_model, [('subject_info', 'subject_info')]), + (parameters, specify_model, [ + ('new_parameters_file', 'realignment_parameters')]), + (specify_model, model_design, [('session_info', 'session_info')]), + (model_design, model_estimate, [('spm_mat_file', 'spm_mat_file')]), + (model_estimate, contrast_estimate, [ + ('spm_mat_file', 'spm_mat_file'), + ('beta_images', 'beta_images'), + ('residual_image', 'residual_image')]), + (contrast_estimate, data_sink, [ + ('con_images', 'subject_level_analysis.@con_images'), + ('spmT_images', 'subject_level_analysis.@spmT_images'), + ('spm_mat_file', 'subject_level_analysis.@spm_mat_file')]) + ]) + + return subject_level_analysis + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + # Contrat maps + templates = [join( + self.directories.output_dir, + 'subject_level_analysis', '_subject_id_{subject_id}', f'con_{contrast_id}.nii')\ + for contrast_id in self.contrast_list] + + # SPM.mat file + templates += [join( + self.directories.output_dir, + 'subject_level_analysis', '_subject_id_{subject_id}', 'SPM.mat')] + + # spmT maps + templates += [join( + self.directories.output_dir, + 'subject_level_analysis', '_subject_id_{subject_id}', f'spmT_{contrast_id}.nii')\ + for contrast_id in self.contrast_list] + + # Format with subject_ids + return_list = [] + for template in templates: + return_list += [template.format(subject_id = s) for s in self.subject_list] + + return return_list + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Returns the 2nd level of analysis workflow. + + Parameters: + - method: one of "equalRange", "equalIndifference" or "groupComp" + + Returns: + - group_level_analysis: Nipype WorkFlow + """ + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Infosource - a function free node to iterate over the list of subject names + information_source = Node( + IdentityInterface( + fields = ['contrast_id', 'subjects']), + name = 'information_source') + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles + templates = { + 'contrasts' : join(self.directories.output_dir, + 'subject_level_analysis', '_subject_id_*', 'con_{contrast_id}.nii') + } + + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + select_files.inputs.force_list = True + + # Datasink - save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node elements_in_string + # Get contrast files for required subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_contrasts = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_contrasts', iterfield = 'input_str' + ) + + # Estimate model + estimate_model = Node(EstimateModel(), name = 'estimate_model') + estimate_model.inputs.estimation_method = {'Classical':1} + + # Estimate contrasts + estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast') + estimate_contrast.inputs.group_contrast = True + + # Create thresholded maps + threshold = MapNode(Threshold(), + name = 'threshold', iterfield = ['stat_image', 'contrast_index']) + threshold.inputs.use_fwe_correction = False + threshold.inputs.height_threshold = 0.01 + threshold.inputs.extent_fdr_p_threshold = 0.05 + threshold.inputs.use_topo_fdr = False + threshold.inputs.force_activation = True threshold.synchronize = True - estimate_contrast.inputs.contrasts = contrasts - - return l2_analysis - -def reorganize_results(result_dir, output_dir, n_sub, team_ID): - """ - Reorganize the results to analyze them. - - Parameters: - - result_dir: str, directory where results will be stored - - output_dir: str, name of the sub-directory for final results - - n_sub: int, number of subject used for analysis - - team_ID: str, name of the team ID for which we reorganize files - """ - from os.path import join as opj - import os - import shutil - import gzip - - h1 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_01') - h2 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_01') - h3 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_01') - h4 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_01') - h5 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_02') - h6 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_02') - h7 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_02') - h8 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_02') - h9 = opj(result_dir, output_dir, f"l2_analysis_groupComp_nsub_{n_sub}", '_contrast_id_02') - - h = [h1, h2, h3, h4, h5, h6, h7, h8, h9] - - repro_unthresh = [opj(filename, "spmT_0002.nii") if i in [4, 5] else opj(filename, - "spmT_0001.nii") for i, filename in enumerate(h)] - - repro_thresh = [opj(filename, "_threshold1", - "spmT_0002_thr.nii") if i in [4, 5] else opj(filename, - "_threshold0", "spmT_0001_thr.nii") for i, filename in enumerate(h)] - - if not os.path.isdir(opj(result_dir, "NARPS-reproduction")): - os.mkdir(opj(result_dir, "NARPS-reproduction")) - - for i, filename in enumerate(repro_unthresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_unthresholded.nii") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii") - shutil.copyfile(f_in, f_out) - - print(f"Results files of team {team_ID} reorganized.") \ No newline at end of file + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + group_level_analysis.connect([ + (information_source, select_files, [('contrast_id', 'contrast_id')]), + (select_files, get_contrasts, [('contrasts', 'input_str')]), + (estimate_model, estimate_contrast, [ + ('spm_mat_file', 'spm_mat_file'), + ('residual_image', 'residual_image'), + ('beta_images', 'beta_images')]), + (estimate_contrast, threshold, [ + ('spm_mat_file', 'spm_mat_file'), + ('spmT_images', 'stat_image')]), + (threshold, data_sink, [ + ('thresholded_map', f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')]), + (estimate_model, data_sink, [ + ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]), + (estimate_contrast, data_sink, [ + ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'), + ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'), + ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')])]) + + if method in ('equalRange', 'equalIndifference'): + estimate_contrast.inputs.contrasts = [ + ('Group', 'T', ['mean'], [1]), + ('Group', 'T', ['mean'], [-1]) + ] + + threshold.inputs.contrast_index = [1, 2] + + # Specify design matrix + one_sample_t_test_design = Node(OneSampleTTestDesign(), + name = 'one_sample_t_test_design') + + group_level_analysis.connect([ + (get_contrasts, one_sample_t_test_design, [ + (('out_list', clean_list), 'in_files') + ]), + (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) + ]) + + if method == 'equalRange': + group_level_analysis.connect([ + (get_equal_range_subjects, get_contrasts, [('out_list', 'elements')]) + ]) + + elif method == 'equalIndifference': + group_level_analysis.connect([ + (get_equal_indifference_subjects, get_contrasts, [('out_list', 'elements')]) + ]) + + elif method == 'groupComp': + estimate_contrast.inputs.contrasts = [ + ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1]) + ] + + threshold.inputs.contrast_index = [1] + + # Function Node elements_in_string + # Get contrast files for required subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_contrasts_2 = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_contrasts_2', iterfield = 'input_str' + ) + + # Node for the design matrix + two_sample_t_test_design = Node(TwoSampleTTestDesign(), + name = 'two_sample_t_test_design') + + group_level_analysis.connect([ + (select_files, get_contrasts_2, [('contrasts', 'input_str')]), + (get_equal_range_subjects, get_contrasts, [('out_list', 'elements')]), + (get_equal_indifference_subjects, get_contrasts_2, [('out_list', 'elements')]), + (get_contrasts, two_sample_t_test_design, [ + (('out_list', clean_list), 'group1_files') + ]), + (get_contrasts_2, two_sample_t_test_design, [ + (('out_list', clean_list), 'group2_files') + ]), + (two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) + ]) + + return group_level_analysis + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'con_0001.nii', 'con_0002.nii', 'mask.nii', 'SPM.mat', + 'spmT_0001.nii', 'spmT_0002.nii', + join('_threshold0', 'spmT_0001_thr.nii'), join('_threshold1', 'spmT_0002_thr.nii') + ], + 'nb_subjects' : [str(len(self.subject_list))] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_{contrast_id}', + '{file}' + ) + + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['groupComp'], + 'file': [ + 'con_0001.nii', 'mask.nii', 'SPM.mat', 'spmT_0001.nii', + join('_threshold0', 'spmT_0001_thr.nii') + ], + 'nb_subjects' : [str(len(self.subject_list))] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_{contrast_id}', + '{file}' + ) + + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + nb_sub = len(self.subject_list) + files = [ + # Hypothesis 1 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001', 'spmT_0001.nii'), + # Hypothesis 2 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001', 'spmT_0001.nii'), + # Hypothesis 3 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001', 'spmT_0001.nii'), + # Hypothesis 4 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001', 'spmT_0001.nii'), + # Hypothesis 5 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0002', '_threshold0', 'spmT_0002_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0002', 'spmT_0002.nii'), + # Hypothesis 6 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0002', '_threshold1', 'spmT_0002_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0002', 'spmT_0002.nii'), + # Hypothesis 7 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0002', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0002', 'spmT_0001.nii'), + # Hypothesis 8 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0002', '_threshold1', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0002', 'spmT_0001.nii'), + # Hypothesis 9 + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_0002', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_0002', 'spmT_0001.nii') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/setup.py b/setup.py index ec22904f..b17409b6 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ 'networkx>=2.0,<3.0', # a workaround to nipype's bug (issue 3530) 'nilearn>=0.10.0,<0.11', 'nipype>=1.8.6,<1.9', - 'pandas>=1.5.2,<1.6' + 'pandas>=1.5.2,<1.6', + 'niflow-nipype1-workflows>=0.0.5,<0.1.0' ] extras_require = { 'tests': [ diff --git a/tests/conftest.py b/tests/conftest.py index 2223518e..f12f77a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ from numpy import isclose from pytest import helpers, fixture from pathvalidate import is_valid_filepath +from numpy import isclose from narps_open.pipelines import Pipeline from narps_open.runner import PipelineRunner diff --git a/tests/pipelines/templates/template_test.py b/tests/pipelines/templates/template_test.py index 0c3683fd..f0c459c5 100644 --- a/tests/pipelines/templates/template_test.py +++ b/tests/pipelines/templates/template_test.py @@ -3,21 +3,21 @@ """ This template can be use to test a pipeline. - - Replace all occurrences of XXXX by the actual id of the team. + - Replace all occurrences of 2T6S by the actual id of the team. - All lines starting with [INFO], are meant to help you during the reproduction, these can be removed eventually. - Also remove lines starting with [TODO], once you did what they suggested. - Remove this docstring once you are done with coding the tests. """ -""" Tests of the 'narps_open.pipelines.team_XXXX' module. +""" Tests of the 'narps_open.pipelines.team_2T6S' module. Launch this test with PyTest Usage: ====== - pytest -q test_team_XXXX.py - pytest -q test_team_XXXX.py -k + pytest -q test_team_2T6S.py + pytest -q test_team_2T6S.py -k """ # [INFO] About these imports : @@ -28,12 +28,12 @@ # [INFO] Only for type testing from nipype import Workflow -# [INFO] Of course, import the class you want to test, here the Pipeline class for the team XXXX -from narps_open.pipelines.team_XXXX import PipelineTeamXXXX +# [INFO] Of course, import the class you want to test, here the Pipeline class for the team 2T6S +from narps_open.pipelines.team_2T6S import PipelineTeam2T6S # [INFO] All tests should be contained in the following class, in order to sort them. -class TestPipelinesTeamXXXX: - """ A class that contains all the unit tests for the PipelineTeamXXXX class.""" +class TestPipelinesTeam2T6S: + """ A class that contains all the unit tests for the PipelineTeam2T6S class.""" # [TODO] Write one or several unit_test (and mark them as such) # [TODO] ideally for each method of the class you test. @@ -42,19 +42,19 @@ class TestPipelinesTeamXXXX: @staticmethod @mark.unit_test def test_create(): - """ Test the creation of a PipelineTeamXXXX object """ + """ Test the creation of a PipelineTeam2T6S object """ - pipeline = PipelineTeamXXXX() + pipeline = PipelineTeam2T6S() assert pipeline.fwhm == 8.0 - assert pipeline.team_id == 'XXXX' + assert pipeline.team_id == '2T6S' # [INFO] Here is one example for the methods returning workflows @staticmethod @mark.unit_test def test_workflows(): - """ Test the workflows of a PipelineTeamXXXX object """ + """ Test the workflows of a PipelineTeam2T6S object """ - pipeline = PipelineTeamXXXX() + pipeline = PipelineTeam2T6S() assert pipeline.get_preprocessing() is None assert pipeline.get_run_level_analysis() is None assert isinstance(pipeline.get_subject_level_analysis(), Workflow) @@ -68,24 +68,16 @@ def test_workflows(): @staticmethod @mark.unit_test def test_outputs(): - """ Test the expected outputs of a PipelineTeamXXXX object """ - pipeline = PipelineTeamXXXX() + """ Test the expected outputs of a PipelineTeam2T6S object """ + pipeline = PipelineTeam2T6S() # 1 - 1 subject outputs pipeline.subject_list = ['001'] - assert len(pipeline.get_preprocessing_outputs()) == 0 - assert len(pipeline.get_run_level_outputs()) == 0 - assert len(pipeline.get_subject_level_outputs()) == 7 - assert len(pipeline.get_group_level_outputs()) == 63 - assert len(pipeline.get_hypotheses_outputs()) == 18 + helpers.test_pipeline_outputs(pipeline, [0, 0, 7, 63, 18]) # 2 - 4 subjects outputs pipeline.subject_list = ['001', '002', '003', '004'] - assert len(pipeline.get_preprocessing_outputs()) == 0 - assert len(pipeline.get_run_level_outputs()) == 0 - assert len(pipeline.get_subject_level_outputs()) == 28 - assert len(pipeline.get_group_level_outputs()) == 63 - assert len(pipeline.get_hypotheses_outputs()) == 18 + helpers.test_pipeline_outputs(pipeline, [0, 0, 28, 63, 18]) # [TODO] Feel free to add other methods, e.g. to test the custom node functions of the pipeline @@ -95,8 +87,8 @@ def test_outputs(): @staticmethod @mark.pipeline_test def test_execution(): - """ Test the execution of a PipelineTeamXXXX and compare results """ + """ Test the execution of a PipelineTeam2T6S and compare results """ # [INFO] We use the `test_pipeline_evaluation` helper which is responsible for running the # [INFO] pipeline, iterating over subjects and comparing output with expected results. - helpers.test_pipeline_evaluation('XXXX') + helpers.test_pipeline_evaluation('2T6S') diff --git a/tests/pipelines/test_team_98BT.py b/tests/pipelines/test_team_98BT.py new file mode 100644 index 00000000..6e2a0afb --- /dev/null +++ b/tests/pipelines/test_team_98BT.py @@ -0,0 +1,153 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_98BT' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_98BT.py + pytest -q test_team_98BT.py -k +""" +from os.path import join, exists +from filecmp import cmp + +from pytest import helpers, mark +from nipype import Workflow +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_98BT import PipelineTeam98BT + +TEMPORARY_DIR = join(Configuration()['directories']['test_runs'], 'test_98BT') + +class TestPipelinesTeam98BT: + """ A class that contains all the unit tests for the PipelineTeam98BT class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeam98BT object """ + + pipeline = PipelineTeam98BT() + + # 1 - check the parameters + assert pipeline.fwhm == 8.0 + assert pipeline.team_id == '98BT' + + # 2 - check workflows + processing = pipeline.get_preprocessing() + assert len(processing) == 2 + for sub_workflow in processing: + assert isinstance(sub_workflow, Workflow) + + assert pipeline.get_run_level_analysis() is None + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + + group_level = pipeline.get_group_level_analysis() + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeam98BT object """ + pipeline = PipelineTeam98BT() + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + helpers.test_pipeline_outputs(pipeline, [1 + 1*1 + 4*1*5,0,9,84,18]) + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + helpers.test_pipeline_outputs(pipeline, [1 + 4*1 + 4*4*5,0,36,84,18]) + + @staticmethod + @mark.unit_test + def test_fieldmap_info(): + """ Test the get_fieldmap_info method """ + + filedmap_file_1 = join( + Configuration()['directories']['test_data'], 'pipelines', 'phasediff_1.json') + filedmap_file_2 = join( + Configuration()['directories']['test_data'], 'pipelines', 'phasediff_2.json') + + test_result = PipelineTeam98BT.get_fieldmap_info( + filedmap_file_1, ['magnitude_1', 'magnitude_2']) + assert test_result[0] == (0.00492, 0.00738) + assert test_result[1] == 'magnitude_1' + test_result = PipelineTeam98BT.get_fieldmap_info( + filedmap_file_2, ['magnitude_1', 'magnitude_2']) + assert test_result[0] == (0.00492, 0.00738) + assert test_result[1] == 'magnitude_2' + + @staticmethod + @mark.unit_test + @mark.parametrize('remove_test_dir', TEMPORARY_DIR) + def test_parameters_files(remove_test_dir): + """ Test the get_parameters_files method + For this test, we created the two following files by downsampling output files + from the preprocessing pipeline : + - wc2sub-001_T1w-32.nii (white matter file) + - uasub-001_task-MGT_run-01_bold_resampled-32.nii (motion corrected file) + Voxel dimension was multiplied by 32, number of slices was reduced to 4. + """ + parameters_file = join( + Configuration()['directories']['test_data'], 'pipelines', 'confounds.tsv') + func_file = join(Configuration()['directories']['test_data'], 'pipelines', + 'team_98BT', 'uasub-001_task-MGT_run-01_bold_resampled-32.nii') + wc2_file = join(Configuration()['directories']['test_data'], 'pipelines', + 'team_98BT', 'wc2sub-001_T1w-32.nii') + reference_file = join( + Configuration()['directories']['test_data'], 'pipelines', + 'team_98BT', 'parameters_file.tsv') + + # Get new parameters file + PipelineTeam98BT.get_parameters_file( + parameters_file, wc2_file, func_file, 'sid', 'rid', TEMPORARY_DIR) + + # Check parameters file was created + created_parameters_file = join( + TEMPORARY_DIR, 'parameters_files', 'parameters_file_sub-sid_run-rid.tsv') + assert exists(created_parameters_file) + + # Check contents + assert cmp(reference_file, created_parameters_file) + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + + bunch = PipelineTeam98BT.get_subject_information(test_file, 1) + + # Compare bunches to expected + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['gamble_run1'] + helpers.compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 19.535, 27.535, 36.435]]) + helpers.compare_float_2d_arrays(bunch.durations, [ + [4.0, 4.0, 4.0, 4.0, 4.0]]) + assert bunch.amplitudes is None + assert bunch.tmod is None + assert bunch.regressor_names is None + assert bunch.regressors is None + pmod = bunch.pmod[0] + assert isinstance(pmod, Bunch) + assert pmod.name == ['gain_run1', 'loss_run1', 'answers_run1'] + assert pmod.poly == [1, 1, 1] + helpers.compare_float_2d_arrays(pmod.param, [ + [14.0, 34.0, 38.0, 10.0, 16.0], + [6.0, 14.0, 19.0, 15.0, 17.0], + [1, 1, 0, 0, 0] + ]) + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeam98BT and compare results """ + helpers.test_pipeline_evaluation('98BT') diff --git a/tests/test_data/pipelines/phasediff_1.json b/tests/test_data/pipelines/phasediff_1.json new file mode 100644 index 00000000..625980c6 --- /dev/null +++ b/tests/test_data/pipelines/phasediff_1.json @@ -0,0 +1 @@ +{"EchoTime1": 0.00492, "EchoTime2": 0.00738, "IntendedFor": ["func/sub-001_task-MGT_run-01_bold.nii.gz", "func/sub-001_task-MGT_run-02_bold.nii.gz", "func/sub-001_task-MGT_run-03_bold.nii.gz", "func/sub-001_task-MGT_run-04_bold.nii.gz"]} \ No newline at end of file diff --git a/tests/test_data/pipelines/phasediff_2.json b/tests/test_data/pipelines/phasediff_2.json new file mode 100644 index 00000000..d7f3c9d0 --- /dev/null +++ b/tests/test_data/pipelines/phasediff_2.json @@ -0,0 +1 @@ +{"EchoTime1": 0.00738, "EchoTime2": 0.00492, "IntendedFor": ["func/sub-001_task-MGT_run-01_bold.nii.gz", "func/sub-001_task-MGT_run-02_bold.nii.gz", "func/sub-001_task-MGT_run-03_bold.nii.gz", "func/sub-001_task-MGT_run-04_bold.nii.gz"]} \ No newline at end of file diff --git a/tests/test_data/pipelines/team_98BT/parameters_file.tsv b/tests/test_data/pipelines/team_98BT/parameters_file.tsv new file mode 100644 index 00000000..515041a1 --- /dev/null +++ b/tests/test_data/pipelines/team_98BT/parameters_file.tsv @@ -0,0 +1,4 @@ +"CSF WhiteMatter GlobalSignal stdDVARS non-stdDVARS vx-wisestdDVARS FramewiseDisplacement tCompCor00 tCompCor01 tCompCor02 tCompCor03 tCompCor04 tCompCor05 aCompCor00 aCompCor01 aCompCor02 aCompCor03 aCompCor04 aCompCor05 Cosine00 Cosine01 Cosine02 Cosine03 Cosine04 Cosine05 NonSteadyStateOutlier00 X Y Z RotX RotY RotZ" 261.948061726888 +"6551.281999999999 6476.4653 9874.576 n/a n/a n/a n/a 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 -0.0 0.0" 241.43458658854166 +"6484.7285 6473.4890000000005 9830.212 1.09046686 52.78273392 1.05943739 0.13527900930999998 0.0263099209 -0.0673065879 0.0934882554 -0.0079328884 0.0338007737 -0.011491083999999999 -0.042411347099999996 0.027736422900000002 0.0453303087 -0.07022609490000001 0.0963618709 -0.0200867957 0.0665186088 0.0665174038 0.0665153954 0.0665125838 0.0665089688 0.06650455059999999 0.0 -0.00996895 -0.0313444 -3.00931e-06 0.00132687 -0.000384193 -0.00016819" 246.2539803059896 +"6441.5337 6485.7256 9821.212 1.07520139 52.04382706 1.03821933 0.12437666391 -0.0404820317 0.034150583 0.13661184210000002 0.0745358691 -0.0054829985999999995 -0.0217322686 0.046214115199999996 0.005774624 -0.043909359800000006 -0.075619539 0.17546891539999998 -0.0345256763 0.0665153954 0.06650455059999999 0.06648647719999999 0.0664611772 0.0664286533 0.0663889091 0.0 -2.56954e-05 -0.00923735 0.0549667 0.000997278 -0.00019745 -0.000398988" 246.21754170735676 diff --git a/tests/test_data/pipelines/team_98BT/uasub-001_task-MGT_run-01_bold_resampled-32.nii b/tests/test_data/pipelines/team_98BT/uasub-001_task-MGT_run-01_bold_resampled-32.nii new file mode 100644 index 00000000..2f1f3722 Binary files /dev/null and b/tests/test_data/pipelines/team_98BT/uasub-001_task-MGT_run-01_bold_resampled-32.nii differ diff --git a/tests/test_data/pipelines/team_98BT/wc2sub-001_T1w-32.nii b/tests/test_data/pipelines/team_98BT/wc2sub-001_T1w-32.nii new file mode 100644 index 00000000..175a81fe Binary files /dev/null and b/tests/test_data/pipelines/team_98BT/wc2sub-001_T1w-32.nii differ