Source code for pathling.bulk

#  Copyright © 2018-2025 Commonwealth Scientific and Industrial Research
#  Organisation (CSIRO) ABN 41 687 119 230.
# 
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
# 
#      http://www.apache.org/licenses/LICENSE-2.0
# 
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Callable

from py4j.java_gateway import JavaObject, JVMView
from pyspark.sql import SparkSession


[docs]@dataclass class FileResult: """ Represents the result of a single file export operation. """ source: str """ The source URL of the exported file. """ destination: str """ The destination URL where the file was saved. """ size: int """ The size of the exported file in bytes. """
[docs]@dataclass class ExportResult: """ Represents the result of a bulk export operation. """ transaction_time: datetime """ The time at which the transaction was processed at the server. Corresponds to `transactionTime` in the bulk export response. """ results: List[FileResult] """ A list of FileResult objects representing the exported files. """
[docs] @classmethod def from_java(cls, java_result: JavaObject) -> 'ExportResult': """ Create an ExportResult from a Java export result object. :param java_result: The Java export result object :return: A Python ExportResult object """ # Convert transaction time from Java Instant to Python datetime transaction_time = datetime.fromtimestamp( java_result.getTransactionTime().toEpochMilli() / 1000.0, tz=timezone.utc) # Convert file results file_results = [ FileResult( source=str(java_file_result.getSource()), destination=str(java_file_result.getDestination()), size=java_file_result.getSize()) for java_file_result in java_result.getResults() ] return cls( transaction_time=transaction_time, results=file_results )
[docs]class BulkExportClient: """ A client for exporting data from the FHIR Bulk Data Access API. """ def __init__(self, java_client): """ Create a new BulkExportClient that wraps a Java BulkExportClient. :param java_client: The Java BulkExportClient instance to wrap """ self._java_client = java_client
[docs] def export(self) -> ExportResult: """ Export data from the FHIR server. :return: The result of the export operation as a Python ExportResult object """ java_result = self._java_client.export() return ExportResult.from_java(java_result)
@classmethod def _configure_builder(cls, jvm, builder, fhir_endpoint_url: str, output_dir: str, output_format: str = "application/fhir+ndjson", since: Optional[datetime] = None, types: Optional[List[str]] = None, elements: Optional[List[str]] = None, include_associated_data: Optional[List[str]] = None, type_filters: Optional[List[str]] = None, output_extension: str = "ndjson", timeout: Optional[int] = None, max_concurrent_downloads: int = 10, auth_config: Optional[dict] = None): """ Configure common builder parameters. :param jvm: The JVM instance :param builder: The builder instance to configure :param fhir_endpoint_url: The URL of the FHIR server :param output_dir: Output directory :param output_format: Output format :param since: Timestamp filter (must include timezone information) :param types: Resource types to include :param elements: Elements to include :param include_associated_data: Associated data to include :param type_filters: Resource filters :param output_extension: File extension for output files :param timeout: Optional timeout duration in seconds :param max_concurrent_downloads: Maximum number of concurrent downloads :param auth_config: Optional authentication configuration dictionary with the following possible keys: - enabled: Whether authentication is enabled (default: False) - client_id: The client ID to use for authentication - private_key_jwk: The private key in JWK format - client_secret: The client secret to use for authentication - token_endpoint: The token endpoint URL - use_smart: Whether to use SMART authentication (default: True) - use_form_for_basic_auth: Whether to use form-based basic auth (default: False) - scope: The scope to request - token_expiry_tolerance: The token expiry tolerance in seconds (default: 120) """ builder.withFhirEndpointUrl(fhir_endpoint_url) builder.withOutputDir(output_dir) builder.withOutputFormat(output_format) builder.withOutputExtension(output_extension) builder.withMaxConcurrentDownloads(max_concurrent_downloads) if timeout is not None: java_duration = jvm.java.time.Duration.ofSeconds(timeout) builder.withTimeout(java_duration) if since is not None: if since.tzinfo is None: raise ValueError("datetime must include timezone information") # Format with microsecond precision and timezone offset instant_str = since.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] # Truncate to milliseconds if since.utcoffset() is None: instant_str += 'Z' else: offset = since.strftime('%z') # Insert colon in timezone offset instant_str += f"{offset[:3]}:{offset[3:]}" java_instant = jvm.java.time.Instant.parse(instant_str) builder.withSince(java_instant) if types is not None: for type_ in types: builder.withType(type_) if elements is not None: for element in elements: builder.withElement(element) if include_associated_data is not None: # Convert Python list to Java List<String> java_list = jvm.java.util.ArrayList() for data in include_associated_data: java_list.add(data) builder.withIncludeAssociatedData(java_list) if type_filters is not None: for filter_ in type_filters: builder.withTypeFilter(filter_) if auth_config is not None: auth_builder = jvm.au.csiro.fhir.auth.AuthConfig.builder() # Set defaults to match Java class auth_builder.enabled(False) auth_builder.useSMART(True) auth_builder.useFormForBasicAuth(False) auth_builder.tokenExpiryTolerance(120) # Map Python config to Java builder methods if 'enabled' in auth_config: auth_builder.enabled(auth_config['enabled']) if 'use_smart' in auth_config: auth_builder.useSMART(auth_config['use_smart']) if 'token_endpoint' in auth_config: auth_builder.tokenEndpoint(auth_config['token_endpoint']) if 'client_id' in auth_config: auth_builder.clientId(auth_config['client_id']) if 'client_secret' in auth_config: auth_builder.clientSecret(auth_config['client_secret']) if 'private_key_jwk' in auth_config: auth_builder.privateKeyJWK(auth_config['private_key_jwk']) if 'use_form_for_basic_auth' in auth_config: auth_builder.useFormForBasicAuth(auth_config['use_form_for_basic_auth']) if 'scope' in auth_config: auth_builder.scope(auth_config['scope']) if 'token_expiry_tolerance' in auth_config: auth_builder.tokenExpiryTolerance(auth_config['token_expiry_tolerance']) auth_config_obj = auth_builder.build() builder.withAuthConfig(auth_config_obj)
[docs] @classmethod def for_system(cls, spark, *args, **kwargs) -> 'BulkExportClient': """ Create a builder for a system-level export. :param spark: The SparkSession instance :param fhir_endpoint_url: The URL of the FHIR server to export from :param output_dir: The directory to write the output files to :param output_format: The format of the output data :param since: Only include resources modified after this timestamp :param types: List of FHIR resource types to include :param elements: List of FHIR elements to include :param include_associated_data: Pre-defined set of FHIR resources to include :param type_filters: FHIR search queries to filter resources :param output_extension: File extension for output files :param timeout: Optional timeout duration in seconds :param max_concurrent_downloads: Maximum number of concurrent downloads :param auth_config: Optional authentication configuration dictionary :return: A BulkExportClient configured for system-level export """ builder, jvm = cls._create_builder(spark, lambda bc: bc.systemBuilder()) cls._configure_builder(jvm, builder, *args, **kwargs) return cls(builder.build())
[docs] @classmethod def for_group(cls, spark, fhir_endpoint_url: str, output_dir: str, group_id: str, *args, **kwargs) -> 'BulkExportClient': """ Create a builder for a group-level export. :param spark: The SparkSession instance :param fhir_endpoint_url: The URL of the FHIR server to export from :param output_dir: The directory to write the output files to :param group_id: The ID of the group to export :param output_format: The format of the output data :param since: Only include resources modified after this timestamp :param types: List of FHIR resource types to include :param elements: List of FHIR elements to include :param include_associated_data: Pre-defined set of FHIR resources to include :param type_filters: FHIR search queries to filter resources :param output_extension: File extension for output files :param timeout: Optional timeout duration in seconds :param max_concurrent_downloads: Maximum number of concurrent downloads :param auth_config: Optional authentication configuration dictionary :return: A BulkExportClient configured for group-level export """ # Pass group_id directly to groupBuilder builder, jvm = cls._create_builder(spark, lambda bc: bc.groupBuilder(group_id)) cls._configure_builder(jvm, builder, fhir_endpoint_url, output_dir, *args, **kwargs) return cls(builder.build())
[docs] @classmethod def for_patient(cls, spark, fhir_endpoint_url: str, output_dir: str, patients: Optional[List[str]] = None, *args, **kwargs) -> 'BulkExportClient': """ Create a builder for a patient-level export. :param spark: The SparkSession instance :param fhir_endpoint_url: The URL of the FHIR server to export from :param output_dir: The directory to write the output files to :param patients: List of patient references to include :param output_format: The format of the output data :param since: Only include resources modified after this timestamp :param types: List of FHIR resource types to include :param elements: List of FHIR elements to include :param include_associated_data: Pre-defined set of FHIR resources to include :param type_filters: FHIR search queries to filter resources :param output_extension: File extension for output files :param timeout: Optional timeout duration in seconds :param max_concurrent_downloads: Maximum number of concurrent downloads :param auth_config: Optional authentication configuration dictionary :return: A BulkExportClient configured for patient-level export """ builder, jvm = cls._create_builder(spark, lambda bc: bc.patientBuilder()) if patients is not None: for patient in patients: ref = jvm.au.csiro.fhir.model.Reference.of(patient) builder.withPatient(ref) cls._configure_builder(jvm, builder, fhir_endpoint_url, output_dir, *args, **kwargs) return cls(builder.build())
@classmethod def _create_builder(cls, spark: SparkSession, factory_f: Callable[[JavaObject], JavaObject]) -> Tuple[ JavaObject, JVMView]: jvm: JVMView = spark._jvm client_class = jvm.au.csiro.fhir.export.BulkExportClient builder: JavaObject = factory_f(client_class) builder = builder.withFileStoreFactory( jvm.au.csiro.filestore.hdfs.HdfsFileStoreFactory(spark._jsc.sc().hadoopConfiguration()) ) return (builder, jvm)