Skip to content

Commit

Permalink
Merge pull request #80 from yibeichan/fix-readonly
Browse files Browse the repository at this point in the history
fix isVis based on Field annotation
  • Loading branch information
satra authored Dec 4, 2024
2 parents e8aeb0d + b84b05b commit 813005f
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 55 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ dependencies = [
"pyyaml",
"beautifulsoup4",
"lxml",
"pydantic >= 2.0"
"pydantic >= 2.0",
"pandas"
]
description = "Reproschema Python library"
# Version from setuptools_scm
Expand Down
171 changes: 126 additions & 45 deletions reproschema/redcap2reproschema.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import csv
import os
import re
from pathlib import Path

import pandas as pd
import yaml
from bs4 import BeautifulSoup

Expand All @@ -17,7 +17,7 @@
"Field Note": "description",
# TODO: often "Field Annotation" has "@HIDDEN" and other markers
# TODO: not sure if this can be every treated as description
# "Field Annotation": "description", # column R
# "Field Annotation": "isVis", # column R
"Section Header": "preamble", # column C (need double-check)
"Field Label": "question", # column E
"Field Type": "inputType", # column D
Expand Down Expand Up @@ -82,7 +82,9 @@ def clean_header(header):
cleaned_header = {}
for k, v in header.items():
# Strip BOM, whitespace, and enclosing quotation marks if present
cleaned_key = k.lstrip("\ufeff").strip().strip('"')
cleaned_key = (
k.lstrip("\ufeff").strip().strip('"') if isinstance(k, str) else k
)
cleaned_header[cleaned_key] = v
return cleaned_header

Expand All @@ -99,6 +101,12 @@ def normalize_condition(condition_str, field_type=None):
return False
elif condition_str is None:
return None
elif not isinstance(condition_str, str):
# Convert non-string types to string, or return as is if conversion doesn't make sense
try:
condition_str = str(condition_str)
except:
return condition_str

re_parentheses = re.compile(r"\(([0-9]*)\)")
re_non_gt_lt_equal = re.compile(r"([^>|<])=")
Expand Down Expand Up @@ -137,17 +145,42 @@ def process_field_properties(data):
else:
condition = True

# Check Field Annotation for special flags - safely handle non-string values
annotation = (
str(data.get("Field Annotation", "")).upper()
if data.get("Field Annotation") is not None
else ""
)
if (
condition
and isinstance(annotation, str)
and (
"@READONLY" in annotation
or "@HIDDEN" in annotation
or "@CALCTEXT" in annotation
)
):
condition = False

prop_obj = {
"variableName": data["Variable / Field Name"],
"isAbout": f"items/{data['Variable / Field Name']}",
"isVis": condition,
}
if data["Required Field?"]:
if data["Required Field?"] in "y":

# Handle Required Field check, accounting for NaN values and empty strings
required_field = data.get("Required Field?")
if (
pd.notna(required_field) and str(required_field).strip()
): # Check if value is not NaN and not empty
if str(required_field).lower() == "y":
prop_obj["valueRequired"] = True
else:
raise (
f"value {data['Required Field?']} not supported yet for redcap:Required Field?"
elif str(required_field).lower() not in [
"",
"n",
]: # Only raise error for unexpected values
raise ValueError(
f"value {required_field} not supported yet for redcap:Required Field?"
)
return prop_obj

Expand Down Expand Up @@ -246,6 +279,16 @@ def process_choices(choices_str, field_name):

def parse_html(input_string, default_language="en"):
result = {}

# Handle non-string input
if not isinstance(input_string, str):
if pd.isna(input_string): # Handle NaN values
return {default_language: ""}
try:
input_string = str(input_string)
except:
return {default_language: str(input_string)}

soup = BeautifulSoup(input_string, "html.parser")

lang_elements = soup.find_all(True, {"lang": True})
Expand Down Expand Up @@ -284,19 +327,30 @@ def process_row(

field_type = field.get("Field Type", "")
input_type, value_type = parse_field_type_and_value(field)
rowData["ui"] = {"inputType": input_type}

# Initialize ui object with common properties
ui_obj = {"inputType": input_type}

# Handle readonly status first - this affects UI behavior
annotation = str(field.get("Field Annotation", "")).upper()
if (
field_type in COMPUTE_LIST
or "@READONLY" in annotation
or "@CALCTEXT" in annotation
):
ui_obj["readonlyValue"] = True

rowData["ui"] = ui_obj
rowData["responseOptions"] = {"valueType": [value_type]}

# setting additional fields for some field types
# Handle specific field type configurations
if field_type == "yesno":
rowData["responseOptions"]["choices"] = [
{"name": {"en": "Yes"}, "value": 1},
{"name": {"en": "No"}, "value": 0},
]
elif field_type == "checkbox":
rowData["responseOptions"]["multipleChoice"] = True
elif field_type in COMPUTE_LIST:
rowData["ui"]["readonlyValue"] = True

for key, value in field.items():
if SCHEMA_MAP.get(key) in ["question", "description"] and value:
Expand Down Expand Up @@ -498,52 +552,79 @@ def parse_language_iso_codes(input_string):
]


def process_csv(
csv_file,
abs_folder_path,
schema_context_url,
protocol_name,
):
def process_csv(csv_file, abs_folder_path, schema_context_url, protocol_name):
datas = {}
order = {}
compute = {}
languages = []

with open(csv_file, mode="r", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
row = clean_header(row)
form_name = row["Form Name"]
if form_name not in datas:
datas[form_name] = []
order[form_name] = []
compute[form_name] = []
os.makedirs(
f"{abs_folder_path}/activities/{form_name}/items",
exist_ok=True,
)
# Read CSV with explicit BOM handling, and maintain original order
df = pd.read_csv(
csv_file, encoding="utf-8-sig"
) # utf-8-sig handles BOM automatically

datas[form_name].append(row)
# Clean column names (headers)
df.columns = df.columns.map(
lambda x: x.strip().strip('"').lstrip("\ufeff")
)

# TODO: should we bring back the language
# if not languages:
# languages = parse_language_iso_codes(row["Field Label"])
# Clean string values in the dataframe
object_columns = df.select_dtypes(include=["object"]).columns
for col in object_columns:
df[col] = df[col].astype(str).replace("nan", "")

# Initialize structures for each unique form
unique_forms = df["Form Name"].unique()
for form_name in unique_forms:
datas[form_name] = []
order[form_name] = []
compute[form_name] = []
os.makedirs(
f"{abs_folder_path}/activities/{form_name}/items", exist_ok=True
)

field_name = row["Variable / Field Name"]
if row.get("Field Type", "") in COMPUTE_LIST:
# TODO: this right now doesn't give jsExpression
condition = normalize_condition(
row["Choices, Calculations, OR Slider Labels"],
field_type=row["Field Type"],
)
# TODO: should we bring back the language
# if not languages:
# languages = parse_language_iso_codes(row["Field Label"])

# Process rows in original order
for _, row in df.iterrows():
form_name = row["Form Name"]
field_name = row["Variable / Field Name"]
field_type = row.get("Field Type", "")
field_annotation = row.get("Field Annotation")

# Add row data to datas dictionary
datas[form_name].append(row.to_dict())

if field_type in COMPUTE_LIST:
condition = normalize_condition(
row["Choices, Calculations, OR Slider Labels"],
field_type=field_type,
)
compute[form_name].append(
{
"variableName": field_name,
"jsExpression": condition,
}
)
elif (
isinstance(field_annotation, str)
and "@CALCTEXT" in field_annotation.upper()
):
calc_text = field_annotation
match = re.search(r"@CALCTEXT\((.*)\)", calc_text)
if match:
js_expression = match.group(1)
js_expression = normalize_condition(js_expression)
compute[form_name].append(
{
"variableName": field_name,
"jsExpression": condition,
"jsExpression": js_expression,
}
)
else:
order[form_name].append(f"items/{field_name}")
else:
order[form_name].append(f"items/{field_name}")

os.makedirs(f"{abs_folder_path}/{protocol_name}", exist_ok=True)
return datas, order, compute, languages
Expand Down
117 changes: 117 additions & 0 deletions reproschema/tests/test_field_property.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import csv

import pytest

from ..redcap2reproschema import process_field_properties


def test_process_field_properties_calctext():
"""Test different CALCTEXT annotations with realistic examples"""
test_cases = [
# Simple CALCTEXT
{
"input": {
"Variable / Field Name": "test_var",
"Required Field?": "",
"Field Annotation": "@CALCTEXT",
"Branching Logic (Show field only if...)": "",
},
"expected": {
"variableName": "test_var",
"isAbout": "items/test_var",
"isVis": False,
},
},
# Complex CALCTEXT with conditional logic
{
"input": {
"Variable / Field Name": "parkinsons_diagnosis",
"Required Field?": "",
"Field Annotation": "@CALCTEXT(if(([diagnosis_parkinsons_gsd_category_1(bradykinesia)] && ([diagnosis_parkinsons_gsd_category_1(tremor)] || [diagnosis_parkinsons_gsd_category_1(rigidity)])), 'Yes', 'No'))",
"Branching Logic (Show field only if...)": "[some_other_condition] = 1",
},
"expected": {
"variableName": "parkinsons_diagnosis",
"isAbout": "items/parkinsons_diagnosis",
"isVis": False,
},
},
# CALCTEXT with numerical operations
{
"input": {
"Variable / Field Name": "bmi",
"Required Field?": "",
"Field Annotation": "@CALCTEXT([weight]/([height]*[height]))",
"Branching Logic (Show field only if...)": "[weight] > 0 and [height] > 0",
},
"expected": {
"variableName": "bmi",
"isAbout": "items/bmi",
"isVis": False,
},
},
# CALCTEXT with multiple nested conditions
{
"input": {
"Variable / Field Name": "complex_score",
"Required Field?": "",
"Field Annotation": "@CALCTEXT(if([score1] > 10 && [score2] < 5, 'High', if([score1] > 5, 'Medium', 'Low')))",
"Branching Logic (Show field only if...)": "",
},
"expected": {
"variableName": "complex_score",
"isAbout": "items/complex_score",
"isVis": False,
},
},
]

for test_case in test_cases:
result = process_field_properties(test_case["input"])
for key, expected_value in test_case["expected"].items():
assert (
result[key] == expected_value
), f"Failed for {key} in test case with annotation: {test_case['input']['Field Annotation']}"


def test_process_field_properties_mixed_annotations():
"""Test fields with multiple annotations"""
test_cases = [
# CALCTEXT with READONLY
{
"input": {
"Variable / Field Name": "test_var",
"Required Field?": "",
"Field Annotation": "@CALCTEXT @READONLY",
"Branching Logic (Show field only if...)": "",
},
"expected": {"isVis": False},
},
# CALCTEXT with HIDDEN
{
"input": {
"Variable / Field Name": "test_var",
"Required Field?": "",
"Field Annotation": "@HIDDEN @CALCTEXT(if([var1] > 0, 1, 0))",
"Branching Logic (Show field only if...)": "",
},
"expected": {"isVis": False},
},
# Complex CALCTEXT with other annotations
{
"input": {
"Variable / Field Name": "test_var",
"Required Field?": "",
"Field Annotation": "@CALCTEXT(if(([var1] && [var2]), 'Yes', 'No')) @READONLY @HIDDEN-SURVEY",
"Branching Logic (Show field only if...)": "[condition] = 1",
},
"expected": {"isVis": False},
},
]

for test_case in test_cases:
result = process_field_properties(test_case["input"])
for key, expected_value in test_case["expected"].items():
assert (
result[key] == expected_value
), f"Failed for {key} in test case with annotation: {test_case['input']['Field Annotation']}"
Loading

0 comments on commit 813005f

Please sign in to comment.