# Copyright © 2018-2025 Commonwealth Scientific and Industrial Research
# Organisation (CSIRO) ABN 41 687 119 230.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Dict, Sequence, Optional, Callable
from typing import List, TYPE_CHECKING
from json import dumps, loads
from py4j.java_gateway import JavaObject
from py4j.java_collections import SetConverter
from pyspark.sql import DataFrame
from pathling import PathlingContext
from pathling.core import StringToStringSetMapper, SparkConversionsMixin
from pathling.fhir import MimeType
from pathling.spark import Dfs
if TYPE_CHECKING:
from pathling.datasink import DataSinks
class DataSource(SparkConversionsMixin):
"""
A data source that can be used to run queries against FHIR data.
"""
def __init__(self, jds: JavaObject, pc: PathlingContext):
SparkConversionsMixin.__init__(self, pc.spark)
self._jds = jds
self.pc = pc
    def read(self, resource_code: str) -> DataFrame:
"""
Reads the data for the given resource type from the data source.
:param resource_code: A string representing the type of FHIR resource to read data from.
:return: A Spark DataFrame containing the data for the given resource type.
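
        A minimal usage sketch (illustrative only; ``data_source`` stands for any
        :class:`DataSource` that contains Patient resources)::

            patients = data_source.read("Patient")
            patients.select("id", "gender").show()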
"""
return self._wrap_df(self._jds.read(resource_code))
    def resource_types(self) -> List[str]:
"""
Returns a list of the resource types that are available in the data source.
:return: A list of strings representing the resource types.
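
        For example (illustrative only)::

            if "Patient" in data_source.resource_types():
                patients = data_source.read("Patient")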
"""
return list(self._jds.getResourceTypes())
@property
def write(self) -> "DataSinks":
"""
Provides access to a :class:`DataSinks` object that can be used to persist data.
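
        A minimal sketch (illustrative only; assumes the :class:`DataSinks` object
        provides a ``delta`` sink and that the destination path is writable)::

            data_source.write.delta("/tmp/pathling/delta")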
"""
# Import here to avoid circular dependency
from pathling.datasink import DataSinks
return DataSinks(self)
    def view(
self,
resource: Optional[str] = None,
select: Optional[Sequence[Dict]] = None,
constants: Optional[Sequence[Dict]] = None,
where: Optional[Sequence[Dict]] = None,
json: Optional[str] = None,
) -> DataFrame:
"""
Executes a SQL on FHIR view definition and returns the result as a Spark DataFrame.
:param resource: The FHIR resource that the view is based upon, e.g. 'Patient' or
'Observation'.
:param select: A list of columns and nested selects to include in the view.
:param constants: A list of constants that can be used in FHIRPath expressions.
:param where: A list of FHIRPath expressions that can be used to filter the view.
:param json: A JSON string representing the view definition, as an alternative to providing
the parameters as Python objects.
:return: A Spark DataFrame containing the results of the view.
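
        A minimal usage sketch (illustrative only; the ``select`` and ``where``
        structures follow the SQL on FHIR view definition format)::

            df = data_source.view(
                resource="Patient",
                select=[{"column": [
                    {"name": "id", "path": "id"},
                    {"name": "gender", "path": "gender"},
                ]}],
                where=[{"path": "active = true"}],
            )
            df.show()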
"""
if json:
query_json = json
parsed = loads(json)
resource = parsed.get("resource")
else:
args = locals()
query = {
key: args[key]
for key in ["resource", "select", "constants", "where"]
if args[key] is not None
}
query_json = dumps(query)
jquery = self._jds.view(resource)
jquery.json(query_json)
return self._wrap_df(jquery.execute())
class DataSources(SparkConversionsMixin):
"""
A factory for creating data sources.
"""
# Default extension and MIME type for NDJSON files
NDJSON_EXTENSION = "ndjson"
NDJSON_MIMETYPE = "application/fhir+ndjson"
def __init__(self, pathling: PathlingContext):
SparkConversionsMixin.__init__(self, pathling.spark)
self._pc = pathling
self._jdataSources = pathling._jpc.read()
def _wrap_ds(self, jds: JavaObject) -> DataSource:
return DataSource(jds, self._pc)
    def ndjson(
        self,
        path: str,
        extension: Optional[str] = None,
        file_name_mapper: Optional[Callable[[str], Sequence[str]]] = None,
    ) -> DataSource:
"""
        Creates a data source from a directory containing NDJSON files. Each file must be named
        with the resource type code and have the configured extension (".ndjson" by default),
        e.g. "Patient.ndjson" or "Observation.ndjson".
:param path: The URI of the directory containing the NDJSON files.
:param extension: The file extension to use when searching for files. Defaults to "ndjson".
:param file_name_mapper: An optional function that maps a filename to the set of resource
types that it contains.
:return: A DataSource object that can be used to run queries against the data.
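
        A minimal usage sketch (illustrative only; assumes ``pc`` is an existing
        :class:`PathlingContext` whose ``read`` property returns this factory, and
        that the directory contains a ``Patient.ndjson`` file)::

            data = pc.read.ndjson("/data/ndjson")
            data.read("Patient").show()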
"""
extension = extension or DataSources.NDJSON_EXTENSION
if file_name_mapper:
wrapped_mapper = StringToStringSetMapper(
self.spark._jvm._gateway_client, file_name_mapper
)
return self._wrap_ds(
self._jdataSources.ndjson(path, extension, wrapped_mapper)
)
else:
return self._wrap_ds(self._jdataSources.ndjson(path, extension))
    def bundles(
self,
path: str,
resource_types: Sequence[str],
mime_type: str = MimeType.FHIR_JSON,
) -> DataSource:
"""
Creates a data source from a directory containing FHIR bundles.
:param path: The URI of the directory containing the bundles.
:param resource_types: A sequence of resource type codes that should be extracted from the
bundles.
:param mime_type: The MIME type of the bundles. Defaults to `application/fhir+json`.
:return: A DataSource object that can be used to run queries against the data.
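
        A minimal usage sketch (illustrative only; assumes ``pc`` is an existing
        :class:`PathlingContext`)::

            data = pc.read.bundles(
                "/data/bundles", resource_types=["Patient", "Condition"]
            )
            data.read("Condition").show()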
"""
return self._wrap_ds(
self._jdataSources.bundles(
path,
SetConverter().convert(resource_types, self.spark._jvm._gateway_client),
mime_type,
)
)
    def datasets(self, resources: Dict[str, DataFrame]) -> DataSource:
"""
Creates an immutable, ad-hoc data source from a dictionary of Spark DataFrames indexed with
resource type codes.
:param resources: A dictionary of Spark DataFrames, where the keys are resource type codes
and the values are the data frames containing the resource data.
:return: A DataSource object that can be used to run queries against the data.
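
        A minimal usage sketch (illustrative only; assumes ``patient_df`` is a Spark
        DataFrame that already holds Patient resources in the Pathling encoding)::

            data = pc.read.datasets({"Patient": patient_df})
            data.read("Patient").show()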
"""
jbuilder = self._jdataSources.datasets()
for resource_code, resource_data in resources.items():
jbuilder.dataset(resource_code, resource_data._jdf)
return self._wrap_ds(jbuilder)
    def parquet(self, path: str) -> DataSource:
"""
Creates a data source from a directory containing Parquet tables. Each table must be named
according to the name of the resource type that it stores.
:param path: The URI of the directory containing the Parquet tables.
:return: A DataSource object that can be used to run queries against the data.
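
        A minimal usage sketch (illustrative only)::

            data = pc.read.parquet("/data/parquet")
            data.read("Observation").show()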
"""
return self._wrap_ds(self._jdataSources.parquet(path))
    def delta(self, path: str) -> DataSource:
"""
Creates a data source from a directory containing Delta tables, as used by Pathling Server
for persistence. Each table must be named according to the name of the resource type that
it stores.
:param path: The URI of the directory containing the Delta tables.
:return: A DataSource object that can be used to run queries against the data.
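
        A minimal usage sketch (illustrative only)::

            data = pc.read.delta("/data/delta")
            data.read("Patient").show()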
"""
return self._wrap_ds(self._jdataSources.delta(path))
    def tables(
self,
schema: Optional[str] = None,
) -> DataSource:
"""
Creates a data source from a set of Spark tables, where the table names are the resource
type codes.
:param schema: An optional schema name that should be used to qualify the table names.
:return: A DataSource object that can be used to run queries against the data.
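
        A minimal usage sketch (illustrative only; assumes tables such as ``Patient``
        exist within the ``fhir`` schema)::

            data = pc.read.tables("fhir")
            data.read("Patient").show()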
"""
if schema:
return self._wrap_ds(self._jdataSources.tables(schema))
else:
return self._wrap_ds(self._jdataSources.tables())
    def bulk(
self,
fhir_endpoint_url: str,
output_dir: Optional[str] = None,
overwrite: bool = True,
group_id: Optional[str] = None,
patients: Optional[List[str]] = None,
since: Optional[datetime] = None,
types: Optional[List[str]] = None,
elements: Optional[List[str]] = None,
include_associated_data: Optional[List[str]] = None,
type_filters: Optional[List[str]] = None,
timeout: Optional[int] = None,
max_concurrent_downloads: int = 10,
auth_config: Optional[Dict] = None
) -> DataSource:
"""
Creates a data source from a FHIR Bulk Data Access API endpoint.
        Currently only the NDJSON output format is supported.

        :param fhir_endpoint_url: The URL of the FHIR server to export from.
        :param output_dir: The directory to write the output files to.
            This should be a valid path in Spark's filesystem.
            If set to `None`, a temporary directory will be used instead.
        :param overwrite: Whether to overwrite the output directory if it already exists.
            Defaults to True.
        :param group_id: Optional group ID for a group-level export.
        :param patients: Optional list of patient references for a patient-level export.
        :param since: Only include resources modified after this timestamp.
        :param types: List of FHIR resource types to include.
        :param elements: List of FHIR elements to include.
        :param include_associated_data: Pre-defined sets of associated data to include.
        :param type_filters: FHIR search queries used to filter resources.
        :param timeout: Optional timeout duration in seconds.
        :param max_concurrent_downloads: Maximum number of concurrent downloads. Defaults to 10.
        :param auth_config: Optional authentication configuration dictionary with the following
            possible keys:

            - enabled: Whether authentication is enabled (default: False)
            - client_id: The client ID to use for authentication
            - private_key_jwk: The private key in JWK format
            - client_secret: The client secret to use for authentication
            - token_endpoint: The token endpoint URL
            - use_smart: Whether to use SMART authentication (default: True)
            - use_form_for_basic_auth: Whether to use form-based basic auth (default: False)
            - scope: The scope to request
            - token_expiry_tolerance: The token expiry tolerance in seconds (default: 120)
        :return: A DataSource object that can be used to run queries against the data.
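
        A minimal usage sketch (illustrative only; the endpoint URL is a placeholder
        for a real FHIR Bulk Data endpoint)::

            data = pc.read.bulk(
                fhir_endpoint_url="https://example.com/fhir",
                types=["Patient", "Condition"],
            )
            data.read("Patient").show()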
"""
from pathling.bulk import BulkExportClient
dfs = Dfs(self._pc.spark)
# If `output_dir` is not provided, create a temporary directory
output_dir = output_dir or dfs.get_temp_dir_path(prefix="tmp-bulk-export", qualified=True)
# If `overwrite`, then ensure the output directory does not exist
if overwrite and dfs.exists(output_dir):
dfs.delete(output_dir, recursive=True)
output_format = DataSources.NDJSON_MIMETYPE
output_extension = DataSources.NDJSON_EXTENSION
# Create appropriate client based on parameters
if group_id is not None:
client = BulkExportClient.for_group(
self.spark,
fhir_endpoint_url=fhir_endpoint_url,
output_dir=output_dir,
group_id=group_id,
output_format=output_format,
since=since,
types=types,
elements=elements,
include_associated_data=include_associated_data,
type_filters=type_filters,
output_extension=output_extension,
timeout=timeout,
max_concurrent_downloads=max_concurrent_downloads,
auth_config=auth_config
)
elif patients is not None:
client = BulkExportClient.for_patient(
self.spark,
fhir_endpoint_url=fhir_endpoint_url,
output_dir=output_dir,
patients=patients,
output_format=output_format,
since=since,
types=types,
elements=elements,
include_associated_data=include_associated_data,
type_filters=type_filters,
output_extension=output_extension,
timeout=timeout,
max_concurrent_downloads=max_concurrent_downloads,
auth_config=auth_config
)
else:
client = BulkExportClient.for_system(
self.spark,
fhir_endpoint_url=fhir_endpoint_url,
output_dir=output_dir,
output_format=output_format,
since=since,
types=types,
elements=elements,
include_associated_data=include_associated_data,
type_filters=type_filters,
output_extension=output_extension,
timeout=timeout,
max_concurrent_downloads=max_concurrent_downloads,
auth_config=auth_config
)
# Perform the export
client.export()
# Return a DataSource that reads from the exported files
return self.ndjson(output_dir)