#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Ed Mountjoy
#
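"""
Generate the unfiltered colocalisation manifest.

Reads the gzipped JSON-lines credible-set overlap table, applies the
overlap-proportion and credible-set-size filters, and writes one manifest
record per left/right study pair containing paths to summary statistics,
the LD reference, and the output/log/tmp/plot locations.
"""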
import gzip
import json
import os
from collections import OrderedDict
from glob import glob
import re
import pandas as pd
import yaml

def main():

    # Load config
    with open('/configs/config.yaml') as config_input:
        config = yaml.load(config_input, Loader=yaml.FullLoader)

    # Parse args
    in_overlap_table = glob('/output/overlap_table/*.json.gz')[0]
    out_manifest = '/configs/manifest_unfiltered.json.gz'
    overlap_prop_threshold = 0.01
    max_credset_threshold = None

    # In path patterns (server)
    sumstats = os.path.join(config['sumstats'], '{type}/{study_id}.parquet')
    ukb_ld_path = os.path.join(config['ld_reference'], 'ukb_v3_chr{chrom}.downsampled10k')
    custom_studies = None
    if config['custom_studies']:
        custom_studies = pd.read_parquet(config['custom_studies'], columns=['study_id']).study_id.unique()

    # Out path patterns
    data_out = '/output'

    # id column names to overlap table columns mapping
    id_cols_mapping = {'study': 'study_id', 'type': 'type', 'bio_feature': 'bio_feature', 'phenotype': 'phenotype_id',
                       'chrom': 'lead_chrom', 'pos': 'lead_pos', 'ref': 'lead_ref', 'alt': 'lead_alt'}

    def construct_left_right_hive_partition_dirs(rec):
        # Build a Hive-style partition path, e.g. left_study=.../left_type=.../.../right_alt=...
        dirs = []
        for side in ['left', 'right']:
            for col, ocol in id_cols_mapping.items():
                dirs.append(side + '_' + col + '=' + str(rec.get(side + '_' + ocol, None)))
        return os.path.join(*dirs)

    manifest = []

    # Write manifest file
    os.makedirs(os.path.dirname(out_manifest), exist_ok=True)
    with gzip.open(out_manifest, 'w') as out_h:
        with gzip.open(in_overlap_table, 'r') as in_h:

            # Go through each overlap
            for in_record in in_h:
                in_record = json.loads(in_record.decode())

                # Restrict to custom studies if provided. Use an explicit None
                # check: custom_studies is a numpy array, so truth-testing it
                # directly raises a ValueError.
                if custom_studies is not None and \
                        in_record['left_study_id'] not in custom_studies and \
                        in_record['right_study_id'] not in custom_studies:
                    continue

                out_record = OrderedDict()

                # Skip if proportion_overlap < overlap_prop_threshold
                if overlap_prop_threshold:
                    max_overlap_prop = max(in_record['left_overlap_prop'],
                                           in_record['right_overlap_prop'])
                    if max_overlap_prop < overlap_prop_threshold:
                        continue

                # Skip if the biggest credible set has more than max_credset_threshold variants
                if max_credset_threshold:
                    max_credset_size = max(in_record['left_num_tags'],
                                           in_record['right_num_tags'])
                    if max_credset_size > max_credset_threshold:
                        continue

                # Add information for left/right
                for side in ['left', 'right']:
                    study_id = in_record['{}_study_id'.format(side)]

                    # Add file information
                    study_type = 'gwas' if in_record['{}_type'.format(side)] == 'gwas' else 'molecular_trait'
                    out_record['{}_sumstats'.format(side)] = sumstats.format(type=study_type, study_id=study_id)

                    # If FinnGen, then don't specify LD, as we won't do conditioning
                    ld_path = ukb_ld_path.format(chrom=in_record['{}_lead_chrom'.format(side)])
                    if re.match('FINNGEN', study_id):
                        ld_path = None
                    out_record['{}_ld'.format(side)] = ld_path

                    # Carry over the study/variant identifier columns
                    for i in id_cols_mapping.values():
                        out_record['{}_{}'.format(side, i)] = in_record.get('{}_{}'.format(side, i), None)

                # Add method (always conditional for now)
                out_record['method'] = 'conditional'

                # Output paths, partitioned by the left/right identifier columns
                left_right_hive_partition_dirs = construct_left_right_hive_partition_dirs(in_record)
                out_record['out'] = os.path.join(data_out, 'data', left_right_hive_partition_dirs, 'coloc_res.csv')
                out_record['log'] = os.path.join(data_out, 'logs', 'coloc', left_right_hive_partition_dirs, 'log_file.txt')
                out_record['tmpdir'] = os.path.join(data_out, 'tmp', left_right_hive_partition_dirs)
                out_record['plot'] = os.path.join(data_out, 'plot', left_right_hive_partition_dirs, 'plot.png')

                # Make all paths absolute
                for colname in ['left_sumstats', 'left_ld', 'right_sumstats', 'right_ld',
                                'out', 'log', 'tmpdir', 'plot']:
                    if out_record[colname] is not None:
                        out_record[colname] = os.path.abspath(out_record[colname])

                # Check that all input paths exist
                for colname in ['left_sumstats', 'left_ld', 'right_sumstats', 'right_ld']:
                    # Get path
                    in_path = out_record[colname]
                    if in_path is not None:
                        # If plink prefix, add .bed suffix
                        if colname == 'left_ld' or colname == 'right_ld':
                            in_path = in_path + '.bed'
                        # Assert exists
                        assert os.path.exists(in_path), \
                            "Input file not found ({}): {}".format(colname, in_path)

                # Write the manifest record as a JSON line
                out_h.write((json.dumps(out_record) + '\n').encode())

    return 0


if __name__ == '__main__':

    main()
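
# Illustrative manifest record (a sketch only: field names come from the code
# above, but the study IDs and paths shown here are hypothetical):
#
# {"left_sumstats": "/.../gwas/STUDY_A.parquet",
#  "left_ld": "/.../ukb_v3_chr1.downsampled10k",
#  "right_sumstats": "/.../molecular_trait/STUDY_B.parquet",
#  "right_ld": "/.../ukb_v3_chr1.downsampled10k",
#  "left_study_id": "STUDY_A", ..., "right_lead_alt": "T",
#  "method": "conditional",
#  "out": "/output/data/.../coloc_res.csv",
#  "log": "/output/logs/coloc/.../log_file.txt",
#  "tmpdir": "/output/tmp/...",
#  "plot": "/output/plot/.../plot.png"}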