diff --git a/config/output_schemas/estimated_apportioned_schema.toml b/config/output_schemas/estimated_apportioned_schema.toml index cbc137814..5090a8ff0 100644 --- a/config/output_schemas/estimated_apportioned_schema.toml +++ b/config/output_schemas/estimated_apportioned_schema.toml @@ -681,4 +681,3 @@ Deduced_Data_Type = "bool" [a_weight] old_name = "a_weight" Deduced_Data_Type = "float64" - diff --git a/config/output_schemas/full_estimation_qa_schema.toml b/config/output_schemas/full_estimation_qa_schema.toml index 315828e76..4242f2de4 100644 --- a/config/output_schemas/full_estimation_qa_schema.toml +++ b/config/output_schemas/full_estimation_qa_schema.toml @@ -921,4 +921,3 @@ Deduced_Data_Type = "float64" [headcount_oth_f_estimated] old_name = "headcount_oth_f_estimated" Deduced_Data_Type = "float64" - diff --git a/config/output_schemas/full_responses_imputed_schema.toml b/config/output_schemas/full_responses_imputed_schema.toml index 2d62115f7..7b64ab89a 100644 --- a/config/output_schemas/full_responses_imputed_schema.toml +++ b/config/output_schemas/full_responses_imputed_schema.toml @@ -913,4 +913,3 @@ Deduced_Data_Type = "object" [headcount_total_sf_exp_grouping] old_name = "headcount_total_sf_exp_grouping" Deduced_Data_Type = "object" - diff --git a/config/output_schemas/full_responses_mapped_schema.toml b/config/output_schemas/full_responses_mapped_schema.toml new file mode 100644 index 000000000..1cba14979 --- /dev/null +++ b/config/output_schemas/full_responses_mapped_schema.toml @@ -0,0 +1,603 @@ +[reference] +old_name = "reference" +Deduced_Data_Type = "int64" + +[instance] +old_name = "instance" +Deduced_Data_Type = "int64" + +[101] +old_name = "101" +Deduced_Data_Type = "object" + +[102] +old_name = "102" +Deduced_Data_Type = "float64" + +[103] +old_name = "103" +Deduced_Data_Type = "float64" + +[104] +old_name = "104" +Deduced_Data_Type = "float64" + +[200] +old_name = "200" +Deduced_Data_Type = "object" + +[201] +old_name = "201" +Deduced_Data_Type = "object" + +[202] +old_name = "202" +Deduced_Data_Type = "float64" + +[203] +old_name = "203" +Deduced_Data_Type = "float64" + +[204] +old_name = "204" +Deduced_Data_Type = "float64" + +[205] +old_name = "205" +Deduced_Data_Type = "float64" + +[206] +old_name = "206" +Deduced_Data_Type = "float64" + +[207] +old_name = "207" +Deduced_Data_Type = "float64" + +[209] +old_name = "209" +Deduced_Data_Type = "float64" + +[210] +old_name = "210" +Deduced_Data_Type = "float64" + +[211] +old_name = "211" +Deduced_Data_Type = "float64" + +[212] +old_name = "212" +Deduced_Data_Type = "float64" + +[214] +old_name = "214" +Deduced_Data_Type = "float64" + +[216] +old_name = "216" +Deduced_Data_Type = "float64" + +[218] +old_name = "218" +Deduced_Data_Type = "float64" + +[219] +old_name = "219" +Deduced_Data_Type = "float64" + +[220] +old_name = "220" +Deduced_Data_Type = "float64" + +[221] +old_name = "221" +Deduced_Data_Type = "float64" + +[222] +old_name = "222" +Deduced_Data_Type = "float64" + +[223] +old_name = "223" +Deduced_Data_Type = "float64" + +[225] +old_name = "225" +Deduced_Data_Type = "float64" + +[226] +old_name = "226" +Deduced_Data_Type = "float64" + +[227] +old_name = "227" +Deduced_Data_Type = "float64" + +[228] +old_name = "228" +Deduced_Data_Type = "float64" + +[229] +old_name = "229" +Deduced_Data_Type = "float64" + +[237] +old_name = "237" +Deduced_Data_Type = "float64" + +[242] +old_name = "242" +Deduced_Data_Type = "float64" + +[243] +old_name = "243" +Deduced_Data_Type = "float64" + +[244] +old_name = "244" 
+Deduced_Data_Type = "float64" + +[245] +old_name = "245" +Deduced_Data_Type = "float64" + +[246] +old_name = "246" +Deduced_Data_Type = "float64" + +[247] +old_name = "247" +Deduced_Data_Type = "float64" + +[248] +old_name = "248" +Deduced_Data_Type = "float64" + +[249] +old_name = "249" +Deduced_Data_Type = "float64" + +[250] +old_name = "250" +Deduced_Data_Type = "float64" + +[251] +old_name = "251" +Deduced_Data_Type = "object" + +[300] +old_name = "300" +Deduced_Data_Type = "float64" + +[301] +old_name = "301" +Deduced_Data_Type = "float64" + +[302] +old_name = "302" +Deduced_Data_Type = "float64" + +[303] +old_name = "303" +Deduced_Data_Type = "float64" + +[304] +old_name = "304" +Deduced_Data_Type = "float64" + +[305] +old_name = "305" +Deduced_Data_Type = "float64" + +[307] +old_name = "307" +Deduced_Data_Type = "float64" + +[308] +old_name = "308" +Deduced_Data_Type = "float64" + +[309] +old_name = "309" +Deduced_Data_Type = "float64" + +[405] +old_name = "405" +Deduced_Data_Type = "float64" + +[406] +old_name = "406" +Deduced_Data_Type = "float64" + +[407] +old_name = "407" +Deduced_Data_Type = "float64" + +[408] +old_name = "408" +Deduced_Data_Type = "float64" + +[409] +old_name = "409" +Deduced_Data_Type = "float64" + +[410] +old_name = "410" +Deduced_Data_Type = "float64" + +[411] +old_name = "411" +Deduced_Data_Type = "float64" + +[412] +old_name = "412" +Deduced_Data_Type = "float64" + +[501] +old_name = "501" +Deduced_Data_Type = "float64" + +[502] +old_name = "502" +Deduced_Data_Type = "float64" + +[503] +old_name = "503" +Deduced_Data_Type = "float64" + +[504] +old_name = "504" +Deduced_Data_Type = "float64" + +[505] +old_name = "505" +Deduced_Data_Type = "float64" + +[506] +old_name = "506" +Deduced_Data_Type = "float64" + +[507] +old_name = "507" +Deduced_Data_Type = "float64" + +[508] +old_name = "508" +Deduced_Data_Type = "float64" + +[601] +old_name = "601" +Deduced_Data_Type = "object" + +[602] +old_name = "602" +Deduced_Data_Type = "float64" + +[603] +old_name = "603" +Deduced_Data_Type = "float64" + +[604] +old_name = "604" +Deduced_Data_Type = "object" + +[605] +old_name = "605" +Deduced_Data_Type = "object" + +[701] +old_name = "701" +Deduced_Data_Type = "float64" + +[702] +old_name = "702" +Deduced_Data_Type = "float64" + +[703] +old_name = "703" +Deduced_Data_Type = "float64" + +[704] +old_name = "704" +Deduced_Data_Type = "float64" + +[705] +old_name = "705" +Deduced_Data_Type = "float64" + +[706] +old_name = "706" +Deduced_Data_Type = "float64" + +[707] +old_name = "707" +Deduced_Data_Type = "float64" + +[708] +old_name = "708" +Deduced_Data_Type = "object" + +[709] +old_name = "709" +Deduced_Data_Type = "float64" + +[710] +old_name = "710" +Deduced_Data_Type = "float64" + +[711] +old_name = "711" +Deduced_Data_Type = "float64" + +[712] +old_name = "712" +Deduced_Data_Type = "object" + +[713] +old_name = "713" +Deduced_Data_Type = "object" + +[714] +old_name = "714" +Deduced_Data_Type = "object" + +[9000] +old_name = "9000" +Deduced_Data_Type = "float64" + +[9001] +old_name = "9001" +Deduced_Data_Type = "float64" + +[9002] +old_name = "9002" +Deduced_Data_Type = "float64" + +[9003] +old_name = "9003" +Deduced_Data_Type = "float64" + +[9004] +old_name = "9004" +Deduced_Data_Type = "float64" + +[9005] +old_name = "9005" +Deduced_Data_Type = "float64" + +[9006] +old_name = "9006" +Deduced_Data_Type = "float64" + +[9007] +old_name = "9007" +Deduced_Data_Type = "float64" + +[9008] +old_name = "9008" +Deduced_Data_Type = "float64" + +[9009] +old_name = "9009" 
+Deduced_Data_Type = "float64" + +[9010] +old_name = "9010" +Deduced_Data_Type = "float64" + +[9011] +old_name = "9011" +Deduced_Data_Type = "float64" + +[9012] +old_name = "9012" +Deduced_Data_Type = "float64" + +[9013] +old_name = "9013" +Deduced_Data_Type = "float64" + +[period] +old_name = "period" +Deduced_Data_Type = "int64" + +[survey] +old_name = "survey" +Deduced_Data_Type = "int64" + +[formid] +old_name = "formid" +Deduced_Data_Type = "int64" + +[status] +old_name = "status" +Deduced_Data_Type = "object" + +[receiptdate] +old_name = "receiptdate" +Deduced_Data_Type = "object" + +[lockedby] +old_name = "lockedby" +Deduced_Data_Type = "float64" + +[lockeddate] +old_name = "lockeddate" +Deduced_Data_Type = "float64" + +[formtype] +old_name = "formtype" +Deduced_Data_Type = "int64" + +[checkletter] +old_name = "checkletter" +Deduced_Data_Type = "object" + +[frozensicoutdated] +old_name = "frozensicoutdated" +Deduced_Data_Type = "int64" + +[rusicoutdated] +old_name = "rusicoutdated" +Deduced_Data_Type = "int64" + +[frozensic] +old_name = "frozensic" +Deduced_Data_Type = "int64" + +[rusic] +old_name = "rusic" +Deduced_Data_Type = "int64" + +[frozenemployees] +old_name = "frozenemployees" +Deduced_Data_Type = "int64" + +[employees] +old_name = "employees" +Deduced_Data_Type = "int64" + +[frozenemployment] +old_name = "frozenemployment" +Deduced_Data_Type = "int64" + +[employment] +old_name = "employment" +Deduced_Data_Type = "int64" + +[frozenfteemployment] +old_name = "frozenfteemployment" +Deduced_Data_Type = "float64" + +[fteemployment] +old_name = "fteemployment" +Deduced_Data_Type = "float64" + +[frozenturnover] +old_name = "frozenturnover" +Deduced_Data_Type = "int64" + +[turnover] +old_name = "turnover" +Deduced_Data_Type = "int64" + +[enterprisereference] +old_name = "enterprisereference" +Deduced_Data_Type = "int64" + +[wowenterprisereference] +old_name = "wowenterprisereference" +Deduced_Data_Type = "int64" + +[cellnumber] +old_name = "cellnumber" +Deduced_Data_Type = "int64" + +[currency] +old_name = "currency" +Deduced_Data_Type = "object" + +[vatreference] +old_name = "vatreference" +Deduced_Data_Type = "int64" + +[payereference] +old_name = "payereference" +Deduced_Data_Type = "float64" + +[companyregistrationnumber] +old_name = "companyregistrationnumber" +Deduced_Data_Type = "int64" + +[numberlivelocalunits] +old_name = "numberlivelocalunits" +Deduced_Data_Type = "int64" + +[numberlivevat] +old_name = "numberlivevat" +Deduced_Data_Type = "int64" + +[numberlivepaye] +old_name = "numberlivepaye" +Deduced_Data_Type = "int64" + +[legalstatus] +old_name = "legalstatus" +Deduced_Data_Type = "int64" + +[reportingunitmarker] +old_name = "reportingunitmarker" +Deduced_Data_Type = "object" + +[region] +old_name = "region" +Deduced_Data_Type = "object" + +[birthdate] +old_name = "birthdate" +Deduced_Data_Type = "object" + +[referencename] +old_name = "referencename" +Deduced_Data_Type = "object" + +[referencepostcode] +old_name = "referencepostcode" +Deduced_Data_Type = "object" + +[tradingstyle] +old_name = "tradingstyle" +Deduced_Data_Type = "object" + +[selectiontype] +old_name = "selectiontype" +Deduced_Data_Type = "object" + +[inclusionexclusion] +old_name = "inclusionexclusion" +Deduced_Data_Type = "float64" + +[lastupdateddate] +old_name = "lastupdateddate" +Deduced_Data_Type = "object" + +[statusencoded] +old_name = "statusencoded" +Deduced_Data_Type = "int64" + +[postcodes_harmonised] +old_name = "postcodes_harmonised" +Deduced_Data_Type = "object" + +[pg_numeric] 
+old_name = "pg_numeric" +Deduced_Data_Type = "int64" + +[ultfoc] +old_name = "ultfoc" +Deduced_Data_Type = "object" + +[uni_count] +old_name = "uni_count" +Deduced_Data_Type = "int64" + +[itl] +old_name = "itl" +Deduced_Data_Type = "object" + +[ITL221CD] +old_name = "ITL221CD" +Deduced_Data_Type = "object" + +[ITL221NM] +old_name = "ITL221NM" +Deduced_Data_Type = "object" + +[ITL121CD] +old_name = "ITL121CD" +Deduced_Data_Type = "object" + +[ITL121NM] +old_name = "ITL121NM" +Deduced_Data_Type = "object" diff --git a/config/output_schemas/outliers_qa_schema.toml b/config/output_schemas/outliers_qa_schema.toml index fd556c007..f605e030c 100644 --- a/config/output_schemas/outliers_qa_schema.toml +++ b/config/output_schemas/outliers_qa_schema.toml @@ -705,4 +705,3 @@ Deduced_Data_Type = "float64" [outlier] old_name = "outlier" Deduced_Data_Type = "bool" - diff --git a/config/output_schemas/weighted_apportioned_schema.toml b/config/output_schemas/weighted_apportioned_schema.toml index cbc137814..5090a8ff0 100644 --- a/config/output_schemas/weighted_apportioned_schema.toml +++ b/config/output_schemas/weighted_apportioned_schema.toml @@ -681,4 +681,3 @@ Deduced_Data_Type = "bool" [a_weight] old_name = "a_weight" Deduced_Data_Type = "float64" - diff --git a/conftest.py b/conftest.py index 4caa21532..e656f8867 100644 --- a/conftest.py +++ b/conftest.py @@ -3,12 +3,12 @@ This file contains the test configuration for the whole test suite. Within the pytest_addoption function, the markers can be configured, ready to add -to specific tests. Any tests marked with these markers will be skipped by default -(if default=False). To call these tests with pytest, add the command line argument +to specific tests. Any tests marked with these markers will be skipped by default +(if default=False). To call these tests with pytest, add the command line argument '-m --{marker}'. To mark a test, use pytest.mark.{marker}. -When setting up a new marker, the 'pytest_collection_modifyitems' and +When setting up a new marker, the 'pytest_collection_modifyitems' and 'pytest_configure' functions must be updated accordingly. @@ -29,10 +29,7 @@ def pytest_addoption(parser): help="Run HDFS tests.", ) parser.addoption( - "--runwip", - action="store_true", - default=False, - help="Run WIP tests." + "--runwip", action="store_true", default=False, help="Run WIP tests." 
) @@ -42,15 +39,11 @@ def pytest_configure(config): config.addinivalue_line("markers", "runwip: Run work in progress tests.") - def pytest_collection_modifyitems(config, items): # noqa:C901 """Handle switching based on cli args.""" # do full test suite when all flags are given - if ( - config.getoption("--runhdfs") & - config.getoption("--runwip") - ): + if config.getoption("--runhdfs") & config.getoption("--runwip"): return # do not add marks when the markers flags are passed @@ -59,7 +52,7 @@ def pytest_collection_modifyitems(config, items): # noqa:C901 for item in items: if "runhdfs" in item.keywords: item.add_marker(skip_hdfs) - + if not config.getoption("--runwip"): skip_hdfs = pytest.mark.skip(reason="Need --runwip option to run.") for item in items: diff --git a/export_main.py b/export_main.py index 678988a14..c855f5974 100644 --- a/export_main.py +++ b/export_main.py @@ -3,18 +3,23 @@ from importlib import reload -# Change to the project repository location -my_wd = os.getcwd() -my_repo = "research-and-development" -if not my_wd.endswith(my_repo): - os.chdir(my_repo) -from src.outputs import export_files +def setup_environment(): + # Change to the project repository location + my_wd = os.getcwd() + my_repo = "research-and-development" + if not my_wd.endswith(my_repo): + os.chdir(my_repo) + # Import the module after changing the directory + from src.outputs import export_files + + reload(export_files) + return export_files -reload(export_files) user_path = os.path.join("src", "user_config.yaml") dev_path = os.path.join("src", "dev_config.yaml") if __name__ == "__main__": + export_files = setup_environment() export_files.run_export(user_path, dev_path) diff --git a/helpers/create_pnp_backdata.py b/helpers/create_pnp_backdata.py index d11801c56..fbab5dbac 100644 --- a/helpers/create_pnp_backdata.py +++ b/helpers/create_pnp_backdata.py @@ -1,3 +1,10 @@ +import pandas as pd +import numpy as np +import os +import toml + +from pandas.testing import assert_frame_equal + from src.imputation.apportionment import run_apportionment from src.staging import staging_helpers as stage_hlp from src.staging.postcode_validation import format_postcodes @@ -6,12 +13,13 @@ from src.utils.config import config_setup from src.utils.local_file_mods import rd_read_csv, rd_write_csv, rd_file_exists import logging -import numpy as np -import os -import toml + MappingMainLogger = logging.getLogger(__name__) +root = "R:/BERD Results System Development 2023/DAP_emulation/2021_surveys/PNP/06_imputation/" # noqa: E501 +backdata_in_path = root + "backdata_prep/" + # Change to the project repository location my_wd = os.getcwd() my_repo = "research-and-development" @@ -102,7 +110,6 @@ def identify_key_business(df): df (pd.DataFrame): The dataframe with the identified key business columns. """ # get key businesses - backdata_in_path = config["imputation_paths"]["backdata_in_path"] key_businesses_df = rd_read_csv( os.path.join(backdata_in_path, "KEYS 2023.csv") @@ -129,7 +136,6 @@ def identify_osmotherly_businesses(df): columns. """ # get osmotherly businesses - backdata_in_path = config["imputation_paths"]["backdata_in_path"] osmotherly_businesses_df = rd_read_csv( os.path.join(backdata_in_path, "Osmotherly PNP 2023.csv") @@ -294,10 +300,12 @@ def add_missing_columns(df): Return: df (pd.DataFrame): The dataframe with the added missing columns. 
""" - - df["226"] = None - df["228"] = None - df["237"] = None + missing_list = ['226', '228', '237', '203', '225', '227', '229', '251', + '300', '301', '307', '308', '309', '708', + 'survey', 'formid', 'cellnumber', 'pg_numeric'] + for col in missing_list: + if col not in df.columns: + df[col] = np.nan return df @@ -314,11 +322,27 @@ def clean_postcodes(df): df["601"] = df["601"].str.replace("'", "") df["601"] = df["601"].apply(format_postcodes) + return df + +def multiply_by_1000(df, config): + """Values in columns starting 2xx or 3xx are multiplied by 1000.""" + # a list of columns to be updated + numcols = config["breakdowns"]["211"] + config["breakdowns"]["305"] + ["211", "305"] + cols = [c for c in numcols if c in df.columns] + + for col in cols: + # check if the column is float or integer: + try: + df[col] = df[col].apply(lambda x: x * 1000 if x > 0 else x) + except: + print(f"colum {col} is of type {df[col].dtype}") + df[col] = pd.to_numeric(df[col], errors='coerce') + df[col] = df[col].apply(lambda x: x * 1000 if x > 0 else x) return df -def populate_instance_1_columns(df): +def populate_instance_1_columns(df, config): """ Function to populate instance 1 columns that begin with 3 or 2. Args: @@ -327,24 +351,121 @@ def populate_instance_1_columns(df): Return: df (pd.DataFrame): The dataframe with the populated instance 1 columns. """ - df["period_reference"] = df["period"].astype(str) + \ - df["reference"].astype(str) + \ - df["instance"].astype(str) - df = df.set_index("period_reference") - - source_df = df[df["instance"] == 0].copy() - - for col in config["breakdowns"]["211"] + \ - config["breakdowns"]["305"] + \ - ["211", "305"]: - if col in source_df.columns: - for period_reference in source_df.index.values: - df.loc[period_reference[0:-1] + "1", col] = source_df.loc[ - period_reference, col - ] - df = df.reset_index() + # a list of columns to be updated + numcols = config["breakdowns"]["211"] + config["breakdowns"]["305"] + ["211", "305"] + cols = [c for c in numcols if c in df.columns] - return df + # the rows which contain the data use for the updates + source_df = df[df["instance"] == 0].copy()[["reference"] + cols] + # the dataframe to be used for the update + update_df = source_df.copy() + update_df["instance"] = 1 + + # add extra rows with instance 1 to the original dataframe if a reference does not + # have an istance = 1 row + refs_with_ins_1 = df[df["instance"] == 1]["reference"].unique() + refs_without_ins_1 = set(source_df["reference"].unique()) - set(refs_with_ins_1) + + extra_rows_df = df[df["reference"].isin(refs_without_ins_1)].copy() + extra_rows_df["instance"] = 1 + + df = pd.concat([df, extra_rows_df], ignore_index=True) + + merged_df = pd.merge( + df, update_df, on=["reference", "instance"], how="left", suffixes=("", "_y") + ) + + # replace all values in the columns with the values from the update_df + for col in cols: + merged_df.loc[merged_df["instance"] == 1, col] = merged_df[col + "_y"] + merged_df.loc[merged_df["instance"] == 0, col] = 0 + merged_df.drop(columns=[col + "_y"], inplace=True) + + return merged_df + + +def test_populate_instance_1_columns(): + """Test populate_instance_1_columns function.""" + + # Example input DataFrame + data = { + "reference": [1, 1, 2, 2, 3, 3, 4], + "instance": [0, 1, 0, 1, 0, 1, 0], + "211": [10, 0, 20, 5, 30, 0, 40], + "305": [30, 0, 40, 10, 50, 0, 60], + "202": [100, 200, 300, 400, 500, 600, 700], + "301": [100, 200, 300, 400, 500, 600, 700], + "oth": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0] + } + df = pd.DataFrame(data) 
+ + # Example config + config = { + "breakdowns": { + "211": ["202"], + "305": ["301"] + } + } + + # Define the expected output DataFrame + expected_data = { + "reference": [1, 1, 2, 2, 3, 3, 4, 4], + "instance": [0, 1, 0, 1, 0, 1, 0, 1], + "211": [0, 10, 0, 20, 0, 30, 0, 40], + "305": [0, 30, 0, 40, 0, 50, 0, 60], + "202": [0, 100, 0, 300, 0, 500, 0, 700], + "301": [0, 100, 0, 300, 0, 500, 0, 700], + "oth": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 7.0] + } + expected_df = pd.DataFrame(expected_data) + + # Call the function + result_df = populate_instance_1_columns(df, config) + + # Assert that the result DataFrame is equal to the expected DataFrame + assert_frame_equal(result_df, expected_df, check_dtype=False) + + +def test_populate_instance_1_columns(): + """Test populate_instance_1_columns function.""" + + # Example input DataFrame + data = { + "reference": [1, 1, 2, 2, 3, 3, 4], + "instance": [0, 1, 0, 1, 0, 1, 0], + "211": [10, 0, 20, 5, 30, 0, 40], + "305": [30, 0, 40, 10, 50, 0, 60], + "202": [100, 200, 300, 400, 500, 600, 700], + "301": [100, 200, 300, 400, 500, 600, 700], + "oth": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0] + } + df = pd.DataFrame(data) + + # Example config + config = { + "breakdowns": { + "211": ["202"], + "305": ["301"] + } + } + + # Define the expected output DataFrame + expected_data = { + "reference": [1, 1, 2, 2, 3, 3, 4, 4], + "instance": [0, 1, 0, 1, 0, 1, 0, 1], + "211": [0, 10, 0, 20, 0, 30, 0, 40], + "305": [0, 30, 0, 40, 0, 50, 0, 60], + "202": [0, 100, 0, 300, 0, 500, 0, 700], + "301": [0, 100, 0, 300, 0, 500, 0, 700], + "oth": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 7.0] + } + expected_df = pd.DataFrame(expected_data) + + # Call the function + result_df = populate_instance_1_columns(df, config) + + # Assert that the result DataFrame is equal to the expected DataFrame + assert_frame_equal(result_df, expected_df, check_dtype=False) def create_pnp_backdata(df): @@ -462,7 +583,6 @@ def create_pnp_backdata(df): 'q0905', 'q0907', 'q0909', - 'period_reference', 'pnp_key', 'osmotherly', 'area', @@ -529,21 +649,20 @@ def create_pnp_backdata(df): 'q0906': '304', 'q0908': '305'} - new_column_order = ['period', 'reference', 'formtype', 'Region', - 'period_year', 'instance', '101', '103', '104', '200', - '201', '202', '204', '205', '206', '207', '209', '210', - '211', '212', '214', '216', '218', '219', '220', '221', - '222', '223', '226', '228', '237', '242', '243', '244', - '245', '246', '247', '248', '249', '250', '302', '303', - '304', '305', '405', '406', '407', '408', '409', '410', - '411', '412', '501', '502', '503', '504', '505', '506', - '507', '508', '601', '602', '604', - 'statusencoded', 'status', 'imp_marker', 'imp_class', - 'emp_researcher', 'emp_technician', 'emp_other', 'emp_total', - 'headcount_res_m','headcount_res_f', - 'headcount_tec_m', 'headcount_tec_f', - 'headcount_oth_m', 'headcount_oth_f', - 'headcount_tot_m','headcount_tot_f', 'headcount_total'] + new_column_order = ['reference', 'period', 'survey', 'status', 'formid', 'instance', + '101', '103', '104', '200', '201', '202', '203', '204', '205', + '206', '207', '209', '210', '211', '212', '214', '216', '218', + '219', '220', '221', '222', '223', '225', '226', '227', '228', + '229', '237', '242', '243', '244', '245', '246', '247', '248', + '249', '250', '251', '300', '301', '302', '303', '304', '305', + '307', '308', '309', '405', '406', '407', '408', '409', '410', + '411', '412', '501', '502', '503', '504', '505', '506', '507', + '508', '601', '602', '604', '708', + 'cellnumber', 'pg_numeric', 
'emp_researcher', 'emp_technician', + 'emp_other', 'emp_total', 'headcount_res_m', 'headcount_res_f', + 'headcount_tec_m', 'headcount_tec_f', 'headcount_oth_m', + 'headcount_oth_f', 'headcount_tot_m', 'headcount_tot_f', + 'headcount_total', 'imp_class', 'imp_marker', 'formtype'] # Rename wanted columns df = df.rename(columns=columns_to_rename_dict) @@ -594,9 +713,6 @@ def create_pnp_backdata(df): use_cellno=False ) - # Run the apportionment on the PNP backdata - df = run_apportionment(df) - # strip leading 0's from select columnns df = remove_leading_zeros(df) @@ -606,14 +722,24 @@ def create_pnp_backdata(df): # clean postcodes df = clean_postcodes(df) - # Populate instance 1 columns that begin with 3 or 2. - df = populate_instance_1_columns(df) + # Multiply values in columns starting 2xx or 3xx by 1000 + df = multiply_by_1000(df, config) + + # Test the populate_instance_1_columns function + test_populate_instance_1_columns() - # Remove unwanted columns - df = df.drop(columns=columns_to_remove_list) + # Populate instance 1 columns that begin with 3xx or 2xx. + df = populate_instance_1_columns(df, config) + + # Run the apportionment on the PNP backdata + df = run_apportionment(df) + + # Remove unwanted columns if the occur in the dataframe + to_remove = [col for col in columns_to_remove_list if col in df.columns] + df = df.drop(columns=to_remove) # Re-order columns to match BERD (for ease of comparrison) - df = df[new_column_order] + df = df[[c for c in new_column_order if c in df.columns]] return df @@ -634,7 +760,6 @@ def main(): """ # Read the input CSV file into a DataFrame - backdata_in_path = config["imputation_paths"]["backdata_in_path"] df = rd_read_csv( os.path.join(backdata_in_path, "210_202112 Raw data from CORA.csv") @@ -644,16 +769,19 @@ def main(): pnp_backdata_df = create_pnp_backdata(df) # Save the cleaned DataFrame to the output CSV file - backdata_out_path = config["imputation_paths"]["backdata_out_path"] - backdata_out_path = backdata_out_path.replace("2023_surveys", "2021_surveys") + backdata_out_path = backdata_in_path rd_write_csv( os.path.join( backdata_out_path, - "PNP_2021_cleaned_backdata.csv"), + "PNP_2021_backdata_clean.csv"), pnp_backdata_df ) if __name__ == "__main__": main() + +# Example usage of the test function +# if __name__ == "__main__": +# test_populate_instance_1_columns() diff --git a/helpers/sic_pg_extract.py b/helpers/sic_pg_extract.py index da01e875a..39bd7a640 100644 --- a/helpers/sic_pg_extract.py +++ b/helpers/sic_pg_extract.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ Created on Wed Sep 11 13:50:53 2024 -This script extracts numeric pg codes that map to more than one pg_alpha, +This script extracts numeric pg codes that map to more than one pg_alpha, depending on the sic code @author: zoring """ @@ -20,9 +20,9 @@ import pandas as pd #%% Load input -mypath = join(fol_in, map_in) +mypath = join(fol_in, map_in) df = pd.read_csv( - mypath, + mypath, usecols=[sic_col, alpha_col, num_col] ).drop_duplicates() @@ -31,7 +31,7 @@ df_multy = df_agg.loc[df_agg[alpha_col] > 1][[num_col]] multi_list = list(df_multy[num_col]) -#%% Extract potential multiplicate +#%% Extract potential multiplicate df_out = df.loc[df[num_col].isin(multi_list)] mypath = join(fol_out, map_out) df_out.to_csv(mypath, index=None) diff --git a/main.py b/main.py index ee63f385e..2d842df1f 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,24 @@ from importlib import reload -import time import os -# Change to the project repository location -my_wd = os.getcwd() -my_repo = 
"research-and-development" -if not my_wd.endswith(my_repo): - os.chdir(my_repo) -import src.pipeline as src # noqa: E402 +def setup_environment(): + # Change to the project repository location + my_wd = os.getcwd() + my_repo = "research-and-development" + if not my_wd.endswith(my_repo): + os.chdir(my_repo) + # Import the module after changing the directory + import src.pipeline as src + + reload(src) + return src + -# reload the pipeline module to implement any changes -reload(src) user_path = os.path.join("src", "user_config.yaml") dev_path = os.path.join("src", "dev_config.yaml") -start = time.time() +src = setup_environment() run_time = src.run_pipeline(user_path, dev_path) min_secs = divmod(round(run_time), 60) diff --git a/notebooks/.gitkeep b/notebooks/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/notebooks/README.md b/notebooks/README.md deleted file mode 100644 index b65720448..000000000 --- a/notebooks/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# `notebooks` folder overview - -All Jupyter notebooks should be stored in this folder. - -The `.envrc` file should automatically add the entire project path into the -`PYTHONPATH` environment variable. This should allow you to directly import `src` in -your notebook. diff --git a/notebooks/Schema_maker.ipynb b/notebooks/Schema_maker.ipynb deleted file mode 100644 index cde7df731..000000000 --- a/notebooks/Schema_maker.ipynb +++ /dev/null @@ -1,146 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Schema Maker\n", - "\n", - "Creates a complete schema using an existing csv of sample or real data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "import pandas as pd" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Imputation file location and name\n", - "common_dir = \"R:\\\\BERD Results System Development 2023\\\\DAP_emulation\\\\\"\n", - "input_dir = \"outputs\\\\output_status_filtered_qa\"\n", - "\n", - "pref = \"status_filtered_qa\"\n", - "suff = \"_2023-12-07_v245.csv\"\n", - "\n", - "# Output folder for all schemas\n", - "out_dir = r\"D:\\programming_projects\\research-and-development\\config\\output_schemas\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Read the top 100 rows, inferrring the schema from csv\n", - "read_in_dir = common_dir + input_dir\n", - "read_in_path = os.path.join(read_in_dir, pref + suff)\n", - "df = pd.read_csv(read_in_path, nrows=100)\n", - "\n", - "\n", - "# Remove \"index\" column if it exists\n", - "if \"index\" in df.columns:\n", - " df = df.drop(\"index\", axis=1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shp = df.shape\n", - "print(f\"columns: {shp[1]}. 
Rows {shp[0]}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Stringify the data types" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get column names as data types as dict of strings\n", - "types = df.dtypes.to_dict()\n", - "# Stringify the datatypes\n", - "schema = {col[0]: str(col[1]) for col in types.items()}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Calculate the schema" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Initially, the schema is empty\n", - "base_str = \"\"\n", - "add_in_str = f'[{col}]\\nold_name = \"{col}\"\\nDeduced_Data_Type = \"{schema[col]}\"\\n\\n'\n", - "\n", - "\n", - "# Iterate through columns, adding to the string which will be written to toml\n", - "for col in schema:\n", - " base_str += add_in_str" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Output the schema toml file\n", - "mypath = os.path.join(out_dir, pref + \"_schema.toml\")\n", - "text_file = open(mypath, \"w\")\n", - "text_file.write(S)\n", - "text_file.close()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.3" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/notebooks/estimation/calculate_weighting_factor_timing.ipynb b/notebooks/estimation/calculate_weighting_factor_timing.ipynb deleted file mode 100644 index 7c339bed3..000000000 --- a/notebooks/estimation/calculate_weighting_factor_timing.ipynb +++ /dev/null @@ -1,84 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%timeit\n", - "import numpy as np\n", - "import pandas as pd\n", - "\n", - "frame = {\"test1\": [], \"test2\": [], \"test3\": []}\n", - "df = pd.DataFrame(frame)\n", - "\n", - "for i in range(10):\n", - " df = df.append(\n", - " pd.Series([np.zeros(10) for i in range(3)], index=df.columns), ignore_index=True\n", - " )\n", - "\n", - "df" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "\n", - "%%timeit\n", - "import pandas as pd\n", - "\n", - "dfs = []\n", - "\n", - "for i in range(10):\n", - " frame = {\"test1\": np.zeros(10), \"test2\": np.zeros(10), \"test3\": np.zeros(10)}\n", - " dfs.append(pd.DataFrame(frame))\n", - "\n", - "df = pd.concat(dfs, axis=1)\n", - "df" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "\n", - "%%timeit\n", - "import pandas as pd\n", - "\n", - "frame = {\"test1\": [], \"test2\": [], \"test3\": []}\n", - "\n", - "for i in range(10):\n", - " frame[\"test1\"].extend(np.zeros(10))\n", - " frame[\"test2\"].extend(np.zeros(10))\n", - " frame[\"test3\"].extend(np.zeros(10))\n", - "\n", - "df = pd.DataFrame(frame)\n", - "df" - ] - } - ], - "metadata": { - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - 
"nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.13" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/notebooks/regression_tester.ipynb b/notebooks/regression_tester.ipynb deleted file mode 100644 index 4fda54c10..000000000 --- a/notebooks/regression_tester.ipynb +++ /dev/null @@ -1,214 +0,0 @@ -{ - "cells": [ - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Regression tester\n", - "\n", - "A version in jupyter notebook form to save time loading large csv files.\n", - "\n", - "Regression test to compare two versions of outputs\n", - "Reads two csv files, old and new\n", - "Selects the columns of interest\n", - "Joins old and new on key columns, outer\n", - "Checks which records are in old only (left), new only (right) or both\n", - "Compares if the old and new values are the same within tolerance\n", - "Saves the ouotputs" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [] - }, - { - "cell_type": "code", - "execution_count": 12, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "import pandas as pd" - ] - }, - { - "cell_type": "code", - "execution_count": 30, - "metadata": {}, - "outputs": [], - "source": [ - "# Imputation file location and name\n", - "common_dir = \"R:\\\\BERD Results System Development 2023\\\\DAP_emulation\\\\\"\n", - "\n", - "input_dir_new = \"outputs\\\\output_long_form\"\n", - "input_dir_new = \"outputs\\\\output_gb_sas\"\n", - "\n", - "pref_old = \"Frozen_Base_Data\"\n", - "suff_old = \"_2024-02-05_v471.csv\"\n", - "\n", - "pref_new = \"output_gb_sas\"\n", - "suff_new = \"_2024-02-05_v471.csv\"\n", - "\n", - "# Output folder for all schemas\n", - "out_dir = \"D:\\\\coding_projectsrandd_test_data\\\\\"\n", - "\n", - "# Columns to select\n", - "key_cols_old = [\"reference\", \"200\", \"201\"]\n", - "key_cols_new = [\"reference\", \"instance\", \"200\", \"201\"]\n", - "value_col = \"211\"\n", - "other_cols = [\"status\", \"imp_marker\", \"602\", \"referencepostcode\", \"postcodes_harmonised\"]\n", - "tolerance = 0.001" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "read_in_dir = common_dir\n", - "read_in_path_old = os.path.join(common_dir, input_dir_old, pref_old + suff_old)\n", - "df_old = pd.read_csv(read_in_path_old)\n", - "\n", - "\n", - "# Remove \"index\" column if it exists\n", - "if \"index\" in df_old.columns:\n", - " df_old = df.drop(\"index\", axis=1)\n", - "\n", - "df_old.head()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Read the top 100 rows, inferrring the schema from csv\n", - "read_in_dir = common_dir\n", - "read_in_path_new = os.path.join(common_dir, input_dir_new, pref_new + suff_new)\n", - "df_new = pd.read_csv(read_in_path_new)\n", - "\n", - "\n", - "# Remove \"index\" column if it exists\n", - "if \"index\" in df_new.columns:\n", - " df_new = df.drop(\"index\", axis=1)\n", - "\n", - "df_new.head()" - ] - }, - { - "cell_type": "code", - "execution_count": 26, - "metadata": {}, - "outputs": [], - "source": [ - "# filter conditions\n", - "def get_mask(df:pd.DataFrame, col:str, values:list):\n", - " return df[col].isin(values)\n", - "\n", - "def filter_df(df:pd.DataFrame, col:str, values:list):\n", - " return df.copy().loc[df[col].isin(values)]\n", - "\n", - "# filter_df(df_old, \"formtype\", [1]).head()\n", - "\n", - "# # Filter good statuses only\n", - "# 
imp_markers_to_keep = [\"TMI\", \"CF\", \"MoR\", \"constructed\"]\n", - "# df_old = filter_df(df_old, \"imp_marker\", imp_markers_to_keep)\n", - "# df_new = filter_df(df_new, \"imp_marker\", imp_markers_to_keep)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "# sizes\n", - "print(f\"Old size: {df_old.shape}\")\n", - "print(f\"New size: {df_new.shape}\")\n", - "\n", - "#%% Join\n", - "df_merge = df_old.merge(\n", - " df_new,\n", - " on=key_cols,\n", - " how=\"outer\",\n", - " suffixes=(\"_old\", \"_new\"),\n", - " indicator=True\n", - ")\n", - "#%% Compare the values\n", - "df_merge[\"value_different\"] = (\n", - " (df_merge[value_col + \"_old\"] - df_merge[value_col + \"_new\"])**2 > tolerance**2\n", - ")\n", - "\n", - "# %% Save output\n", - "write_csv(out_fol + out_file, df_merge)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Calculate the schema" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Initially, the schema is empty\n", - "base_str = \"\"\n", - "add_in_str = f'[{col}]\\nold_name = \"{col}\"\\nDeduced_Data_Type = \"{schema[col]}\"\\n\\n'\n", - "\n", - "\n", - "# Iterate through columns, adding to the string which will be written to toml\n", - "for col in schema:\n", - " base_str += add_in_str" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Output the schema toml file\n", - "mypath = os.path.join(out_dir, pref + \"_schema.toml\")\n", - "text_file = open(mypath, \"w\")\n", - "text_file.write(S)\n", - "text_file.close()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.3" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/pytest.ini b/pytest.ini index 81813b693..711690277 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,4 +4,3 @@ addopts = -vv -rs testpaths = "./tests" filterwarnings = ignore::DeprecationWarning - diff --git a/src/construction/all_data_construction.py b/src/construction/all_data_construction.py index 89335f9a0..376c70e64 100644 --- a/src/construction/all_data_construction.py +++ b/src/construction/all_data_construction.py @@ -1,5 +1,4 @@ import logging -from typing import Callable import pandas as pd import numpy as np @@ -20,7 +19,7 @@ ) -def all_data_construction( +def all_data_construction( # noqa: C901 construction_df: pd.DataFrame, snapshot_df: pd.DataFrame, construction_logger: logging.Logger, @@ -48,7 +47,7 @@ def all_data_construction( columns={"short_to_long": "construction_type"}, inplace=True ) construction_df.loc[ - construction_df["construction_type"] == True, "construction_type" + construction_df["construction_type"].isin([True]), "construction_type" ] = "short_to_long" # clean construction type column diff --git a/src/construction/construction_helpers.py b/src/construction/construction_helpers.py index c583e53af..19d4f7420 100644 --- a/src/construction/construction_helpers.py +++ b/src/construction/construction_helpers.py @@ -99,7 +99,7 @@ def prepare_short_to_long( updated_snapshot_df: pd.DataFrame, construction_df: pd.DataFrame, 
unique_references: list, - ) -> Tuple[pd.DataFrame, list]: +) -> Tuple[pd.DataFrame, list]: """Create addional instances for short to long construction. Args: @@ -181,7 +181,9 @@ def finalise_forms_gb(updated_snapshot_df: pd.DataFrame) -> pd.DataFrame: and short forms reset. """ - constructed_df = updated_snapshot_df[updated_snapshot_df.is_constructed.isin([True])] + constructed_df = updated_snapshot_df[ + updated_snapshot_df.is_constructed.isin([True]) + ] not_constructed_df = updated_snapshot_df[ updated_snapshot_df.is_constructed.isin([False]) ] diff --git a/src/construction/construction_read_validate.py b/src/construction/construction_read_validate.py index 0151e32cf..38dfec0e7 100644 --- a/src/construction/construction_read_validate.py +++ b/src/construction/construction_read_validate.py @@ -90,8 +90,8 @@ def read_validate_postcode_construction_file( Args: config (dict): The pipeline configuration. check_file_exists (Callable): Function to check if file exists. - read_csv (Callable): Function to read a csv file.nstruction (bool, optional): If true, run the postcode - only construction. + read_csv (Callable): Function to read a csv file. + construction_logger (logging.Logger): The logger for the construction module. Returns: pd.DataFrame: The construction dataframes for postcode constructions. diff --git a/src/construction/construction_validation.py b/src/construction/construction_validation.py index bf0ec2e4e..b15451a47 100644 --- a/src/construction/construction_validation.py +++ b/src/construction/construction_validation.py @@ -161,7 +161,7 @@ def _references_in_snapshot( construction_df[["reference"]].isin(snapshot_refs), how="left", rsuffix="_valid" ) # obtain a df of invalid references (not in snpashot) - invalid_refs = valid_df[valid_df.reference_valid == False] + invalid_refs = valid_df[valid_df.reference_valid.isin([False])] if len(invalid_refs) > 0: inv = invalid_refs["reference"].unique() if logger: diff --git a/src/dev_config.yaml b/src/dev_config.yaml index ff390477e..acc98fc00 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -44,7 +44,6 @@ imputation_paths: qa_path: "imputation_qa" manual_trimming_path: "manual_trimming" backdata_out_path: "backdata_output" - backdata_in_path: "backdata_input" outliers_paths: folder: "07_outliers" qa_path: "outliers_qa" @@ -81,7 +80,7 @@ network_paths: # Imputation and outliers input paths # backdata_path: "R:/BERD Results System Development 2023/DAP_emulation/2021_surveys/BERD/06_imputation/backdata_output/2021_backdata_oct_24.csv" backdata_path: "R:/BERD Results System Development 2023/DAP_emulation/2022_surveys/BERD/06_imputation/backdata_output/2022_backdata_published_v347.csv" - pnp_backdata_path: "R:/BERD Results System Development 2023/DAP_emulation/2021_surveys/PNP/06_imputation/backdata_prep/PNP_2021_backdata_clean.csv" + pnp_backdata_path: "R:/BERD Results System Development 2023/DAP_emulation/2021_surveys/PNP/06_imputation/backdata_output/PNP_2021_backdata.csv" manual_imp_trim_path: "06_imputation/manual_trimming/2023_manual_trimming_v1.csv" manual_outliers_path: "07_outliers/manual_outliers/2023_manual_outliers_v1.csv" # Construction paths @@ -113,6 +112,8 @@ schema_paths: full_responses_imputed_schema: "config/output_schemas/full_responses_imputed_schema.toml" staged_full_responses_schema: "config/output_schemas/staged_full_responses_schema.toml" invalid_unrecognised_postcodes_schema: "config/output_schemas/invalid_unrecognised_postcodes_schema.toml" + full_responses_mapped_schema: 
"config/output_schemas/full_responses_mapped_schema.toml" + # Export config for users mappers: geo_cols: ["ITL221CD", "ITL221NM", "ITL121CD", "ITL121NM"] diff --git a/src/estimation/apply_weights.py b/src/estimation/apply_weights.py index 47cabd8ac..3dccb4d31 100644 --- a/src/estimation/apply_weights.py +++ b/src/estimation/apply_weights.py @@ -32,10 +32,12 @@ def apply_weights( # if the dataframe is for QA output, create new columns with the weights applied. if for_qa: - estimated_cols = pd.concat( - [round(df[col] * df["a_weight"], round_val).rename( - f"{col}_estimated") for col in cols_list], axis=1 + [ + round(df[col] * df["a_weight"], round_val).rename(f"{col}_estimated") + for col in cols_list + ], + axis=1, ) df = pd.concat([df, estimated_cols], axis=1) diff --git a/src/freezing/freezing_apply_changes.py b/src/freezing/freezing_apply_changes.py index 145dc12a4..69dde1b0c 100644 --- a/src/freezing/freezing_apply_changes.py +++ b/src/freezing/freezing_apply_changes.py @@ -1,11 +1,8 @@ import logging -import os -from datetime import datetime -from typing import Union, Callable, Dict +from typing import Callable, Dict import pandas as pd -from src.utils.helpers import values_in_column from src.freezing.freezing_utils import _add_last_frozen_column @@ -147,7 +144,7 @@ def apply_amendments( amended_df (pd.DataFrame): The main snapshot with amendments applied. """ changes_refs = amendments_df[ - amendments_df.accept_changes == True + amendments_df.accept_changes.isin([True]) ].reference.unique() accepted_amendments_df = amendments_df[amendments_df.reference.isin(changes_refs)] @@ -200,13 +197,25 @@ def apply_additions( FreezingLogger.info("Skipping additions since the additions csv is invalid...") return main_df # Drop records where accept_changes is False and if any remain, add them to main df - changes_refs = additions_df[additions_df.accept_changes == True].reference.unique() + changes_refs = additions_df[ + additions_df.accept_changes.isin([True]) + ].reference.unique() accepted_additions_df = additions_df[additions_df.reference.isin(changes_refs)] # removes the old form sent out where we have a new clear response - remove_status = ['Form sent out', 'Ceased trading (NIL4)', 'Out of scope (NIL3)', 'Dormant (NIL5)'] - main_df = main_df[~((main_df.reference.isin(accepted_additions_df.reference)) & (main_df.status.isin(remove_status)))] + remove_status = [ + "Form sent out", + "Ceased trading (NIL4)", + "Out of scope (NIL3)", + "Dormant (NIL5)", + ] + main_df = main_df[ + ~( + (main_df.reference.isin(accepted_additions_df.reference)) + & (main_df.status.isin(remove_status)) + ) + ] accepted_additions_df = accepted_additions_df.drop("accept_changes", axis=1) if accepted_additions_df.shape[0] > 0: diff --git a/src/freezing/freezing_compare.py b/src/freezing/freezing_compare.py index 8a5e02f39..13d692e74 100644 --- a/src/freezing/freezing_compare.py +++ b/src/freezing/freezing_compare.py @@ -38,11 +38,7 @@ def get_amendments( "704", "705", "706", "707", "709", "711", ] - non_numeric_cols = [ - "200", - "201", - "601", - ] + non_numeric_cols = ["200", "201", "601"] # numeric_cols_new = [f"{i}_updated" for i in numeric_cols] numeric_cols_diff = [f"{i}_diff" for i in numeric_cols] # non_numeric_cols_new = [f"{i}_updated" for i in non_numeric_cols] diff --git a/src/freezing/freezing_utils.py b/src/freezing/freezing_utils.py index 777eec8c4..573f39684 100644 --- a/src/freezing/freezing_utils.py +++ b/src/freezing/freezing_utils.py @@ -5,8 +5,7 @@ from src.utils.defence import type_defence -def 
_add_last_frozen_column(frozen_df: pd.DataFrame, - config: dict) -> pd.DataFrame: +def _add_last_frozen_column(frozen_df: pd.DataFrame, config: dict) -> pd.DataFrame: """Add the last_frozen column to staged data. Args: diff --git a/src/imputation/MoR.py b/src/imputation/MoR.py index 69ef2789a..5d83f2d78 100644 --- a/src/imputation/MoR.py +++ b/src/imputation/MoR.py @@ -1,4 +1,5 @@ """Functions for the Mean of Ratios (MoR) methods.""" + import itertools import re import pandas as pd @@ -12,61 +13,28 @@ bad_statuses = ["Form sent out", "Check needed"] -def run_mor(df, backdata, impute_vars, config): - """Function to implement Mean of Ratios method. +def is_lf_only(config): + """ + Determine if there are only long form entries in the current data or backdata. - This is implemented by first carrying forward data from last year - for non-responders, and then calculating and applying growth rates - for each imputation class. + PNP surveys only have long forms. Also, the BERD 2021 backdata only contains + long forms, so if the current year for BERD processing is 2022, then only long + forms can be imputed. (Note: we only have BERD data in 2022 so we only need to + check the survey year and not the type for this case.) Args: - df (pd.DataFrame): Processed full responses DataFrame - backdata (pd.DataFrame): One period of backdata. - impute_vars ([string]): List of variables to impute. + config (dict): Configuration dictionary containing survey details. Returns: - pd.DataFrame: df with MoR applied. - pd.DataFrame: QA DataFrame showing how imputation links are calculated. + bool: True if there are only long form entries, False otherwise. """ - # If the survey year is 2022, there is no shortform backdata - is_2022 = config["survey"]["survey_year"] == 2022 - - to_impute_df, remainder_df, backdata = mor_preprocessing( - df, - backdata, - is_2022, - config - ) - - # Carry forwards method - carried_forwards_df = carry_forwards(to_impute_df, backdata, impute_vars) - - # apply MoR for long form responders - imputed_df_long, links_df_long = calculate_mor( - carried_forwards_df, remainder_df, backdata, config, "long" - ) - - # If the survey year is 2022, there is no shortform backdata - if is_2022: - imputed_df = pd.concat([remainder_df, imputed_df_long]).reset_index(drop=True) - links_df = links_df_long + pnp_survey_cond = config["survey"]["survey_type"] == "PNP" + berd_2021_backdata_cond = config["survey"]["survey_year"] == 2022 - else: - # apply MoR for short form responders - imputed_df_short, links_df_short = calculate_mor( - carried_forwards_df, remainder_df, backdata, config, "short" - ) + return pnp_survey_cond or berd_2021_backdata_cond - imputed_df = pd.concat( - [remainder_df, imputed_df_long, imputed_df_short] - ).reset_index(drop=True) - links_df = pd.concat([links_df_long, links_df_short]).reset_index(drop=True) - - return imputed_df, links_df - - -def mor_preprocessing(df, backdata, is_2022, config): +def mor_preprocessing(df, backdata, config): """Apply filtering and pre-processing ready for MoR. This function creates imputation classes, cleans the "formtype" column. @@ -76,7 +44,7 @@ def mor_preprocessing(df, backdata, is_2022, config): Args: df (pd.DataFrame): full responses for the current year backdata (pd.Dataframe): backdata file read in during staging. - is_2022 (bool): whether the survey year is 2022 + config (dict): Configuration settings. 
Returns: pd.DataFrame: DataFrame of records to impute @@ -85,15 +53,10 @@ def mor_preprocessing(df, backdata, is_2022, config): """ # Create imp_class column if config["survey"]["survey_type"] == "BERD": - df = create_imp_class_col( - df, - ["200", "201"] - ) + df = create_imp_class_col(df, ["200", "201"]) elif config["survey"]["survey_type"] == "PNP": df = create_imp_class_col( - df, ["pnp_key", "area"], - use_osmotherly=True, - use_cellno=False + df, ["pnp_key", "area"], use_osmotherly=True, use_cellno=False ) # ensure the "formtype" column is in the correct format @@ -105,8 +68,8 @@ def mor_preprocessing(df, backdata, is_2022, config): lf_cond = df["formtype"] == "0001" stat_cond = df["status"].isin(bad_statuses) - # there is no shortform backdata if the survey year is 2022 - if is_2022: + # the case where there are only long forms is treated differently + if is_lf_only(config): imputation_cond = stat_cond & lf_cond else: @@ -123,8 +86,8 @@ def mor_preprocessing(df, backdata, is_2022, config): return to_impute_df, remainder_df, backdata -def carry_forwards(df, backdata, impute_vars): - """Carry forwards matcing `backdata` values for references to be imputed. +def carry_forwards(df, backdata, impute_vars, config): + """Carry forwards matching `backdata` values for references to be imputed. Records are matched based on 'reference'. @@ -175,7 +138,8 @@ def carry_forwards(df, backdata, impute_vars): df.loc[pc_update_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"] # Update the imputation classes based on the new 200 and 201 values - df = create_imp_class_col(df, ["200", "201"]) + if config["survey"]["survey_type"] == "BERD": + df = create_imp_class_col(df, ["200", "201"]) # Update the varibles to be imputed by the corresponding previous values for var in impute_vars: @@ -323,7 +287,7 @@ def calculate_links(gr_df, target_vars, config): def get_threshold_value(config: dict) -> int: """Read, validate and return threshold value from the config.""" threshold_num = config["imputation"]["mor_threshold"] - if (type(threshold_num) == int) & (threshold_num >= 0): + if isinstance(threshold_num, int) & (threshold_num >= 0): return threshold_num else: raise Exception( @@ -417,10 +381,10 @@ def apply_links(cf_df, links_df, target_vars, config, formtype): for var in target_vars: # Only apply MoR where the link is non null/0 no_zero_mask = pd.notnull(cf_df[f"{var}_link"]) & (cf_df[f"{var}_link"] != 0) - full_mask = matched_mask & no_zero_mask + mask = matched_mask & no_zero_mask # Apply the links to the previous values - cf_df.loc[full_mask, f"{var}_imputed"] = ( - cf_df.loc[full_mask, f"{var}_imputed"] * cf_df.loc[full_mask, f"{var}_link"] + cf_df.loc[mask, f"{var}_imputed"] = ( + cf_df.loc[mask, f"{var}_imputed"] * cf_df.loc[mask, f"{var}_link"] ) cf_df.loc[matched_mask, "imp_marker"] = "MoR" @@ -431,16 +395,18 @@ def apply_links(cf_df, links_df, target_vars, config, formtype): q for q in q_targets if q in config["imputation"]["lf_target_vars"] ] for var in q_targets: - for breakdown in config["breakdowns"][var]: + for bd in config["breakdowns"][var]: # As above but using different elements to multiply no_zero_mask = pd.notnull(cf_df[f"{var}_link"]) & ( cf_df[f"{var}_link"] != 0 ) - full_mask = matched_mask & no_zero_mask + mask = matched_mask & no_zero_mask # Apply the links to the previous values - cf_df.loc[full_mask, f"{breakdown}_imputed"] = ( - cf_df.loc[full_mask, f"{breakdown}_imputed"] - * cf_df.loc[full_mask, f"{var}_link"] + cf_df[f"{bd}_imputed"] = pd.to_numeric( + 
cf_df[f"{bd}_imputed"], errors="coerce" + ) + cf_df.loc[mask, f"{bd}_imputed"] = ( + cf_df.loc[mask, f"{bd}_imputed"] * cf_df.loc[mask, f"{var}_link"] ) cf_df.loc[matched_mask, "imp_marker"] = "MoR" @@ -489,3 +455,50 @@ def calculate_mor(cf_df, remainder_df, backdata, config, formtype): imputed_df = apply_links(cf_df, links_df, target_vars, config, formtype) return imputed_df, links_df + + +def run_mor(df, backdata, impute_vars, config): + """Function to implement Mean of Ratios method. + + This is implemented by first carrying forward data from last year + for non-responders, and then calculating and applying growth rates + for each imputation class. + + Args: + df (pd.DataFrame): Processed full responses DataFrame + backdata (pd.DataFrame): One period of backdata. + impute_vars ([string]): List of variables to impute. + + Returns: + pd.DataFrame: df with MoR applied. + pd.DataFrame: QA DataFrame showing how imputation links are calculated. + """ + + to_impute_df, remainder_df, backdata = mor_preprocessing(df, backdata, config) + + # Carry forwards method + carried_forwards_df = carry_forwards(to_impute_df, backdata, impute_vars, config) + + # apply MoR for long form responders + imputed_df_long, links_df_long = calculate_mor( + carried_forwards_df, remainder_df, backdata, config, "long" + ) + + # the cases where there are only long forms is treated differently + if is_lf_only(config): + imputed_df = pd.concat([remainder_df, imputed_df_long]).reset_index(drop=True) + links_df = links_df_long + + else: + # apply MoR for short form responders + imputed_df_short, links_df_short = calculate_mor( + carried_forwards_df, remainder_df, backdata, config, "short" + ) + + imputed_df = pd.concat( + [remainder_df, imputed_df_long, imputed_df_short] + ).reset_index(drop=True) + + links_df = pd.concat([links_df_long, links_df_short]).reset_index(drop=True) + + return imputed_df, links_df diff --git a/src/imputation/imputation_helpers.py b/src/imputation/imputation_helpers.py index 910e17846..7917363e0 100644 --- a/src/imputation/imputation_helpers.py +++ b/src/imputation/imputation_helpers.py @@ -1,7 +1,6 @@ """Utility functions to be used in the imputation module.""" import logging -import os import pandas as pd from typing import List, Tuple from itertools import chain @@ -429,7 +428,9 @@ def breakdown_checks_after_imputation(df: pd.DataFrame) -> None: # the sum of the other cols should equal the total -def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List, config) -> pd.DataFrame: +def tidy_imputation_dataframe( + df: pd.DataFrame, to_impute_cols: List, config +) -> pd.DataFrame: """Update cols with imputed values and remove rows and columns no longer needed. 
Args: @@ -459,12 +460,13 @@ def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List, config) -> ) ] - to_drop += ["200_original", "pg_sic_class", "empty_pgsic_group", "empty_pg_group"] - to_drop += ["200_imp_marker"] - if config["survey"]["survey_type"] == "PNP": to_drop += ["pnp_key", "osmotherly", "area"] + else: + to_drop += ["200_original", "pg_sic_class", "empty_pgsic_group"] + to_drop += ["empty_pg_group", "200_imp_marker"] + df = df.drop(columns=to_drop) return df diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py index 6b85e9471..675d96819 100644 --- a/src/imputation/imputation_main.py +++ b/src/imputation/imputation_main.py @@ -1,4 +1,5 @@ """The main file for the Imputation module.""" + import logging import os import pandas as pd @@ -20,7 +21,7 @@ ImputationMainLogger = logging.getLogger(__name__) -def run_imputation( +def run_imputation( # noqa: C901 df: pd.DataFrame, manual_trimming_df: pd.DataFrame, backdata: pd.DataFrame, @@ -91,32 +92,36 @@ def run_imputation( # Run MoR if backdata is not None: # MoR will be re-written with new backdata - df, links_df = run_mor(df, backdata, to_impute_cols, config) + imputed_df, links_df = run_mor(df, backdata, to_impute_cols, config) # Run TMI for long forms and short forms - imputed_df, qa_df, trim_counts_qa = tmi.run_tmi(df, config) + # Skip this step for PNP survey for now + if config["survey"]["survey_type"] == "BERD": + imputed_df, qa_df, trim_counts_qa = tmi.run_tmi(imputed_df, config) + + # After TMI imputation, overwrite the "604" == "No" in any records with + # Status "check needed" (they are now being imputed") + chk_mask = imputed_df["status"].str.contains("Check needed") + imputation_mask = imputed_df["imp_marker"] == "TMI" + imputed_df.loc[(chk_mask & imputation_mask), "604"] = "Yes" # Perform TMI step 5, which calculates employment and headcount totals imputed_df = hlp.calculate_totals(imputed_df) - # After TMI imputation, overwrite the "604" == "No" in any records with - # Status "check needed" (they are now being imputed") - chk_mask = imputed_df["status"].str.contains("Check needed") - imputation_mask = imputed_df["imp_marker"] == "TMI" - imputed_df.loc[(chk_mask & imputation_mask), "604"] = "Yes" - # join constructed rows back to the imputed df # Note that constructed rows need to be included in short form expansion if "is_constructed" in df.columns: imputed_df = pd.concat([imputed_df, constructed_df]) - # Run short form expansion - imputed_df = run_sf_expansion(imputed_df, config) + # Run short form expansion imputation for BERD surveys + if config["survey"]["survey_type"] == "BERD": + imputed_df = run_sf_expansion(imputed_df, config) # join manually trimmed columns back to the imputed df if not trimmed_df.empty: imputed_df = pd.concat([imputed_df, trimmed_df]) - qa_df = pd.concat([qa_df, trimmed_df]).reset_index(drop=True) + if config["survey"]["survey_type"] == "BERD": + qa_df = pd.concat([qa_df, trimmed_df]).reset_index(drop=True) imputed_df = imputed_df.sort_values( ["reference", "instance"], ascending=[True, True] @@ -134,16 +139,18 @@ def run_imputation( links_filename = filename_amender("links_qa", config) trimmed_counts_filename = filename_amender("tmi_trim_count_qa", config) - # create trimming qa dataframe with required columns from schema - schema_path = config["schema_paths"]["manual_trimming_schema"] - schema_dict = load_schema(schema_path) - trimming_qa_output = create_output_df(qa_df, schema_dict) + if config["survey"]["survey_type"] == "BERD": + # create 
trimming qa dataframe with required columns from schema + schema_path = config["schema_paths"]["manual_trimming_schema"] + schema_dict = load_schema(schema_path) + trimming_qa_output = create_output_df(qa_df, schema_dict) + + write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output) + write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa) + write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df) - write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output) write_csv(os.path.join(qa_path, full_imp_filename), imputed_df) - write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df) write_csv(os.path.join(qa_path, links_filename), links_df) - write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa) # remove rows and columns no longer needed from the imputed dataframe imputed_df = hlp.tidy_imputation_dataframe(imputed_df, to_impute_cols, config) diff --git a/src/imputation/sf_expansion.py b/src/imputation/sf_expansion.py index eef3e3438..1d0d1e37c 100644 --- a/src/imputation/sf_expansion.py +++ b/src/imputation/sf_expansion.py @@ -212,14 +212,16 @@ def run_sf_expansion(df: pd.DataFrame, config: dict) -> pd.DataFrame: ) # Set dtype of manual_trim column to bool before concatination - convert_dict = {'manual_trim': bool, - 'empty_pgsic_group': bool, - 'empty_pg_group': bool, - '305_trim': bool, - '211_trim': bool} + convert_dict = { + "manual_trim": bool, + "empty_pgsic_group": bool, + "empty_pg_group": bool, + "305_trim": bool, + "211_trim": bool, + } expanded_df = expanded_df.astype(convert_dict) excluded_df = excluded_df.astype(convert_dict) - + # Re-include those records from the reference list before returning df result_df = pd.concat([expanded_df, excluded_df], axis=0) diff --git a/src/mapping/mapping_helpers.py b/src/mapping/mapping_helpers.py index ddd44334c..94473cefc 100644 --- a/src/mapping/mapping_helpers.py +++ b/src/mapping/mapping_helpers.py @@ -198,7 +198,9 @@ def update_ref_list(full_df: pd.DataFrame, ref_list_df: pd.DataFrame) -> pd.Data return df -def create_additional_ni_cols(ni_full_responses: pd.DataFrame, config: dict) -> pd.DataFrame: +def create_additional_ni_cols( + ni_full_responses: pd.DataFrame, config: dict +) -> pd.DataFrame: """ Create additional columns for Northern Ireland data. diff --git a/src/outputs/export_files.py b/src/outputs/export_files.py index 5e65ec8c0..d9237f806 100644 --- a/src/outputs/export_files.py +++ b/src/outputs/export_files.py @@ -52,7 +52,7 @@ def get_schema_headers(config: dict): for output_name, path in schema_paths.items(): with open(path, 'rb') as file: schema_headers_dict[output_name] = tomli.load(file) - + # schema_headers_dict = { # output_name: tomli.loads(path) for output_name, path in schema_paths.items() # } diff --git a/src/outputs/intram_by_itl.py b/src/outputs/intram_by_itl.py index f5fdc4904..49ebd0392 100644 --- a/src/outputs/intram_by_itl.py +++ b/src/outputs/intram_by_itl.py @@ -41,8 +41,7 @@ def save_detailed_csv( Returns: Dict[str, int]: A dictionary of intramural totals. 
""" - save_name = filename_amender(filename=title, - config=config) + save_name = filename_amender(filename=title, config=config) save_path = os.path.join(output_dir, save_name) if not overwrite and os.path.exists(save_path): raise FileExistsError( diff --git a/src/outputs/intram_by_pg.py b/src/outputs/intram_by_pg.py index 608e72398..ece8136fa 100644 --- a/src/outputs/intram_by_pg.py +++ b/src/outputs/intram_by_pg.py @@ -14,7 +14,7 @@ def output_intram_by_pg( config: Dict[str, Any], intram_tot_dict: Dict[str, int], write_csv: Callable, - uk_output: bool = False + uk_output: bool = False, ) -> Dict[str, int]: """Run the outputs module. @@ -35,9 +35,7 @@ def output_intram_by_pg( gb_df, ni_df, pg_detailed, config, uk_output ) - _save_output_intram_as_csv( - df_merge, config, write_csv, uk_output - ) + _save_output_intram_as_csv(df_merge, config, write_csv, uk_output) # calculate the intram total for QA across different outputs intram_tot_dict[f"intram_by_pg_{'uk' if uk_output else 'gb'}"] = round(value_tot, 0) @@ -50,7 +48,7 @@ def _generate_intarm_by_pg( ni_df: pd.DataFrame, pg_detailed: pd.DataFrame, config: Dict[str, Any], - uk_output: bool = False + uk_output: bool = False, ): """Generate the intramural by PG output dataframe and intramural by PG total. @@ -97,8 +95,9 @@ def _generate_intarm_by_pg( detail = "Detailed product groups (Alphabetical product groups A-AH)" notes = "Notes" survey_year = config["survey"]["survey_year"] - df_merge = df_merge[[detail, value_col, notes]].rename(columns={ - value_col: survey_year}) + df_merge = df_merge[[detail, value_col, notes]].rename( + columns={value_col: survey_year} + ) return df_merge, value_tot @@ -106,7 +105,7 @@ def _save_output_intram_as_csv( df_merge: pd.DataFrame, config: Dict[str, Any], write_csv: Callable, - uk_output: bool = False + uk_output: bool = False, ): """Save the intramural by PG output as a CSV file. @@ -123,9 +122,10 @@ def _save_output_intram_as_csv( # Outputting the CSV file output_path = config["outputs_paths"]["outputs_master"] - filename = (f"output_intram_by_pg_{'uk' if uk_output else 'gb'}") + filename = f"output_intram_by_pg_{'uk' if uk_output else 'gb'}" filename = filename_amender(filename, config) write_csv( f"{output_path}/output_intram_by_pg_{'uk' if uk_output else 'gb'}/{filename}", - df_merge) + df_merge, + ) diff --git a/src/outputs/intram_totals.py b/src/outputs/intram_totals.py index adfc114b1..71cc103fe 100644 --- a/src/outputs/intram_totals.py +++ b/src/outputs/intram_totals.py @@ -13,7 +13,7 @@ def output_intram_totals( intram_tot_dict: Dict[str, int], config: Dict[str, Any], write_csv: Callable, - ) -> None: +) -> None: """Output the intramural totals. Args: @@ -21,7 +21,7 @@ def output_intram_totals( config (dict): The configuration settings. write_csv (Callable): Function to write to a csv file. This will be the hdfs or network version depending on settings. - + Returns: None """ diff --git a/src/outputs/long_form.py b/src/outputs/long_form.py index b6f83748b..bad790496 100644 --- a/src/outputs/long_form.py +++ b/src/outputs/long_form.py @@ -15,7 +15,7 @@ def output_long_form( df: pd.DataFrame, config: Dict[str, Any], write_csv: Callable, - ): +): """Run the outputs module on long forms. Args: diff --git a/src/outputs/ni_sas.py b/src/outputs/ni_sas.py index 85c83c831..b60926d4e 100644 --- a/src/outputs/ni_sas.py +++ b/src/outputs/ni_sas.py @@ -22,7 +22,7 @@ def output_ni_sas( config (dict): The configuration settings. write_csv (Callable): Function to write to a csv file. 
This will be the hdfs or network version depending on settings. - """ + """ output_path = config["outputs_paths"]["outputs_master"] # Map the sizebands based on frozen employment diff --git a/src/outputs/outputs_main.py b/src/outputs/outputs_main.py index af0288326..09bbc175f 100644 --- a/src/outputs/outputs_main.py +++ b/src/outputs/outputs_main.py @@ -92,7 +92,10 @@ def run_outputs( # noqa: C901 if config["global"]["output_gb_sas"]: OutputMainLogger.info("Starting GB SAS output...") intram_tot_dict = output_gb_sas( - outputs_df, config, intram_tot_dict, write_csv, + outputs_df, + config, + intram_tot_dict, + write_csv, ) OutputMainLogger.info("Finished GB SAS output.") diff --git a/src/outputs/short_form.py b/src/outputs/short_form.py index 562f2ffbe..a19a6844c 100644 --- a/src/outputs/short_form.py +++ b/src/outputs/short_form.py @@ -99,7 +99,7 @@ def output_short_form( config (dict): The configuration settings. write_csv (Callable): Function to write to a csv file. This will be the hdfs or network version depending on settings. - + """ output_path = config["outputs_paths"]["outputs_master"] diff --git a/src/outputs/tau.py b/src/outputs/tau.py index 16dd5d397..f983ebce5 100644 --- a/src/outputs/tau.py +++ b/src/outputs/tau.py @@ -15,7 +15,7 @@ def output_tau( config: Dict[str, Any], intram_tot_dict: Dict[str, int], write_csv: Callable, - ) -> Dict[str, int]: +) -> Dict[str, int]: """Run the outputs module. Args: diff --git a/src/outputs/total_fte.py b/src/outputs/total_fte.py index c5c006d80..a0daa1b2f 100644 --- a/src/outputs/total_fte.py +++ b/src/outputs/total_fte.py @@ -9,7 +9,9 @@ def qa_output_total_fte( - df: pd.DataFrame, config: Dict[str, Any], write_csv: Callable, + df: pd.DataFrame, + config: Dict[str, Any], + write_csv: Callable, ): """Run the outputs module. diff --git a/src/pipeline.py b/src/pipeline.py index ba025359e..66c157c03 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -26,7 +26,7 @@ MainLogger = logging.getLogger(__name__) -def run_pipeline(user_config_path, dev_config_path): +def run_pipeline(user_config_path, dev_config_path): # noqa C901 """The main pipeline. Args: @@ -181,72 +181,36 @@ def run_pipeline(user_config_path, dev_config_path): ) MainLogger.info("Finished Mapping...") - if config["survey"]["survey_type"] == "PNP": - MainLogger.info( - "PNP is set so skipping modules:" "Imputation, Outliering and Estimation." 
- ) - - # Data processing: Apportionment to sites - - apportioned_responses_df, intram_tot_dict = run_site_apportionment( - mapped_df, - config, - mods.rd_write_csv, - ) - - MainLogger.info("Finished Site Apportionment module.") - - MainLogger.info("Starting Outputs...") - - run_outputs( - apportioned_responses_df, - ni_full_responses, - config, - intram_tot_dict, - mods.rd_write_csv, - pg_detailed, - sic_division_detailed, - ) - - run_id = runlog_obj.run_id - MainLogger.info(f"Finishing Pipeline run id {run_id}.........") - - runlog_obj.write_runlog() - runlog_obj.mark_mainlog_passed() + # Imputation module + MainLogger.info("Starting Imputation...") + imputed_df = run_imputation( + mapped_df, + manual_trimming_df, + backdata, + config, + mods.rd_write_csv, + ) + MainLogger.info("Finished Imputation...") - return runlog_obj.time_taken - else: - # Add any specific processing for BERD survey here if needed) - - # Imputation module - MainLogger.info("Starting Imputation...") - imputed_df = run_imputation( - mapped_df, - manual_trimming_df, - backdata, - config, - mods.rd_write_csv, - ) - MainLogger.info("Finished Imputation...") - - # Perform postcode construction now imputation is complete - run_postcode_construction = config["global"]["run_postcode_construction"] - if run_postcode_construction: - imputed_df = run_construction( - imputed_df, - config, - mods.rd_file_exists, - mods.rd_read_csv, - is_run_postcode_construction=True, - ) - - imputed_df = validate_updated_postcodes( + # Perform postcode construction now imputation is complete + run_postcode_construction = config["global"]["run_postcode_construction"] + if run_postcode_construction: + imputed_df = run_construction( imputed_df, - postcode_mapper, - itl_mapper, config, + mods.rd_file_exists, + mods.rd_read_csv, + is_run_postcode_construction=True, ) + imputed_df = validate_updated_postcodes( + imputed_df, + postcode_mapper, + itl_mapper, + config, + ) + + if config["survey"]["survey_type"] == "BERD": # Outlier detection module MainLogger.info("Starting Outlier Detection...") outliered_responses_df = run_outliers( @@ -261,31 +225,35 @@ def run_pipeline(user_config_path, dev_config_path): ) MainLogger.info("Finished Estimation module.") - # Data processing: Apportionment to sites - apportioned_responses_df, intram_tot_dict = run_site_apportionment( - estimated_responses_df, - config, - mods.rd_write_csv, - ) + elif config["survey"]["survey_type"] == "PNP": + MainLogger.info("PNP is set so skipping modules Outliering and Estimation.") + estimated_responses_df = imputed_df - MainLogger.info("Finished Site Apportionment module.") + # Data processing: Apportionment to sites + apportioned_responses_df, intram_tot_dict = run_site_apportionment( + estimated_responses_df, + config, + mods.rd_write_csv, + ) - MainLogger.info("Starting Outputs...") + MainLogger.info("Finished Site Apportionment module.") - run_outputs( - apportioned_responses_df, - ni_full_responses, - config, - intram_tot_dict, - mods.rd_write_csv, - pg_detailed, - sic_division_detailed, - ) + MainLogger.info("Starting Outputs...") + + run_outputs( + apportioned_responses_df, + ni_full_responses, + config, + intram_tot_dict, + mods.rd_write_csv, + pg_detailed, + sic_division_detailed, + ) - run_id = runlog_obj.run_id - MainLogger.info(f"Finishing Pipeline run id {run_id}.........") + run_id = runlog_obj.run_id + MainLogger.info(f"Finishing Pipeline run id {run_id}.........") - runlog_obj.write_runlog() - runlog_obj.mark_mainlog_passed() + runlog_obj.write_runlog() + 
runlog_obj.mark_mainlog_passed() - return runlog_obj.time_taken + return runlog_obj.time_taken diff --git a/src/site_apportionment/pnp_pre_processing.py b/src/site_apportionment/pnp_pre_processing.py deleted file mode 100644 index c83fcf093..000000000 --- a/src/site_apportionment/pnp_pre_processing.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -import pandas as pd -import src.pipeline as src - - -from src.imputation.apportionment import run_apportionment -from src.imputation.imputation_helpers import imputation_marker - -def pnp_pre_processing(df: pd.DataFrame) -> pd.DataFrame: - """Pre-process the PNP data. - Args: - df (pd.DataFrame): the main dataset to pre-process - Returns: - pd.DataFrame: pre-processed dataframe - """ - df = run_apportionment(df) - # Add a column for imputation marker - df = imputation_marker(df) - # Add a column for a weight - df["a_weight"] = 1.0 - - return df \ No newline at end of file diff --git a/src/site_apportionment/site_apportionment_main.py b/src/site_apportionment/site_apportionment_main.py index 7ffa9bbf4..02a504e5c 100644 --- a/src/site_apportionment/site_apportionment_main.py +++ b/src/site_apportionment/site_apportionment_main.py @@ -1,9 +1,9 @@ """The main file for the Apportionment to sites module.""" + import logging import pandas as pd from typing import Callable, Dict, Any -from src.site_apportionment.pnp_pre_processing import pnp_pre_processing from src.site_apportionment.site_apportionment import run_apportion_sites from src.site_apportionment.output_status_filtered import ( output_status_filtered, @@ -37,7 +37,7 @@ def run_site_apportionment( to apportion for long forms """ if config["survey"]["survey_type"] == "PNP": - df = pnp_pre_processing(df) + df["a_weight"] = 1.0 # Create variable for output of QA apportionment file qa_path = config["apportionment_paths"]["qa_path"] diff --git a/src/staging/spp_snapshot_processing.py b/src/staging/spp_snapshot_processing.py index 114368b14..faba036c2 100644 --- a/src/staging/spp_snapshot_processing.py +++ b/src/staging/spp_snapshot_processing.py @@ -109,4 +109,3 @@ def response_rate(contributors: pd.DataFrame, responses: pd.DataFrame) -> float: SppProcessingLogger.info(f"The response rate is {int(rounded_resp_rate*100)}%") return response_rate - diff --git a/src/user_config.yaml b/src/user_config.yaml index 45a02b41e..ca88a1d62 100644 --- a/src/user_config.yaml +++ b/src/user_config.yaml @@ -26,7 +26,7 @@ global: # QA output settings output_full_responses: False output_ni_full_responses: False - output_mapping_qa: True + output_mapping_qa: False output_mapping_ni_qa: False output_imputation_qa: False output_auto_outliers: False @@ -140,7 +140,7 @@ export_choices: export_short_form: None export_long_form: None export_tau: None - export_gb_sas: None + export_gb_sas: "PNP_2023_output_gb_sas_24-11-19_v914.csv" export_ni_sas: None export_intram_by_pg_gb: None export_intram_by_pg_uk: None @@ -152,9 +152,10 @@ export_choices: export_fte_total_qa: None export_status_filtered: None export_frozen_group: None - export_staged_BERD_full_responses: "2023_staged_BERD_full_responses_24-11-26_v922.csv" - export_staged_full_responses: "2023_staged_full_responses_24-11-26_v922.csv" + export_staged_BERD_full_responses: None #"2023_staged_BERD_full_responses_24-11-26_v922.csv" + export_staged_full_responses: None # "PNP_2023_staged_full_responses_24-11-26_v922.csv" export_staged_NI_full_responses: None - export_full_responses_imputed: None + export_full_responses_imputed: 
"PNP_2023_full_responses_imputed_24-12-18_v971.csv" export_full_estimation_qa: None # "2022_full_estimation_qa_24-07-15_v555.csv" export_invalid_unrecognised_postcodes: None # "2022_invalid_unrecognised_postcodes_24-07-04_v503.csv" + export_full_responses_mapped: "PNP_2023_full_responses_mapped_24-12-11_v212.csv" diff --git a/src/utils/config.py b/src/utils/config.py index 4987477b9..d30c4907e 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -1,4 +1,5 @@ """Simple utils to assist the config.""" + from copy import deepcopy from typing import Union, Tuple, Dict diff --git a/src/utils/hdfs_mods.py b/src/utils/hdfs_mods.py index 05bce6649..04d68e958 100644 --- a/src/utils/hdfs_mods.py +++ b/src/utils/hdfs_mods.py @@ -14,7 +14,7 @@ import subprocess import os import pathlib -from typing import List, Union +from typing import Union import yaml @@ -33,8 +33,8 @@ def rd_read_csv(filepath: str, **kwargs) -> pd.DataFrame: - """Reads a csv from HDFS into a Pandas Dataframe using pydoop. - If "thousands" argument is not specified, sets it to ",". + """Reads a csv from HDFS into a Pandas Dataframe using pydoop. + If "thousands" argument is not specified, sets it to ",". Allows to use any additional keyword arguments of Pandas read_csv method. Args: @@ -48,7 +48,7 @@ def rd_read_csv(filepath: str, **kwargs) -> pd.DataFrame: # If "thousands" argument is not specified, set it to "," if "thousands" not in kwargs: kwargs["thousands"] = "," - + # Read the scv file using the path and keyword arguments try: df = pd.read_csv(file, **kwargs) diff --git a/src/utils/postcode_reduction_helper.py b/src/utils/postcode_reduction_helper.py index f8cc3edef..cbe9e87a1 100644 --- a/src/utils/postcode_reduction_helper.py +++ b/src/utils/postcode_reduction_helper.py @@ -17,7 +17,6 @@ def run_postcode_reduction(user_config_path, dev_config_path): - # load and validate the config config = config_setup(user_config_path, dev_config_path) diff --git a/src/utils/runlog.py b/src/utils/runlog.py index 5878c9f82..bd30cd85e 100644 --- a/src/utils/runlog.py +++ b/src/utils/runlog.py @@ -1,4 +1,5 @@ import os + # import csv from datetime import datetime from typing import Tuple @@ -63,7 +64,7 @@ def _create_run_id(self): # Check if the dataframe has at least one data row if len(runfile): latest_id = max(runfile.run_id) - + # increment the latest id by 1 run_id = latest_id + 1 return run_id @@ -137,7 +138,7 @@ def log_csv_creator(self, filepath: str, columns: list) -> None: Returns: None """ - # Check if the runolg file exists. + # Check if the runolg file exists. 
if not self.file_exists_func(filepath): # Create an empty dataframe with column names df = pd.DataFrame(columns=columns) diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index c36df5e4c..8ce2f4fad 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -16,18 +16,12 @@ Read feather - possibly, not needed Write to feather - possibly, not needed """ - -# Standard libraries import json import logging - -# Third party libraries import pandas as pd -from io import StringIO, TextIOWrapper, BytesIO - +from io import StringIO, TextIOWrapper -# Local libraries from rdsa_utils.cdp.helpers.s3_utils import ( file_exists, create_folder_on_s3, @@ -35,10 +29,9 @@ is_s3_directory, copy_file, move_file, - validate_bucket_name, - validate_s3_file_path, ) from src.utils.singleton_boto import SingletonBoto + # from src.utils.singleton_config import SingletonConfig # set up logging, boto3 client and s3 bucket @@ -99,9 +92,7 @@ def rd_write_csv(filepath: str, data: pd.DataFrame) -> None: csv_buffer.seek(0) # Write the buffer into the s3 bucket - _ = s3_client.put_object( - Bucket=s3_bucket, Body=csv_buffer.getvalue(), Key=filepath - ) + _ = s3_client.put_object(Bucket=s3_bucket, Body=csv_buffer.getvalue(), Key=filepath) return None @@ -136,9 +127,7 @@ def rd_file_exists(filepath: str, raise_error=False) -> bool: result (bool): A boolean value which is true if the file exists. """ - result = file_exists( - client=s3_client, bucket_name=s3_bucket, object_name=filepath - ) + result = file_exists(client=s3_client, bucket_name=s3_bucket, object_name=filepath) if not result and raise_error: raise FileExistsError(f"File: {filepath} does not exist") @@ -188,7 +177,7 @@ def rd_file_size(filepath: str) -> int: """ _response = s3_client.head_object(Bucket=s3_bucket, Key=filepath) - file_size = _response['ContentLength'] + file_size = _response["ContentLength"] return file_size @@ -215,10 +204,7 @@ def rd_md5sum(filepath: str) -> str: """ try: - md5result = s3_client.head_object( - Bucket=s3_bucket, - Key=filepath - )['ETag'][1:-1] + md5result = s3_client.head_object(Bucket=s3_bucket, Key=filepath)["ETag"][1:-1] except s3_client.exceptions.ClientError as e: s3_logger.error(f"Failed to compute the md5 checksum: {str(e)}") md5result = None @@ -236,18 +222,16 @@ def rd_isdir(dirpath: str) -> bool: """ # The directory name must end with forward slash - if not dirpath.endswith('/'): - dirpath = dirpath + '/' + if not dirpath.endswith("/"): + dirpath = dirpath + "/" # Any slashes at the beginning should be removed - while dirpath.startswith('/'): + while dirpath.startswith("/"): dirpath = dirpath[1:] # Use the function from rdsa_utils response = is_s3_directory( - client=s3_client, - bucket_name=s3_bucket, - object_name=dirpath + client=s3_client, bucket_name=s3_bucket, object_name=dirpath ) return response @@ -297,7 +281,7 @@ def rd_read_header(path: str) -> str: status (bool): True if the dirpath is a directory, false otherwise. 
""" # Create an input/output stream pointer, same as open - stream = TextIOWrapper(s3_client.get_object(Bucket=s3_bucket, Key=path)['Body']) + stream = TextIOWrapper(s3_client.get_object(Bucket=s3_bucket, Key=path)["Body"]) # Read the first line from the stream response = stream.readline() @@ -320,9 +304,7 @@ def rd_write_string_to_file(content: bytes, filepath: str): str_buffer.seek(0) # Write the buffer into the s3 bucket - _ = s3_client.put_object( - Bucket=s3_bucket, Body=str_buffer.getvalue(), Key=filepath - ) + _ = s3_client.put_object(Bucket=s3_bucket, Body=str_buffer.getvalue(), Key=filepath) return None @@ -336,7 +318,7 @@ def _path_long2short(path: "str") -> str: """ if "/" in path: last_slash = path.rfind("/") - return path[last_slash + 1:] + return path[last_slash + 1 :] else: return path @@ -358,14 +340,14 @@ def rd_copy_file(src_path: str, dst_path: str) -> bool: removed. This is needed for the library method copy_file to work correctly. Library method copy_file requires that the paths are file paths: - old_dir/old.file and new_dir/new.file. The rd_copy_file takes full file name + old_dir/old.file and new_dir/new.file. The rd_copy_file takes full file name with the full file path as a source, and just a directory path as a destination, like this: old_dir/old.file and new_dir/ or new_dir without the - slash at the end. old.file will become new_dir/old.file, i.e. the file is + slash at the end. old.file will become new_dir/old.file, i.e. the file is copied with the same name, not renamed. - Supplementary function _path_long2short decouples old.file from the full + Supplementary function _path_long2short decouples old.file from the full source path and "glues it" to the end of destination path. - + Args: src_path (string): Full path of the source file, not including the bucket name, but including the quasi-directories and slashes preceding @@ -408,7 +390,7 @@ def rd_move_file(src_path: str, dst_path: str) -> bool: source_bucket_name=s3_bucket, source_object_name=src_path, destination_bucket_name=s3_bucket, - destination_object_name=dst_path + destination_object_name=dst_path, ) return success @@ -433,8 +415,8 @@ def processLocation(root, prefixLocal, location): if prefixLocal not in root: root[prefixLocal] = (set(), set()) # check how many folders are available after prefix - remainder = location[len(prefixLocal):] - structure = remainder.split('/') + remainder = location[len(prefixLocal) :] + structure = remainder.split("/") # If we are not yet in the folder of the file we need to continue with # a larger prefix @@ -442,7 +424,7 @@ def processLocation(root, prefixLocal, location): # add folder dir root[prefixLocal][0].add(structure[0]) # make sure file is added allong the way - processLocation(root, prefixLocal + '/' + structure[0], location) + processLocation(root, prefixLocal + "/" + structure[0], location) else: # add to file root[prefixLocal][1].add(structure[0]) @@ -474,11 +456,10 @@ def rd_search_file(dir_path: str, ending: str) -> str: response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=dir_path) # retrieve key values - locations = [object['Key'] for object in response['Contents']] + locations = [object["Key"] for object in response["Contents"]] for _, (__, files) in s3walk(locations, dir_path): for file in files: - # Check for ending if file.endswith(ending): target_file = str(file) diff --git a/src/utils/singleton_boto.py b/src/utils/singleton_boto.py index e07487285..c2358b2ed 100644 --- a/src/utils/singleton_boto.py +++ b/src/utils/singleton_boto.py @@ -4,7 
+4,7 @@ import boto3 import raz_client - + class SingletonBoto: _instance = None _bucket = None @@ -16,10 +16,7 @@ def __init__(self): def get_client(cls, config={}): if cls._instance is None: client = boto3.client("s3") - raz_client.configure_ranger_raz( - client, - ssl_file=config["s3"]["ssl_file"] - ) + raz_client.configure_ranger_raz(client, ssl_file=config["s3"]["ssl_file"]) cls._bucket = config["s3"]["s3_bucket"] cls._instance = client return cls._instance diff --git a/tests/test_construction/test_construction_helpers.py b/tests/test_construction/test_construction_helpers.py index 840c282e4..09a998c9a 100644 --- a/tests/test_construction/test_construction_helpers.py +++ b/tests/test_construction/test_construction_helpers.py @@ -452,6 +452,3 @@ def test_replace_values_in_construction(self): expected_snapshot_output, check_dtype=False ) - - - diff --git a/tests/test_construction/test_construction_validation.py b/tests/test_construction/test_construction_validation.py index d8245c21e..4150f8a98 100644 --- a/tests/test_construction/test_construction_validation.py +++ b/tests/test_construction/test_construction_validation.py @@ -264,5 +264,3 @@ def test_validate_short_to_long_raises(self, short_to_long_df): ) with pytest.raises(ValueError, match=msg): validate_short_to_long(short_to_long_df) - - diff --git a/tests/test_estimation/test_calculate_weights.py b/tests/test_estimation/test_calculate_weights.py index 2c35120d4..5a3a4a510 100644 --- a/tests/test_estimation/test_calculate_weights.py +++ b/tests/test_estimation/test_calculate_weights.py @@ -454,4 +454,4 @@ def test_outlier_weights(self): expected_df = self.create_expected_output() result_df = calw.outlier_weights(input_df) - assert_frame_equal(result_df, expected_df) \ No newline at end of file + assert_frame_equal(result_df, expected_df) diff --git a/tests/test_freezing/test_freezing_apply_changes.py b/tests/test_freezing/test_freezing_apply_changes.py index 639f8a528..6a1e6a9cb 100644 --- a/tests/test_freezing/test_freezing_apply_changes.py +++ b/tests/test_freezing/test_freezing_apply_changes.py @@ -186,7 +186,7 @@ def expected_additions(self) -> pd.DataFrame: df = pd.DataFrame(data=data, columns=columns) return df - + def test_apply_additions(self, frozen_df, dummy_additions): """General tests for apply_additions""" amended = apply_additions(frozen_df, dummy_additions, {"filename_items": {"run_id": "1"}}, test_logger) diff --git a/tests/test_freezing/test_freezing_compare.py b/tests/test_freezing/test_freezing_compare.py index c6ebe3b51..b13f31f48 100644 --- a/tests/test_freezing/test_freezing_compare.py +++ b/tests/test_freezing/test_freezing_compare.py @@ -242,6 +242,3 @@ def test_bring_together_split_cases(self): assert_frame_equal( expected_amendments_df, result_amendments_df.reset_index(drop=True) ) - - - diff --git a/tests/test_freezing/test_freezing_utils.py b/tests/test_freezing/test_freezing_utils.py index fc5c10d96..1e3259870 100644 --- a/tests/test_freezing/test_freezing_utils.py +++ b/tests/test_freezing/test_freezing_utils.py @@ -29,4 +29,4 @@ def test__add_last_frozen_column(self): ) assert last_frozen_df.last_frozen.unique()[0] == exp_last_frozen, ( "_add_last_frozen_column not behaving as expected." 
- )  \ No newline at end of file + ) diff --git a/tests/test_imputation/conftest.py b/tests/test_imputation/conftest.py index ffa58e6a7..0a6558294 100644 --- a/tests/test_imputation/conftest.py +++ b/tests/test_imputation/conftest.py @@ -26,3 +26,28 @@ def imputation_config() -> dict: } } return config + + +@pytest.fixture(scope="module") +def pnp_imputation_config() -> dict: + """A dummy imputation config for running imputation tests.""" + config = { + "survey": { + "survey_type": "PNP", + "survey_year": 2023 + }, + "imputation": { + "mor_threshold": 3, + "trim_threshold": 10, + "lower_trim_perc": 15, + "upper_trim_perc": 15, + "sf_expansion_threshold": 3, + "lf_target_vars": ["211", "emp_researcher", "emp_technician"], + "sum_cols": ["emp_total"] + }, + "breakdowns": { + "211": ["212", "214", "216"], + "emp_total": ["emp_researcher", "emp_technician"] + } + } + return config diff --git a/tests/test_imputation/test_MoR.py b/tests/test_imputation/test_MoR.py index 756711c96..b1d2bf458 100644 --- a/tests/test_imputation/test_MoR.py +++ b/tests/test_imputation/test_MoR.py @@ -9,11 +9,40 @@ from pandas.testing import assert_frame_equal # Local Imports -from src.imputation.MoR import run_mor +from src.imputation.MoR import run_mor, is_lf_only from src.imputation.imputation_helpers import get_imputation_cols # pytestmark = pytest.mark.runwip +class TestIsLfOnly(object): + """Tests for is_lf_only.""" + def test_pnp_survey(self): + config = { + "survey": { + "survey_type": "PNP", + "survey_year": 2021 + } + } + assert is_lf_only(config) is True + + def test_berd_2021_backdata(self): + config = { + "survey": { + "survey_type": "BERD", + "survey_year": 2022 + } + } + assert is_lf_only(config) is True + + def test_neither_condition(self): + config = { + "survey": { + "survey_type": "BERD", + "survey_year": 2021 + } + } + assert is_lf_only(config) is False + class TestRunMoRLongForm(object): """Tests for run_mor."""
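For context on the new TestIsLfOnly cases above: `is_lf_only` is imported from src.imputation.MoR, but its body is not shown in this diff. Going only by the behaviour the tests pin down (PNP is always long-form only; BERD is long-form only in survey year 2022, where there is no short-form backdata), a plausible sketch of the helper is the following; the real implementation may differ.

def is_lf_only(config: dict) -> bool:
    # Sketch inferred from the tests above and the "long forms only" comments
    # earlier in this diff; not necessarily the repository's implementation.
    survey_type = config["survey"]["survey_type"]
    survey_year = config["survey"]["survey_year"]
    # PNP has no short forms; for BERD 2022 there is no short-form backdata.
    return survey_type == "PNP" or survey_year == 2022

Keeping this check in a single helper lets mor_preprocessing and run_mor branch on exactly the same condition, which is what the changes earlier in this diff rely on.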