writing fhir data #50

Merged
78 changes: 75 additions & 3 deletions README.md
@@ -265,10 +265,82 @@ from (select timestamp, bundleUUID, explode(MessageHeader) as messageheader from
limit 10
```

## Usage: Writing FHIR Data
## Usage: Writing FHIR Data Using No Code/Low Code

Writing FHIR data from Dataframes into standard FHIR schemas is supported thanks to contributions from our partners at [XponentL Data](https://xponentl.ai/). All that is required is a mapping from source columns to FHIR columns; each row is then exported as a FHIR bundle.

e.g.

``` python
from dbignite.writer.bundler import *
from dbignite.writer.fhir_encoder import *

# Create a dummy Dataframe with 2 rows of data
data = spark.createDataFrame([('CLM123', 'PAT01', 'COH123'), ('CLM345', 'PAT02', 'COH123')],['CLAIM_ID', 'PATIENT_ID', 'PATIENT_COHORT_NUM'])

# Define a mapping from DF columns to FHIR Schema, including a hardcoded value for Patient.identifier.system
maps = [Mapping('CLAIM_ID', 'Claim.id'),
        Mapping('PATIENT_COHORT_NUM', 'Patient.identifier.value'),
        Mapping('<url of a hardcoded system reference>', 'Patient.identifier.system', True),
        Mapping('PATIENT_ID', 'Patient.id')]

# Instance of the encoder & bundle writer
# - Encoder transforms data to valid FHIR format in Spark
# - bundler maps data to json format
m = MappingManager(maps, data.schema)
b = Bundle(m)
result = b.df_to_fhir(data)

# Pretty print the resulting RDD
import json
result.map(lambda x: json.loads(x)).foreach(lambda x: print(json.dumps(x, indent=4)))
"""
#Row 1 in FHIR format
{
"resourceType": "Bundle",
"entry": [
{
"resourceType": "Claim",
"id": "CLM123"
},
{
"resourceType": "Patient",
"id": "PAT01",
"identifier": [
{
"system": "<url of a hardcoded system reference>",
"value": "COH123"
}
]
}
]
}
#Row 2 in FHIR format
{
"resourceType": "Bundle",
"entry": [
{
"resourceType": "Claim",
"id": "CLM345"
},
{
"resourceType": "Patient",
"id": "PAT02",
"identifier": [
{
"system": "<url of a hardcoded system reference>",
"value": "COH123"
}
]
}
]
}
"""
```

For limitations and more advanced usage, see the [sample notebook](https://github.com/databrickslabs/dbignite/blob/main/notebooks/dbignite_patient_sample.py).
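
Encoders can also be overridden per target field. Below is a minimal, illustrative sketch (the `FIRST_NAME` column and the `Patient.name.given` mapping are assumptions, not part of the example above) that supplies a custom encoder through `FhirEncoderManager`:

``` python
from dbignite.writer.bundler import *
from dbignite.writer.fhir_encoder import *

# Hypothetical override: wrap a scalar name column into FHIR's
# array-valued Patient.name.given field
em = FhirEncoderManager(
  override_encoders = {'Patient.name.given': FhirEncoder(False, False, lambda x: [x])})

m = MappingManager([Mapping('FIRST_NAME', 'Patient.name.given')], data.schema, em)
b = Bundle(m)
result = b.df_to_fhir(data)
```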


> **Warning**
> This section is under construction

## Internal Representation of a FHIR Bundle in DBIgnite

44 changes: 44 additions & 0 deletions dbignite/writer/bundler.py
@@ -0,0 +1,44 @@
import json

class Bundle():

    #
    # Create bundles from FHIR resources
    #
    def __init__(self, mm):
        self.mm = mm


    #
    # Return a new FHIR bundle for each row
    #
    def df_to_fhir(self, df):
        return self._encode_to_json(self._encode_df(df))

    def _encode_df(self, df):
        return (df
                .rdd
                .map(lambda row:
                     [self.mm.encode(row, resourceType)
                      for resourceType in self.mm.fhir_resource_list()])
                )
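
    # Illustrative shape of the intermediate RDD produced above (values assumed
    # from the README example): each row becomes a list of encoded resources,
    # e.g. [{'Claim': {'id': 'CLM123'}}, {'Patient': {'id': 'PAT01', ...}}]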


    #
    # Given an RDD of rows of encoded resources, return an RDD of
    # FHIR bundles serialized as JSON strings
    #
    def _encode_to_json(self, rdd):
        return (
            rdd
            .map(lambda row: [self._resource_to_fhir(x) for x in row])
            .map(lambda row: {'resourceType': 'Bundle', 'entry': row})
            .map(lambda row: json.dumps(row))
        )


    #
    # Given an encoded row, return a single resource value
    #
    def _resource_to_fhir(self, resource):
        return {'resourceType': list(resource.keys())[0], **resource[list(resource.keys())[0]]}
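
    # e.g. {'Patient': {'id': 'PAT01'}} -> {'resourceType': 'Patient', 'id': 'PAT01'}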

218 changes: 218 additions & 0 deletions dbignite/writer/fhir_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
from pyspark.sql.types import *
from pyspark.sql.types import _infer_type
from dbignite.fhir_mapping_model import FhirSchemaModel
from itertools import groupby, chain
from collections import ChainMap


class MappingManager():

    #
    # Mappings from source DataFrame columns to target FHIR fields
    #
    def __init__(self, mappings, src_schema, em = None):
        self.mappings = mappings
        self.src_schema = src_schema
        # NOTE: this guard is a no-op as written (the Exception is constructed
        # but never raised); the bundler does emit multiple FHIR resources per
        # row, as in the README example above
        if len([x.fhir_resource() for x in self.mappings]) > 1:
            Exception("Support for only 1 FHIR resource within a mapping at a time")
        self.em = em if em is not None else FhirEncoderManager()

    #
    # Given a tgt resource type, return all encoded values
    #
    def encode(self, row, fhirResourceType):
        data = [(resourceType, mappings)
                for resourceType, mappings in self.level(0)
                if resourceType[0] == fhirResourceType][0]
        return self.to_fhir(data[0], data[1], row.asDict())
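
    # Worked example (values assumed): with Mapping('PATIENT_ID', 'Patient.id')
    # registered, encode(Row(PATIENT_ID='PAT01'), 'Patient') returns
    # {'Patient': {'id': 'PAT01'}}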

    #
    # Given a target field, return the source mapping, else None
    # tgt = array of ["Patient","identifier","value"]
    #
    def get_src(self, tgt):
        matches = [x for x in self.mappings if x.tgt == ".".join(tgt)]
        return matches[0] if matches else None



    #
    # Get the function needed to transform a source value into the target field
    # @param tgt = ["Patient", "identifier", "value"]
    #
    def get_func(self, tgt):
        return self.em.get_encoder(
            src_type = None if self.get_src(tgt) is None else SchemaDataType.traverse_schema(self.get_src(tgt).src.split("."), self.src_schema),
            tgt_name = tgt
        )

    #
    # fhir_resources to be mapped
    #
    def fhir_resource_list(self):
        return list(set([x.fhir_resource() for x in self.mappings]))

"""
# Return a grouping of all resources that match at a level
# @param level = numeric value starting at 0 = FHIR_RESOURCE
#
# @return - list of lists that are at the same level, each value is a tuples
# tuple - (<matcing fhir path>, <list of mapping objects matched>)
#
e.g. level=3 [(['Patient', 'identifier', 'value'], [<dbignite.writer.fhir_encoder.Mapping object at 0x104ef81f0>]),
(['Patient', 'identifier', 'system'], [<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]

e.g. level=2 [(['Patient', 'identifier'],
[<dbignite.writer.fhir_encoder.Mapping object at 0x104ef81f0>,
<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]

e.g. level=2 [('Pateint', 'extension'],
<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])
<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]

"""

    #
    # Level number to match
    # if resources are provided, only group the mappings in that list
    #
    def level(self, level, resources = None):
        mappings = resources if resources is not None else self.mappings
        return [(k, list(g)) for k, g in groupby(mappings, lambda x: x.tgt.split(".")[:level+1]) if len(k) >= level+1]
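
    # e.g. with adjacent mappings for 'Patient.identifier.value' and
    # 'Patient.identifier.system', level(1) yields
    # [(['Patient', 'identifier'], [<both Mapping objects>])]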

    def to_fhir(self, tgt_prefix, resource_list, row_dict):
        field_name = tgt_prefix[-1]
        # converting an individual field to fhir
        if len(resource_list) == 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
            return {
                field_name: (self.get_func(tgt_prefix).f(row_dict.get(resource_list[0].src)) if resource_list[0].hardcoded == False else resource_list[0].src)
            }
        # converting multiple fields to a single field in FHIR
        elif len(resource_list) > 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
            return {
                field_name: self.em.get_encoder("array<" + _infer_type(type(resource_list[0].src).__name__).simpleString() + ">", tgt_prefix).f([row_dict.get(x.src) for x in resource_list if row_dict.get(x.src) is not None])
            }
        else:
            return {field_name: self.get_func(tgt_prefix).f(
                [self.to_fhir(prefix, resources, row_dict) for prefix, resources in self.level(len(tgt_prefix), resource_list)]
            )}
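
    # Worked example (values assumed): for tgt_prefix=['Patient'] with a single
    # Mapping('PATIENT_ID', 'Patient.id') and row {'PATIENT_ID': 'PAT01'}, the
    # else branch recurses one level to build {'id': 'PAT01'}, then the struct
    # encoder merges sibling dicts into {'Patient': {'id': 'PAT01'}}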

class Mapping():

    def __init__(self, src, tgt, hardcoded = False):
        self.src = src
        self.tgt = tgt
        self.hardcoded = hardcoded

    def fhir_resource(self):
        return self.tgt.split(".")[0]

    def __str__(self):
        return "src:" + str(self.src) + ", tgt:" + str(self.tgt)

#
# Holds logic for a single encoding
#
class FhirEncoder():

    #
    # @param one_to_one - exact match true/false
    # @param precision_loss - true/false if converting from source to target loses value
    # @param f - function converting a source value to the target representation
    # @param default - value returned when f raises
    #
    def __init__(self, one_to_one, precision_loss, f, default = ''):
        self.one_to_one = one_to_one
        self.precision_loss = precision_loss
        self.f = self.handle(f)
        self.default = default

    def handle(self, f):
        def wrapper_func(*args, **kw):
            try:
                return f(*args, **kw)
            except Exception:
                return self.default
        return wrapper_func
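
    # e.g. FhirEncoder(False, False, lambda x: int(x.strip())).f(' 12 ') == 12,
    # while .f('abc') swallows the ValueError and returns the default ''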

#
# Logic for all bindings in fhir translation logic
#
class FhirEncoderManager():
"""
A class for converting values between different types.
"""

    #
    # @param map - dictionary of key/value pairs for encoding values through lambda functions
    # @param override_encoders - override functions to run when encountering a tgt value,
    #   - e.g. Patient.name.given
    #
    def __init__(self, map = None, override_encoders = None, fhir_schema = FhirSchemaModel()):
        self.map = map if map is not None else self.DEFAULT_ENCODERS
        self.override_encoders = override_encoders if override_encoders is not None else {}
        self.fhir_schema = fhir_schema

    # src python binding, tgt Spark Schema.typeName() binding
    DEFAULT_ENCODERS = {
        "IDENTITY": FhirEncoder(True, False, lambda x: x),
        "string": {
            "string": FhirEncoder(True, False, lambda x: x),
            "integer": FhirEncoder(False, False, lambda x: int(x.strip())),
            "float": FhirEncoder(False, False, lambda x: float(x.strip())),
            "double": FhirEncoder(False, False, lambda x: float(x.strip())),
            "bool": FhirEncoder(False, False, lambda x: bool(x.strip())),
            "array<string>": FhirEncoder(False, False, lambda x: [x])
        },
        "array<string>": {
            "string": FhirEncoder(False, False, lambda x: ','.join(x))
        },
        "integer": {
            "string": FhirEncoder(False, True, lambda x: str(x))
        },
        "struct": FhirEncoder(False, True, lambda l: dict(ChainMap(*l))),
        "array<struct>": FhirEncoder(False, True, lambda l: [dict(ChainMap(*l))])  # default behavior: union the dictionaries
    }
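
    # e.g. promoting a scalar source string into a FHIR array-of-string field:
    # DEFAULT_ENCODERS['string']['array<string>'].f('COH123') == ['COH123']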


    #
    # Get encoder for src->tgt type. If src_type is None, default to just the tgt type
    #
    def _get_encoder(self, src_type, tgt_type):
        return self.map.get("IDENTITY") if src_type == tgt_type else self.map.get(src_type, {}).get(tgt_type, self.map.get(tgt_type))

    #
    # @param src_type - the source spark data type
    # @param tgt_name - target field name as a list of path components
    #
    def get_encoder(self, src_type, tgt_name):
        return (self.override_encoders.get('.'.join(tgt_name), None)
                if self.override_encoders.get('.'.join(tgt_name), None) is not None
                else self._get_encoder(src_type, SchemaDataType.traverse_schema(tgt_name[1:], self.fhir_schema.schema(tgt_name[0]))))
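
    # e.g. with override_encoders = {'Patient.name.given': enc}, any mapping
    # targeting Patient.name.given uses enc; otherwise the encoder is chosen
    # from DEFAULT_ENCODERS by (source spark type, target FHIR type)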

class SchemaDataType:

    #
    # field = list of path components, e.g. ["Patient", "identifier", "value"]
    #
    @staticmethod
    def traverse_schema(field, struct):
        if not field and type(struct) != StructField and type(struct) != StructType:
            return struct.dataType
        elif not field and type(struct) == StructField:
            if struct.dataType.typeName() == "array":
                return "array<" + struct.dataType.elementType.typeName() + ">"
            return struct.dataType.typeName()
        elif not field and type(struct) == StructType:
            return "struct"
        else:
            if type(struct) == StructType:
                return SchemaDataType.traverse_schema(field[1:], struct[field[0]])
            elif type(struct) == StructField:
                return SchemaDataType.traverse_schema(field, struct.dataType)
            elif type(struct) == ArrayType:
                return SchemaDataType.traverse_schema(field, struct.elementType)
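
    # e.g. traverse_schema(['identifier', 'value'], <Patient StructType>)
    # descends through identifier (an array of structs) into its element
    # fields and returns 'string' for the value field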

    @staticmethod
    def schema_to_python(schema):
        return schema.simpleString()


12 changes: 12 additions & 0 deletions tests/README.md
@@ -0,0 +1,12 @@
# 🔥 dbignite
__Health Data Interoperability__

This library is designed to provide a low-friction entry point for performing analytics on
[FHIR](https://hl7.org/fhir/bundle.html) bundles by extracting and flattening resources.

# Running Tests


``` bash
python -m unittest tests/*.py
```
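
To run a single test module, the standard `unittest` file invocation also works (assuming it is run from the repository root):

``` bash
python -m unittest tests/test_readers.py
```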
Empty file added tests/base.py
Empty file.
14 changes: 14 additions & 0 deletions tests/test_base.py
@@ -0,0 +1,14 @@
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

class PysparkBaseTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("DBIgniteTests").config("spark.driver.memory", "8g").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

12 changes: 2 additions & 10 deletions tests/test_readers.py
@@ -1,16 +1,8 @@
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

class PysparkBaseTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("TestReaders").config("spark.driver.memory", "8g").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
from pyspark.sql.types import *
from .test_base import PysparkBaseTest

class TestReaders(PysparkBaseTest):
    def test_FhirFormat(self):