From 54b6658434922958f31be2238b3b6cca4e1d09d1 Mon Sep 17 00:00:00 2001
From: Xinyue Zhang
Date: Tue, 9 Jan 2024 18:03:03 +0100
Subject: [PATCH] Add Parquet support and lazy table reading to the OMOP loader

---
 ehrdata.py | 338 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 224 insertions(+), 114 deletions(-)

diff --git a/ehrdata.py b/ehrdata.py
index 534839c..b7f6aee 100644
--- a/ehrdata.py
+++ b/ehrdata.py
@@ -20,6 +20,7 @@
 import missingno as msno
 import warnings
 import numbers
+import os
 
 
 clinical_tables_columns = {
@@ -129,65 +130,44 @@ def df_to_dict(df, key, value):
     return pd.Series(df[value].values, index=df[key]).to_dict()
 
 
-def get_column_types(csv_path=None, columns=None):
-    column_types = {}
-    parse_dates = []
-    if csv_path:
-        with open(csv_path, "r") as f:
-            dict_reader = csv.DictReader(f)
-            columns = dict_reader.fieldnames
-    columns_lowercase = [column.lower() for column in columns]
-    for i, column in enumerate(columns_lowercase):
-        if column.endswith(
-            (
-                "source_value",
-                "reason",
-                "measurement_time",
-                "as_string",
-                "title",
-                "text",
-                "name",
-                "concept",
-                "code",
-                "domain_id",
-                "vocabulary_id",
-                "concept_class_id",
-                "relationship_id",
-                "specimen_source_id",
-                "production_id",
-                "unique_device_id",
-                "sig",
-                "lot_number",
-            )
-        ):
-            column_types[columns[i]] = str
-        # TODO quantity in different tables have different types
-        elif column.endswith(("as_number", "low", "high", "quantity")):
-            column_types[columns[i]] = float
-        elif column.endswith("date"):
-            parse_dates.append(columns[i])
-        elif column.endswith("datetime"):
-            parse_dates.append(columns[i])
-        elif column.endswith(("id", "birth", "id_1", "id_2", "refills", "days_supply")):
-            column_types[columns[i]] = "Int64"
-        else:
-            raise KeyError(f"{columns[i]} is not defined in OMOP CDM")
-    if len(parse_dates) == 0:
-        parse_dates = None
-    return column_types, parse_dates
+def check_csv_has_only_header(file_path):
+    """Return True if a CSV file contains a header row but no data rows."""
+    if file_path.endswith('.csv'):
+        with open(file_path, 'r') as file:
+            reader = csv.reader(file)
+            header = next(reader, None)  # Read the header
+            if header is not None:
+                second_row = next(reader, None)  # Try to read the next row
+                return second_row is None  # No second row means header-only
+            return False  # File is empty, so not header-only
+    else:
+        # Non-CSV files (e.g. Parquet) are assumed to contain data
+        return False
 
 
 class OMOP:
-    def __init__(self, file_paths):
-        self.base = file_paths
-        file_list = glob.glob(os.path.join(file_paths, "*"))
+    def __init__(self, folder_path, delimiter=None):
+        self.base = folder_path
+        # TODO support file formats beyond CSV and Parquet
+        file_list = glob.glob(os.path.join(folder_path, "*.csv")) + glob.glob(os.path.join(folder_path, "*.parquet"))
+        self.loaded_table = None
         self.filepath = {}
         for file_path in file_list:
-            file_name = file_path.split("/")[-1].removesuffix(".csv")
-            self.filepath[file_name] = file_path
+            # Lowercase so table lookups like "person" work regardless of export casing
+            file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
+            new_filepath = os.path.join(self.base, os.path.basename(file_path).lower())
+            if check_csv_has_only_header(file_path):
+                # Skip CSV files that contain nothing but a header row
+                continue
+            # Normalize the file name to lowercase on disk
+            os.rename(file_path, new_filepath)
+            self.filepath[file_name] = new_filepath
 
         self.tables = list(self.filepath.keys())
+        self.delimiter = delimiter
         """
         if "concept" in self.tables:
            df_concept = dd.read_csv(self.filepath["concept"], usecols=vocabularies_tables_columns["concept"])
@@ -195,6 +175,121 @@ def __init__(self, file_paths):
           self.concept_name_to_id = dict(zip(df_concept['name'], df_concept['id']))
         """
 
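For orientation, a minimal usage sketch of the constructor above; the folder path and delimiter are illustrative assumptions, not part of this patch:

    from ehrdata import OMOP

    # Hypothetical folder of OMOP CDM tables stored as CSV/Parquet files
    omop = OMOP("/data/omop_cdm", delimiter=",")
    print(omop)         # the __repr__ below lists the discovered tables
    print(omop.tables)  # e.g. ["person", "visit_occurrence", ...]
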
+    def __repr__(self) -> str:
+        # TODO separate the table list by table category
+        def format_tables(tables, max_line_length=80):
+            line = ""
+            for table in tables:
+                # Check if adding the next table would exceed the max line length
+                if len(line) + len(table) > max_line_length:
+                    # Yield the current line and start a new one
+                    yield line
+                    line = table
+                else:
+                    # Add the table to the current line
+                    line += table if line == "" else ", " + table
+            # Yield the last line
+            yield line
+
+        tables_str = "\n".join(format_tables(self.tables))
+        return f'OMOP object ({os.path.basename(self.base)}) with {len(self.tables)} tables.\nTables: {tables_str}'
+
+    def set_path(self, table_name, file_path):
+        # TODO move to init
+        self.tables.append(table_name)
+        self.filepath[table_name] = file_path
+
+    def _get_column_types(self, path=None, columns=None):
+        column_types = {}
+        parse_dates = []
+
+        # If path is a folder, inspect the first file in it
+        if not os.path.isfile(path):
+            folder_walk = os.walk(path)
+            first_file_in_folder = next(folder_walk)[2][0]
+            path = os.path.join(path, first_file_in_folder)
+
+        if path.endswith('.csv'):
+            with open(path, "r") as f:
+                # csv.DictReader rejects delimiter=None, so fall back to a comma
+                dict_reader = csv.DictReader(f, delimiter=self.delimiter or ",")
+                columns = dict_reader.fieldnames
+                columns = list(filter(None, columns))
+        elif path.endswith('.parquet'):
+            df = dd.read_parquet(path)
+            columns = list(df.columns)
+        else:
+            raise TypeError("Only CSV and Parquet files are supported!")
+        columns_lowercase = [column.lower() for column in columns]
+        for i, column in enumerate(columns_lowercase):
+            # User-registered columns take precedence over the suffix rules
+            if hasattr(self, "additional_column") and column in self.additional_column:
+                column_types[columns[i]] = self.additional_column[column]
+            elif column.endswith(
+                (
+                    "source_value",
+                    "reason",
+                    "measurement_time",
+                    "as_string",
+                    "title",
+                    "text",
+                    "name",
+                    "concept",
+                    "code",
+                    "domain_id",
+                    "vocabulary_id",
+                    "concept_class_id",
+                    "relationship_id",
+                    "specimen_source_id",
+                    "production_id",
+                    "unique_device_id",
+                    "sig",
+                    "lot_number",
+                )
+            ):
+                column_types[columns[i]] = str
+            # TODO quantity has different types in different tables
+            elif column.endswith(("as_number", "low", "high", "quantity")):
+                column_types[columns[i]] = float
+            elif column.endswith(("date", "datetime")):
+                parse_dates.append(columns[i])
+            elif column.endswith(("id", "birth", "id_1", "id_2", "refills", "days_supply")):
+                column_types[columns[i]] = "Int64"
+            else:
+                raise KeyError(f"{columns[i]} is not defined in OMOP CDM")
+        if len(parse_dates) == 0:
+            parse_dates = None
+        return column_types, parse_dates
+
+    def _read_table(self, path, dtype=None, parse_dates=None, index=None, usecols=None, **kwargs):
+        if not os.path.isfile(path):
+            folder_walk = os.walk(path)
+            filetype = next(folder_walk)[2][0].split(".")[-1]
+        else:
+            filetype = path.split(".")[-1]
+        if filetype == 'csv':
+            if not os.path.isfile(path):
+                path = f"{path}/*.csv"
+            if usecols:
+                # Restrict dtype and parse_dates to the requested columns
+                if dtype:
+                    dtype = {key: dtype[key] for key in usecols if key in dtype}
+                if parse_dates:
+                    parse_dates = [key for key in usecols if key in parse_dates]
+            df = dd.read_csv(path, delimiter=self.delimiter, dtype=dtype, parse_dates=parse_dates, usecols=usecols)
+        elif filetype == 'parquet':
+            if not os.path.isfile(path):
+                path = f"{path}/*.parquet"
+            # Parquet already stores column types, so dtype/parse_dates are not needed
+            df = dd.read_parquet(path)
+        else:
+            raise TypeError("Only CSV and Parquet files are supported!")
+
+        if index:
+            df = df.set_index(index)
+        return df
+
     @property
     def clinical_tables(self):
         """
@@ -270,21 +365,20 @@ def health_economics_tables(self):
         table_names = ["payer_plan_period", "cost"]
         return [table_name for table_name in self.tables if table_name in table_names]
 
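The suffix dispatch inside `_get_column_types` is easier to scan outside the diff; a simplified, self-contained sketch of the same rules (illustrative only, not the module's API):

    def infer_dtype(column: str):
        """Toy version of the suffix rules used by _get_column_types."""
        c = column.lower()
        if c.endswith(("source_value", "name", "concept", "code")):
            return str            # free-text and coded string columns
        if c.endswith(("as_number", "low", "high", "quantity")):
            return float          # numeric value columns
        if c.endswith(("date", "datetime")):
            return "parse_dates"  # collected and handed to the reader
        if c.endswith(("id", "birth", "refills", "days_supply")):
            return "Int64"        # nullable integer columns
        raise KeyError(f"{column} is not defined in OMOP CDM")

    assert infer_dtype("value_as_number") is float
    assert infer_dtype("birth_datetime") == "parse_dates"
    assert infer_dtype("person_id") == "Int64"
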
-    def load(self, level="stay_level", tables=["visit_occurrence", "person", "death"]):
+    def load(self, level="stay_level", tables=["visit_occurrence", "person", "death"], remove_empty_column=True):
         # TODO patient level and hospital level
         if level == "stay_level":
             index = {"visit_occurrence": "visit_occurrence_id", "person": "person_id", "death": "person_id"}
             # TODO Only support clinical_tables_columns
             for table in tables:
-                column_types, parse_dates = get_column_types(self.filepath[table])
-                setattr(
-                    self,
-                    table,
-                    dd.read_csv(self.filepath[table], dtype=column_types, parse_dates=parse_dates).set_index(
-                        "person_id"
-                    ),
-                )
+                print(f"reading table [{table}]")
+                column_types, parse_dates = self._get_column_types(self.filepath[table])
+                df = self._read_table(self.filepath[table], dtype=column_types, parse_dates=parse_dates, index='person_id')
+                if remove_empty_column:
+                    # Keep only columns that hold at least one non-NA value
+                    columns = [column for column in df.columns if not df[column].isna().all().compute()]
+                    df = df.loc[:, columns]
+                setattr(self, table, df)
 
             # concept_id_list = list(self.concept.concept_id)
             # concept_name_list = list(self.concept.concept_id)
@@ -329,6 +423,12 @@ def load(self, level="stay_level", tables=["visit_occurrence", "person", "death"
 
         return adata
 
+    def add_additional_column(self, column_name, dtype):
+        # Register a non-standard column so that _get_column_types can type it
+        if hasattr(self, "additional_column"):
+            self.additional_column[column_name] = dtype
+        else:
+            self.additional_column = {column_name: dtype}
+
     def feature_statistics(
         self,
         source: Literal[
             "observation",
             "measurement",
             "procedure_occurrence",
             "specimen",
             "device_exposure",
             "drug_exposure",
             "condition_occurrence",
         ],
-        map_concept=True,
         number=20,
-    ):
-        column_types, parse_dates = get_column_types(self.filepath[source])
-        df_source = dd.read_csv(
-            self.filepath[source], dtype=column_types, parse_dates=parse_dates
-        )  # , usecols=clinical_tables_columns[source])
-        feature_counts = df_source.compute().value_counts(f"{source}_concept_id")[0:number]
+        key=None,
+    ):
+        column_types, parse_dates = self._get_column_types(self.filepath[source])
+        df_source = self._read_table(self.filepath[source], dtype=column_types, parse_dates=parse_dates, usecols=[f"{source}_concept_id"])
+        feature_counts = df_source[f"{source}_concept_id"].value_counts().compute()[0:number]
         feature_counts = feature_counts.to_frame().reset_index(drop=False)
+
         feature_counts[f"{source}_concept_id_1"], feature_counts[f"{source}_concept_id_2"] = self.map_concept_id(
             feature_counts[f"{source}_concept_id"], verbose=False
         )
         feature_counts["feature_name"] = self.get_concept_name(feature_counts[f"{source}_concept_id_1"])
         if feature_counts[f"{source}_concept_id_1"].equals(feature_counts[f"{source}_concept_id_2"]):
-            feature_counts.drop(f"{source}_concept_id_2", inplace=True)
+            feature_counts.drop(f"{source}_concept_id_2", axis=1, inplace=True)
             feature_counts.rename(columns={f"{source}_concept_id_1": f"{source}_concept_id"}, inplace=True)
             feature_counts = feature_counts.reindex(columns=["feature_name", f"{source}_concept_id", "count"])
         else:
@@ -369,45 +468,50 @@ def feature_statistics(
         return feature_counts
 
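A hedged example of how `load` and `feature_statistics` chain together; the dataset path and the presence of a measurement table are assumptions:

    omop = OMOP("/data/omop_cdm")  # hypothetical dataset
    adata = omop.load(level="stay_level", tables=["visit_occurrence", "person", "death"])
    # Top 10 most frequent measurement concepts, mapped to readable names
    stats = omop.feature_statistics(source="measurement", number=10)
    print(stats.head())
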
     def map_concept_id(self, concept_id: Union[str, List], verbose=True):
-        column_types, parse_dates = get_column_types(self.filepath["concept_relationship"])
-        df_concept_relationship = dd.read_csv(
-            self.filepath["concept_relationship"], dtype=column_types, parse_dates=parse_dates
-        ).dropna(
-            subset=["concept_id_1", "concept_id_2", "relationship_id"]
-        )  # , usecols=vocabularies_tables_columns["concept_relationship"]
-        concept_relationship_dict = df_to_dict(
-            df=df_concept_relationship[df_concept_relationship["relationship_id"] == "Maps to"],
-            key="concept_id_1",
-            value="concept_id_2",
-        )
-        concept_relationship_dict_reverse = df_to_dict(
-            df=df_concept_relationship[df_concept_relationship["relationship_id"] == "Mapped from"],
-            key="concept_id_1",
-            value="concept_id_2",
-        )
         if isinstance(concept_id, numbers.Integral):
             concept_id = [concept_id]
-
         concept_id_1 = []
         concept_id_2 = []
         concept_id_mapped_not_found = []
-        for id in concept_id:
-            try:
-                concept_id_2.append(concept_relationship_dict[id])
-                concept_id_1.append(id)
-            except KeyError:
+
+        if "concept_relationship" in self.tables:
+            column_types, parse_dates = self._get_column_types(self.filepath["concept_relationship"])
+            df_concept_relationship = self._read_table(
+                self.filepath["concept_relationship"], dtype=column_types, parse_dates=parse_dates
+            )
+            # Materialize and reassign; dropna on the computed frame would otherwise be discarded
+            df_concept_relationship = df_concept_relationship.compute().dropna(
+                subset=["concept_id_1", "concept_id_2", "relationship_id"]
+            )  # , usecols=vocabularies_tables_columns["concept_relationship"]
+            concept_relationship_dict = df_to_dict(
+                df=df_concept_relationship[df_concept_relationship["relationship_id"] == "Maps to"],
+                key="concept_id_1",
+                value="concept_id_2",
+            )
+            concept_relationship_dict_reverse = df_to_dict(
+                df=df_concept_relationship[df_concept_relationship["relationship_id"] == "Mapped from"],
+                key="concept_id_1",
+                value="concept_id_2",
+            )
+            for id in concept_id:
                 try:
-                    concept_id_1.append(concept_relationship_dict_reverse[id])
-                    concept_id_2.append(id)
+                    concept_id_2.append(concept_relationship_dict[id])
+                    concept_id_1.append(id)
                 except KeyError:
-                    concept_id_1.append(id)
-                    concept_id_2.append(id)
-                    concept_id_mapped_not_found.append(id)
-        if len(concept_id_mapped_not_found) > 0:
-            # warnings.warn(f"Couldn't find a map for concept {id} in concept_relationship table!")
-            if verbose:
-                rprint(f"Couldn't find a map for concept {concept_id_mapped_not_found} in concept_relationship table!")
+                    try:
+                        concept_id_1.append(concept_relationship_dict_reverse[id])
+                        concept_id_2.append(id)
+                    except KeyError:
+                        concept_id_1.append(id)
+                        concept_id_2.append(id)
+                        concept_id_mapped_not_found.append(id)
+            if len(concept_id_mapped_not_found) > 0:
+                # warnings.warn(f"Couldn't find a map for concept {id} in concept_relationship table!")
+                if verbose:
+                    rprint(f"Couldn't find a map for concept {concept_id_mapped_not_found} in concept_relationship table!")
+        else:
+            # Without a concept_relationship table, fall back to the identity mapping
+            concept_id_1 = concept_id
+            concept_id_2 = concept_id
+
         if len(concept_id_1) == 1:
             return concept_id_1[0], concept_id_2[0]
         else:
             return concept_id_1, concept_id_2
 
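Example of the mapping round-trip implemented above (the concept ids are placeholders; a real run needs a concept_relationship table in the folder):

    # A single id returns one (concept_id_1, concept_id_2) pair ...
    cid_1, cid_2 = omop.map_concept_id(3012888)
    # ... a list returns two parallel lists; unmapped ids fall back to themselves
    ids_1, ids_2 = omop.map_concept_id([3012888, 3004249], verbose=False)
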
@@ -417,10 +521,9 @@ def get_concept_name(self, concept_id: Union[str, List], raise_error=False, verb
         if isinstance(concept_id, numbers.Integral):
             concept_id = [concept_id]
 
-        column_types, parse_dates = get_column_types(self.filepath["concept"])
-        df_concept = dd.read_csv(self.filepath["concept"], dtype=column_types, parse_dates=parse_dates).dropna(
-            subset=["concept_id", "concept_name"]
-        )  # usecols=vocabularies_tables_columns["concept"]
+        column_types, parse_dates = self._get_column_types(self.filepath["concept"])
+        df_concept = self._read_table(self.filepath["concept"], dtype=column_types, parse_dates=parse_dates)
+        # Materialize and reassign; inplace dropna on a temporary compute() result is discarded
+        df_concept = df_concept.compute().dropna(subset=["concept_id", "concept_name"], ignore_index=True)  # usecols=vocabularies_tables_columns["concept"]
         concept_dict = df_to_dict(df=df_concept, key="concept_id", value="concept_name")
         concept_name = []
         concept_name_not_found = []
@@ -442,7 +545,7 @@ def get_concept_name(self, concept_id: Union[str, List], raise_error=False, verb
         return concept_name
 
     def extract_note(self, adata, source="note"):
-        column_types, parse_dates = get_column_types(self.filepath[source])
+        column_types, parse_dates = self._get_column_types(self.filepath[source])
         df_source = dd.read_csv(self.filepath[source], dtype=column_types, parse_dates=parse_dates)
         if columns is None:
             columns = df_source.columns
@@ -506,13 +609,16 @@ def extract_features(
 
         # TODO load using Dask or Dask-Awkward
         # Load source table using dask
-        column_types, parse_dates = get_column_types(self.filepath[source])
-        if len(parse_dates) == 1:
-            columns = list(column_types.keys()) + [parse_dates]
-        else:
-            columns = list(column_types.keys()) + parse_dates
-        df_source = dd.read_csv(
-            self.filepath[source], dtype=column_types, parse_dates=parse_dates
+        column_types, parse_dates = self._get_column_types(self.filepath[source])
+        if parse_dates:
+            # parse_dates is always a list here, so it concatenates directly
+            columns = list(column_types.keys()) + parse_dates
+        else:
+            columns = list(column_types.keys())
+        df_source = self._read_table(
+            self.filepath[source], dtype=column_types,  # parse_dates=parse_dates
         )  # , usecols=clinical_tables_columns[source]
 
         if not features:
             rprint(
             )
             features = list(df_source[key].compute().unique())
         else:
+            if isinstance(features, str):
+                features = [features]
             rprint(f"Trying to extract the following features: {features}")
 
         # Input could be feature names/feature id (concept id)
         # TODO support feature names
 
         if "concept" in self.tables:
-            column_types, parse_dates = get_column_types(self.filepath["concept"])
-            df_concept = dd.read_csv(self.filepath["concept"], dtype=column_types, parse_dates=parse_dates).dropna(
+            column_types, parse_dates = self._get_column_types(self.filepath["concept"])
+            df_concept = self._read_table(self.filepath["concept"], dtype=column_types, parse_dates=parse_dates).dropna(
                 subset=["concept_id", "concept_name"]
             )  # usecols=vocabularies_tables_columns["concept"],
             concept_dict = df_to_dict(df=df_concept, key="concept_id", value="concept_name")
@@ -653,7 +761,7 @@ def extract_features(
             for aggregation_method in aggregation_methods:
                 func = getattr(ak, aggregation_method)
                 adata.obs[f"{feature_name}_{aggregation_method}"] = list(
-                    func(adata.obsm[feature_name]["value_as_number"], axis=1)
+                    func(adata.obsm[feature_name]["value_source_value"], axis=1)
                 )
 
         adata = ep.ad.move_to_x(adata, var_name_list)
@@ -666,6 +774,8 @@ def extract_features(
             rprint(f"Couldn't find concept {fetures_not_shown_in_concept_table} in concept table!")
         return adata
 
+    # TODO add a function to check features and add concepts
+
     # More IO functions
     def to_dataframe(
         self,
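Finally, a sketch of name lookup with `get_concept_name` once a concept table is present (the ids are placeholders):

    names = omop.get_concept_name([3012888, 3004249], raise_error=False, verbose=True)
    # verbose=True reports ids missing from the concept table; raise_error=True
    # presumably raises for them instead (that branch sits outside this hunk)
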