FHIRPath query

The Pathling library can be used to query datasets of FHIR resources using FHIRPath. This is useful for aggregating data, and creating custom views.

Extract

This operation allows a user to create arbitrary tabular extracts from FHIR data by specifying columns in terms of a set of FHIRPath expressions that are used to populate them. This is useful for preparing data for use within other tools, and helps to alleviate some of the burden of dealing with FHIR data in its raw form.

The query can also be optionally filtered by a set of FHIRPath expressions, which are combined using Boolean AND logic.

from pathling import PathlingContext, Expression as exp

pc = PathlingContext.create()
data = pc.read.ndjson("s3://somebucket/synthea/ndjson")

# For patients that have not received a COVID-19 vaccine, extract the given
# name, family name, phone number and whether the patient has heart disease.
result = data.extract(
    "Patient",
    columns=[
        exp("name.first().given.first()", "Given name"),
        exp("name.first().family", "Family name"),
        exp("telecom.where(system = 'phone').value", "Phone number"),
        exp("reverseResolve(Condition.subject).exists("
            "code.subsumedBy(http://snomed.info/sct|56265001))",
            "Heart disease")
    ],
    filters=[
        "reverseResolve(Immunization.patient).vaccineCode"
        ".exists(memberOf('https://aehrc.csiro.au/fhir/ValueSet/covid-19-vaccines'))"
        ".not()"
    ]
)
display(result)

The result of this query would look something like this:

Given name | Family name | Phone number | Heart disease
John       | Smith       | 0412345678   | false
Jane       | Doe         | 0412345678   | true
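
Because the result of an extract query is a Spark DataFrame, it can also be written straight out for use in other tools. As a minimal sketch (the output path is an example and the write call is standard Spark, not a Pathling API):

# Write the extract result as CSV so it can be opened in other tools.
result.write.csv("/tmp/heart_disease_extract", header=True)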

Aggregate

This operation allows a user to perform aggregate queries on FHIR data by specifying aggregation, grouping and filter expressions. The results are returned grouped by the grouping expressions.

The aggregate operation is useful for exploratory data analysis, as well as powering visualisations and other summarized views of the data.

from pathling import PathlingContext, Expression as exp

pc = PathlingContext.create()
data = pc.read.ndjson("s3://somebucket/synthea/ndjson")

# Count the number of female patients, grouped by the type of diabetes that they
# have been diagnosed with.
result = data.aggregate(
    "Patient",
    aggregations=[exp("count()", "Number of patients")],
    groupings=[
        exp("reverseResolve(Condition.subject)"
            ".where(code.subsumedBy(http://snomed.info/sct|73211009))"
            ".code.coding.display()",
            "Type of diabetes")
    ],
    filters=["gender = 'female'"],
)

display(result)

The result of this query would look something like this:

Type of diabetes                         | Number of patients
Diabetes mellitus due to cystic fibrosis | 3
Type 2 diabetes mellitus                 | 122
Type 1 diabetes mellitus                 | 14
NULL                                     | 1472
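
As noted above, aggregate results can power visualisations. Since the result is a Spark DataFrame, one option (a sketch using standard Spark and pandas rather than Pathling-specific APIs) is to convert it to pandas and plot it:

# Convert the grouped result to pandas for plotting or further summarisation.
pdf = result.toPandas()
pdf.plot.bar(x="Type of diabetes", y="Number of patients")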

Reading FHIR data

There are several ways of making FHIR data available for FHIRPath query.

NDJSON

You can load all the NDJSON files from a directory, assuming the following naming scheme:

[resource type].ndjson OR [resource type].[tag].ndjson

Pathling will detect the resource type from the file name, and convert it to a Spark dataset using the corresponding resource encoder.

The tag can be any string, and is used to accommodate multiple different files that contain the same resource type. For example, you might have one file called Observation.chart.ndjson and another called Observation.lab.ndjson.

data = pc.read.ndjson("/usr/share/staging/ndjson")

You can also accommodate a custom file naming scheme for the NDJSON files by using the file_name_mapper argument, which maps each file name to the resource type (or types) that it contains. Here is an example of how to import the MIMIC-IV FHIR data set:

import re

data = pc.read.ndjson(
    "/usr/share/staging/ndjson",
    file_name_mapper=lambda file_name: re.findall(
        r"Mimic(\w+?)(?:ED|ICU|Chartevents|Datetimeevents|Labevents|MicroOrg|"
        r"MicroSusc|MicroTest|Outputevents|Lab|Mix|VitalSigns|VitalSignsED)?$",
        file_name))

FHIR Bundles

You can load data from a directory containing either JSON or XML FHIR Bundles. The specified resource types will be extracted from the Bundles and made available for query.

data = pc.read.bundles("/usr/share/staging/bundles",
                       resource_types=["Patient", "Condition", "Immunization"])

Datasets

You can make data that is already held in Spark datasets available for query using the datasets method, passing it a dictionary of resource type and dataset pairs, as shown below.

data = pc.read.datasets({
"Patient": patient_dataset,
"Condition": condition_dataset,
"Immunization": immunization_dataset,
})
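
As an illustration of where such datasets might come from, they could be built using the Pathling encoders from a DataFrame of raw JSON resources. This is a sketch only; the input path is an assumption:

# Read newline-delimited JSON Patient resources as plain text (example path).
json_resources = pc.spark.read.text("/usr/share/staging/raw/Patient.ndjson")

# Encode the JSON strings into a structured dataset of Patient resources.
patient_dataset = pc.encode(json_resources, "Patient")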

Parquet

You can load data from a directory containing Parquet files. The Parquet files must have been saved using the schema used by the Pathling encoders (see Writing FHIR data).

The files are assumed to be named according to their resource type ([resource type].parquet), e.g. Patient.parquet, Condition.parquet.

data = pc.read.parquet("/usr/share/staging/parquet")

Delta Lake

You can load data from a directory containing Delta Lake tables. Delta tables are a specialisation of Parquet that enable additional functionality, such as incremental update and history. The Delta tables must have been saved using the schema used by the Pathling encoders (see Writing FHIR data).

Note that you will need to use the enable_delta parameter when initialising the Pathling context.
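For example, the context might be created like this (a minimal sketch using the enable_delta parameter mentioned above):

from pathling import PathlingContext

# Delta support must be enabled when the context is created.
pc = PathlingContext.create(enable_delta=True)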

The files are assumed to be named according to their resource type ([resource type].parquet), e.g. Patient.parquet, Condition.parquet.

data = pc.read.delta("/usr/share/staging/delta")

Managed tables

You can load data from managed tables that have previously been saved within the Spark catalog. You can optionally specify a schema that will be used to locate the tables; otherwise, the default schema will be used.

The tables are assumed to be named according to their resource type, e.g. Patient, Condition.

This also works with the Unity Catalog feature of Databricks.

data = pc.read.tables("mimic-iv")
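
If no schema name is passed, the default schema is used to locate the tables, as described above:

data = pc.read.tables()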

Writing FHIR data

Once you have read data in from a data source, you can also optionally write it back out to a variety of targets. This is useful for persisting source data in a more efficient form for query (e.g. Parquet or Delta), or for exporting data to NDJSON for use in other systems.
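
For example, raw NDJSON might be persisted as Delta once and then queried from the more efficient Delta copy in later sessions (a sketch combining the read and write operations described in this document; the paths are examples, and the context is assumed to have been created with enable_delta=True):

# One-off: read the raw NDJSON and persist it in Delta format.
data = pc.read.ndjson("/usr/share/staging/ndjson")
data.write.delta("/usr/share/warehouse/delta")

# Later sessions: query directly from the Delta copy.
data = pc.read.delta("/usr/share/warehouse/delta")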

NDJSON

You can write data to a directory containing NDJSON files. The files are named according to their resource type ([resource type].ndjson), e.g. Patient.ndjson, Condition.ndjson.

data.write.ndjson("/tmp/ndjson")

Parquet

You can write data to a directory containing Parquet files. The files are named according to their resource type ([resource type].parquet), e.g. Patient.parquet, Condition.parquet.

data.write.parquet("/usr/share/warehouse/parquet")

Delta Lake

You can write data to a directory containing Delta Lake tables. Delta tables are a specialisation of Parquet that enable additional functionality, such as incremental update and history.

Note that you will need to use the enable_delta parameter when initialising the Pathling context.

The files are named according to their resource type ([resource type].parquet), e.g. Patient.parquet, Condition.parquet.

data.write.delta("/usr/share/warehouse/delta")

Managed tables

You can write data to managed tables that will be saved within the Spark catalog. You can optionally specify a schema under which the tables will be saved; otherwise, the default schema will be used.

The tables are named according to their resource type, e.g. Patient, Condition.

This also works with the Unity Catalog feature of Databricks.

data.write.tables("test")