pathling package
Submodules
pathling.bulk module
- class pathling.bulk.BulkExportClient(java_client)[source]
Bases:
object
A client for exporting data from the FHIR Bulk Data Access API.
- export() ExportResult [source]
Export data from the FHIR server.
- Returns:
The result of the export operation as a Python ExportResult object
- classmethod for_group(spark, fhir_endpoint_url: str, output_dir: str, group_id: str, *args, **kwargs) BulkExportClient [source]
Create a builder for a group-level export.
- Parameters:
spark – The SparkSession instance
fhir_endpoint_url – The URL of the FHIR server to export from
output_dir – The directory to write the output files to
group_id – The ID of the group to export
output_format – The format of the output data
since – Only include resources modified after this timestamp
types – List of FHIR resource types to include
elements – List of FHIR elements to include
include_associated_data – Pre-defined set of FHIR resources to include
type_filters – FHIR search queries to filter resources
output_extension – File extension for output files
timeout – Optional timeout duration in seconds
max_concurrent_downloads – Maximum number of concurrent downloads
auth_config – Optional authentication configuration dictionary
- Returns:
A BulkExportClient configured for group-level export
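A minimal sketch of a group-level export; the endpoint URL, output directory and group ID are placeholders:
```python
from pathling import PathlingContext
from pathling.bulk import BulkExportClient

pc = PathlingContext.create()
client = BulkExportClient.for_group(
    pc.spark,
    fhir_endpoint_url="https://bulk-data.example.com/fhir",  # placeholder endpoint
    output_dir="/tmp/bulk_export",  # placeholder output directory
    group_id="my-group",  # placeholder group ID
    types=["Patient", "Condition"],
)
result = client.export()
```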
- classmethod for_patient(spark, fhir_endpoint_url: str, output_dir: str, patients: List[str] | None = None, *args, **kwargs) BulkExportClient [source]
Create a builder for a patient-level export.
- Parameters:
spark – The SparkSession instance
fhir_endpoint_url – The URL of the FHIR server to export from
output_dir – The directory to write the output files to
patients – List of patient references to include
output_format – The format of the output data
since – Only include resources modified after this timestamp
types – List of FHIR resource types to include
elements – List of FHIR elements to include
include_associated_data – Pre-defined set of FHIR resources to include
type_filters – FHIR search queries to filter resources
output_extension – File extension for output files
timeout – Optional timeout duration in seconds
max_concurrent_downloads – Maximum number of concurrent downloads
auth_config – Optional authentication configuration dictionary
- Returns:
A BulkExportClient configured for patient-level export
- classmethod for_system(spark, *args, **kwargs) BulkExportClient [source]
Create a builder for a system-level export.
- Parameters:
spark – The SparkSession instance
fhir_endpoint_url – The URL of the FHIR server to export from
output_dir – The directory to write the output files to
output_format – The format of the output data
since – Only include resources modified after this timestamp
types – List of FHIR resource types to include
elements – List of FHIR elements to include
include_associated_data – Pre-defined set of FHIR resources to include
type_filters – FHIR search queries to filter resources
output_extension – File extension for output files
timeout – Optional timeout duration in seconds
max_concurrent_downloads – Maximum number of concurrent downloads
auth_config – Optional authentication configuration dictionary
- Returns:
A BulkExportClient configured for system-level export
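A similar sketch for a system-level export, including inspection of the returned ExportResult; the endpoint URL and output directory are again placeholders:
```python
from pathling import PathlingContext
from pathling.bulk import BulkExportClient

pc = PathlingContext.create()
client = BulkExportClient.for_system(
    pc.spark,
    fhir_endpoint_url="https://bulk-data.example.com/fhir",  # placeholder endpoint
    output_dir="/tmp/bulk_export",  # placeholder output directory
    types=["Patient"],
)
result = client.export()

# Each FileResult records where a file came from, where it was written, and its size.
print(result.transaction_time)
for file_result in result.results:
    print(file_result.source, file_result.destination, file_result.size)
```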
- class pathling.bulk.ExportResult(transaction_time: datetime, results: List[FileResult])[source]
Bases:
object
Represents the result of a bulk export operation.
- classmethod from_java(java_result: JavaObject) ExportResult [source]
Create an ExportResult from a Java export result object.
- Parameters:
java_result – The Java export result object
- Returns:
A Python ExportResult object
- results: List[FileResult]
A list of FileResult objects representing the exported files.
- transaction_time: datetime
The time at which the transaction was processed at the server. Corresponds to transactionTime in the bulk export response.
- class pathling.bulk.FileResult(source: str, destination: str, size: int)[source]
Bases:
object
Represents the result of a single file export operation.
- destination: str
The destination URL where the file was saved.
- size: int
The size of the exported file in bytes.
- source: str
The source URL of the exported file.
pathling.coding module
- class pathling.coding.Coding(system: str, code: str, version: str | None = None, display: str | None = None, user_selected: bool | None = None)[source]
Bases:
object
A Coding represents a code in a code system. See: https://hl7.org/fhir/R4/datatypes.html#Coding
- classmethod of_snomed(code: str, version: str | None = None, display: str | None = None, user_selected: bool | None = None) Coding [source]
Creates a SNOMED Coding.
- Parameters:
code – the code
version – a URI that identifies the version of the code system
display – the display text for the Coding
user_selected – an indicator of whether the Coding was chosen directly by the user
- Returns:
a SNOMED coding with given arguments.
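For illustration, constructing a Coding directly and via the SNOMED shortcut (the codes shown are arbitrary but real LOINC and SNOMED CT codes):
```python
from pathling import Coding

# A LOINC coding constructed with an explicit system URI.
heart_rate = Coding(system="http://loinc.org", code="8867-4", display="Heart rate")

# The equivalent shortcut for SNOMED CT codes.
fever = Coding.of_snomed("386661006", display="Fever")
```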
pathling.context module
- class pathling.context.PathlingContext(spark: SparkSession, jpc: JavaObject)[source]
Bases:
object
Main entry point for Pathling API functionality. Should be instantiated with the
PathlingContext.create()
class method. Example use:
```python
pc = PathlingContext.create(spark)
patient_df = pc.encode(spark.read.text('ndjson_resources'), 'Patient')
patient_df.show()
```
- classmethod create(spark: SparkSession | None = None, max_nesting_level: int | None = 3, enable_extensions: bool | None = False, enabled_open_types: Sequence[str] | None = ('boolean', 'code', 'date', 'dateTime', 'decimal', 'integer', 'string', 'Coding', 'CodeableConcept', 'Address', 'Identifier', 'Reference'), enable_terminology: bool | None = True, terminology_server_url: str | None = 'https://tx.ontoserver.csiro.au/fhir', terminology_verbose_request_logging: bool | None = False, terminology_socket_timeout: int | None = 60000, max_connections_total: int | None = 32, max_connections_per_route: int | None = 16, terminology_retry_enabled: bool | None = True, terminology_retry_count: int | None = 2, enable_cache: bool | None = True, cache_max_entries: int | None = 200000, cache_storage_type: str | None = 'memory', cache_storage_path: str | None = None, cache_default_expiry: int | None = 600, cache_override_expiry: int | None = None, token_endpoint: str | None = None, enable_auth: bool | None = False, client_id: str | None = None, client_secret: str | None = None, scope: str | None = None, token_expiry_tolerance: int | None = 120, accept_language: str | None = None, enable_delta=False, enable_remote_debugging: bool | None = False, debug_port: int | None = 5005, debug_suspend: bool | None = True) PathlingContext [source]
Creates a
PathlingContext
with the given configuration options. This should only be done once within a SparkSession - subsequent calls with different configuration may produce an error. If no SparkSession is provided, and there is not one already present in this process, a new SparkSession will be created.
If a SparkSession is not provided, and one is already running within the current process, it will be reused - and it is assumed that the Pathling library API JAR is already on the classpath. If you are running your own cluster, make sure it is on the list of packages.
If a SparkSession is provided, it needs to include the Pathling library API JAR on its classpath. You can get the path for the JAR (which is bundled with the Python package) using the pathling.etc.find_jar method.
- Parameters:
spark – a pre-configured
SparkSession
instance, use this if you need to control the way that the session is set up
max_nesting_level – controls the maximum depth of nested element data that is encoded upon import. This affects certain elements within FHIR resources that contain recursive references, e.g. QuestionnaireResponse.item.
enable_extensions – enables support for FHIR extensions
enabled_open_types – the list of types that are encoded within open types, such as extensions. This default list was taken from the data types that are common to extensions found in widely-used IGs, such as the US and AU base profiles. In general, you will get the best query performance by encoding your data with the shortest possible list.
enable_terminology – enables the use of terminology functions
terminology_server_url – the endpoint of a FHIR terminology service (R4) that the server can use to resolve terminology queries. The default server is suitable for testing purposes only.
terminology_verbose_request_logging – setting this option to True will enable additional logging of the details of requests to the terminology service. Note that logging is subject to the Spark logging level, which you can set using SparkContext.setLogLevel. Verbose request logging is sent to the DEBUG logging level.
terminology_socket_timeout – the maximum period (in milliseconds) that the server should wait for incoming data from the HTTP service
max_connections_total – the maximum total number of connections for the client
max_connections_per_route – the maximum number of connections per route for the client
terminology_retry_enabled – controls whether terminology requests that fail for possibly transient reasons (network connections, DNS problems) should be retried
terminology_retry_count – the number of times to retry failed terminology requests
enable_cache – set this to false to disable caching of terminology requests (not recommended)
cache_max_entries – sets the maximum number of entries that will be held in memory
cache_storage_type – the type of storage to use for the terminology cache. See StorageType.
cache_storage_path – the path on disk to use for the cache, required when cache_storage_type is disk
cache_default_expiry – the default expiry time for cache entries (in seconds), used when the server does not provide an expiry value
cache_override_expiry – if provided, this value overrides the expiry time provided by the terminology server
enable_auth – enables authentication of requests to the terminology server
token_endpoint – an OAuth2 token endpoint for use with the client credentials grant
client_id – a client ID for use with the client credentials grant
client_secret – a client secret for use with the client credentials grant
scope – a scope value for use with the client credentials grant
token_expiry_tolerance – the minimum number of seconds that a token should have before expiry when deciding whether to send it with a terminology request
accept_language – the default value of the Accept-Language HTTP header passed to the terminology server. The value may contain multiple languages, with weighted preferences as defined in https://www.rfc-editor.org/rfc/rfc9110.html#name-accept-language. If not provided, the header is not sent. The server can use the header to return the result in the preferred language if it is able. The actual behaviour may depend on the server implementation and the code systems used.
enable_delta – enables the use of Delta for storage of FHIR data. Only supported when no SparkSession is provided.
enable_remote_debugging – enables remote debugging for the JVM process.
debug_port – the port for the debugger to listen on (default: 5005)
debug_suspend – if true, the JVM will suspend until a debugger is attached
- Returns:
a
PathlingContext
instance initialized with the specified configuration
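A sketch of a typical setup, overriding a few commonly adjusted options; the terminology server URL shown is the documented default, and all other options keep their default values:
```python
from pathling import PathlingContext

pc = PathlingContext.create(
    enable_extensions=True,
    terminology_server_url="https://tx.ontoserver.csiro.au/fhir",
    enable_delta=True,  # only supported when no SparkSession is provided
)
```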
- encode(df: DataFrame, resource_name: str, input_type: str | None = None, column: str | None = None) DataFrame [source]
Takes a dataframe with string representations of FHIR resources in the given column, and encodes resources of the given type as a Spark DataFrame.
- Parameters:
df – a
DataFrame
containing the resources to encode.
resource_name – the name of the FHIR resource to extract (Condition, Observation, etc.)
input_type – the MIME type of the input string encoding. Defaults to application/fhir+json.
column – the column in which the resources to encode are stored. If ‘None’ then the input dataframe is assumed to have one column of type string.
- Returns:
a
DataFrame
containing the given type of resources encoded into Spark columns
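For example, encoding resources stored as XML strings in a named column of a wider DataFrame; the DataFrame df and its "resource" column are hypothetical:
```python
from pathling import PathlingContext, MimeType

pc = PathlingContext.create()

# df is a hypothetical DataFrame with a string column named "resource"
# holding XML-encoded FHIR resources.
observations = pc.encode(
    df,
    "Observation",
    input_type=MimeType.FHIR_XML,
    column="resource",
)
```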
- encode_bundle(df: DataFrame, resource_name: str, input_type: str | None = None, column: str | None = None) DataFrame [source]
Takes a dataframe with string representations of FHIR bundles in the given column, and encodes resources of the given type as a Spark DataFrame.
- Parameters:
df – a
DataFrame
containing the bundles with the resources to encode.
resource_name – the name of the FHIR resource to extract (Condition, Observation, etc.)
input_type – the MIME type of the input string encoding. Defaults to application/fhir+json.
column – the column in which the resources to encode are stored. If ‘None’ then the input dataframe is assumed to have one column of type string.
- Returns:
a
DataFrame
containing the given type of resources encoded into Spark columns
- property read: DataSources
Provides access to the
DataSources
factory instance.
- property spark: SparkSession
Returns the SparkSession associated with this context.
pathling.core module
- class pathling.core.Expression(expression: str, label: str | None = None)[source]
Bases:
object
Represents a FHIRPath expression that may have an optional name/alias. To make it easier to work with expressions, users can alias this class with their own name, for example ‘exp’ or ‘fp’, at import time, and then use the alias method to create labelled expressions. For example:
```python
from pathling import Expression as fp

fp('some FHIRPath expression').alias('some name')
```
- alias(label: str) Expression [source]
Creates a new Expression object with the specified label/alias.
- Parameters:
label – The label/alias to use for the new Expression object.
- Returns:
A new Expression object with the specified label/alias.
- classmethod as_expression(exp_or_str: Expression | str) Expression [source]
Casts the specified expression or string into an Expression object.
- Parameters:
exp_or_str – The expression or string to cast.
- Returns:
An Expression object.
- classmethod as_expression_sequence(sequence_of_exp_or_str: Sequence[Expression | str]) Sequence[Expression] [source]
Cast a sequence of expressions or strings into a sequence of Expression objects.
- Parameters:
sequence_of_exp_or_str – The sequence of expressions or strings to cast.
- Returns:
A sequence of Expression objects.
- as_tuple() Tuple [source]
Gets a tuple representing the expression and its optional label/alias.
- Returns:
A tuple representing the expression and its optional label/alias.
- property expression: str
Gets the FHIRPath expression.
- Returns:
The FHIRPath expression.
- property label: str | None
Gets the optional label/alias for the expression.
- Returns:
The optional label/alias for the expression.
- class pathling.core.Function(lambda_function, spark)[source]
Bases:
object
Wraps a Python lambda function so that it can be passed to Java functions that expect a java.util.function.Function object.
- Parameters:
lambda_function – A Python lambda function that takes one argument.
spark – A pyspark.sql.SparkSession object.
- class pathling.core.SparkConversionsMixin(spark: SparkSession)[source]
Bases:
object
A mixin that provides access to the Spark session and a number of utility methods for converting between Python and Java objects.
- property spark: SparkSession
- class pathling.core.StringMapper(gateway, fn)[source]
Bases:
object
A wrapper for a Python lambda that can be passed as a Java lambda for mapping a string value to another string value.
- class pathling.core.StringToStringSetMapper(gateway, fn)[source]
Bases:
object
A wrapper for a Python lambda that can be passed as a Java lambda for mapping a string value to a list of string values.
- class pathling.core.VariableExpression(expression: str, label: str | None, when_many: str | None = 'array')[source]
Bases:
Expression
Represents a FHIRPath variable expression that has a ‘when many’ attribute, and may have an optional name/alias.
- property when_many: str | None
Gets the optional when many attribute for the expression.
- Returns:
The optional when many attribute for the expression.
pathling.datasink module
- class pathling.datasink.DataSinks(datasource: DataSource)[source]
Bases:
SparkConversionsMixin
A class for writing FHIR data to a variety of different targets.
- delta(path: str, save_mode: str | None = 'overwrite') None [source]
Writes the data to a directory of Delta files.
- Parameters:
path – The URI of the directory to write the files to.
save_mode – The save mode to use when writing the data: “overwrite” will overwrite any existing data; “merge” will merge the new data with the existing data, based on resource ID.
- ndjson(path: str, save_mode: str | None = 'error', file_name_mapper: Callable[[str], str] | None = None) None [source]
Writes the data to a directory of NDJSON files. The files will be named using the resource type and the “.ndjson” extension.
- Parameters:
path – The URI of the directory to write the files to.
save_mode – The save mode to use when writing the data: “overwrite” will overwrite any existing data; “append” will append the new data to the existing data; “ignore” will only save the data if the file does not already exist; “error” will raise an error if the file already exists.
file_name_mapper – An optional function that can be used to customise the mapping of the resource type to the file name.
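A sketch of writing a data source out as NDJSON, constructing DataSinks directly from its documented signature; the directories are placeholders and the file_name_mapper is a hypothetical example:
```python
from pathling import PathlingContext
from pathling.datasink import DataSinks

pc = PathlingContext.create()
data = pc.read.ndjson("/data/input")  # placeholder input directory

# Write each resource type to e.g. "fhir_Patient.ndjson" instead of "Patient.ndjson".
DataSinks(data).ndjson(
    "/data/output",  # placeholder output directory
    save_mode="overwrite",
    file_name_mapper=lambda resource_type: f"fhir_{resource_type}",
)
```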
- parquet(path: str, save_mode: str | None = 'error') None [source]
Writes the data to a directory of Parquet files.
- Parameters:
path – The URI of the directory to write the files to.
save_mode – The save mode to use when writing the data: “overwrite” will overwrite any existing data; “append” will append the new data to the existing data; “ignore” will only save the data if the file does not already exist; “error” will raise an error if the file already exists.
- tables(schema: str | None = None, save_mode: str | None = 'overwrite') None [source]
Writes the data to a set of tables in the Spark catalog.
- Parameters:
schema – The name of the schema to write the tables to.
save_mode – The save mode to use when writing the data: “overwrite” will overwrite any existing data; “merge” will merge the new data with the existing data, based on resource ID.
- class pathling.datasink.SaveMode[source]
Bases:
object
Constants that represent the different save modes.
OVERWRITE: Overwrite any existing data.
APPEND: Append the new data to the existing data.
IGNORE: Only save the data if the file does not already exist.
ERROR: Raise an error if the file already exists.
MERGE: Merge the new data with the existing data, based on resource ID.
- APPEND: str = 'append'
- ERROR: str = 'error'
- IGNORE: str = 'ignore'
- MERGE: str = 'merge'
- OVERWRITE: str = 'overwrite'
pathling.datasource module
- class pathling.datasource.DataSource(jds: JavaObject, pc: PathlingContext)[source]
Bases:
SparkConversionsMixin
A data source that can be used to run queries against FHIR data.
- read(resource_code: str) DataFrame [source]
Reads the data for the given resource type from the data source.
- Parameters:
resource_code – A string representing the type of FHIR resource to read data from.
- Returns:
A Spark DataFrame containing the data for the given resource type.
- resource_types()[source]
Returns a list of the resource types that are available in the data source.
- Returns:
A list of strings representing the resource types.
- view(resource: str | None = None, select: Sequence[Dict] | None = None, constants: Sequence[Dict] | None = None, where: Sequence[Dict] | None = None, json: str | None = None) DataFrame [source]
Executes a SQL on FHIR view definition and returns the result as a Spark DataFrame.
- Parameters:
resource – The FHIR resource that the view is based upon, e.g. ‘Patient’ or ‘Observation’.
select – A list of columns and nested selects to include in the view.
constants – A list of constants that can be used in FHIRPath expressions.
where – A list of FHIRPath expressions that can be used to filter the view.
json – A JSON string representing the view definition, as an alternative to providing the parameters as Python objects.
- Returns:
A Spark DataFrame containing the results of the view.
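A sketch of a simple view over Patient resources, following the SQL on FHIR view definition structure; the data directory and column choices are illustrative:
```python
from pathling import PathlingContext

pc = PathlingContext.create()
data = pc.read.ndjson("/data/input")  # placeholder directory of NDJSON files

# One row per Patient, with id and gender columns, restricted to active patients.
result = data.view(
    resource="Patient",
    select=[
        {
            "column": [
                {"name": "id", "path": "getResourceKey()"},
                {"name": "gender", "path": "gender"},
            ]
        }
    ],
    where=[{"path": "active = true"}],
)
result.show()
```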
- class pathling.datasource.DataSources(pathling: PathlingContext)[source]
Bases:
SparkConversionsMixin
A factory for creating data sources.
- NDJSON_EXTENSION = 'ndjson'
- NDJSON_MIMETYPE = 'application/fhir+ndjson'
- bulk(fhir_endpoint_url: str, output_dir: str | None = None, overwrite: bool = True, group_id: str | None = None, patients: List[str] | None = None, since: datetime | None = None, types: List[str] | None = None, elements: List[str] | None = None, include_associated_data: List[str] | None = None, type_filters: List[str] | None = None, timeout: int | None = None, max_concurrent_downloads: int = 10, auth_config: Dict | None = None) DataSource [source]
Creates a data source from a FHIR Bulk Data Access API endpoint. Currently only supports bulk export in the ndjson format.
- Parameters:
fhir_endpoint_url – The URL of the FHIR server to export from
output_dir – The directory to write the output files to. This should be a valid path in Spark’s filesystem. If set to None, a temporary directory will be used instead.
overwrite – Whether to overwrite the output directory if it already exists. Defaults to True.
group_id – Optional group ID for group-level export
patients – Optional list of patient references for patient-level export
since – Only include resources modified after this timestamp
types – List of FHIR resource types to include
elements – List of FHIR elements to include
include_associated_data – Pre-defined set of FHIR resources to include
type_filters – FHIR search queries to filter resources
timeout – Optional timeout duration in seconds
max_concurrent_downloads – Maximum number of concurrent downloads. Defaults to 10
auth_config – Optional authentication configuration dictionary with the following possible keys:
enabled: Whether authentication is enabled (default: False)
client_id: The client ID to use for authentication
private_key_jwk: The private key in JWK format
client_secret: The client secret to use for authentication
token_endpoint: The token endpoint URL
use_smart: Whether to use SMART authentication (default: True)
use_form_for_basic_auth: Whether to use form-based basic auth (default: False)
scope: The scope to request
token_expiry_tolerance: The token expiry tolerance in seconds (default: 120)
- Returns:
A DataSource object that can be used to run queries against the data
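For example, reading directly from a Bulk Data endpoint into a queryable data source (the endpoint URL is a placeholder):
```python
from pathling import PathlingContext

pc = PathlingContext.create()
data = pc.read.bulk(
    fhir_endpoint_url="https://bulk-data.example.com/fhir",  # placeholder endpoint
    types=["Patient", "Condition"],
)
patients = data.read("Patient")
```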
- bundles(path: str, resource_types: Sequence[str], mime_type: str = 'application/fhir+json') DataSource [source]
Creates a data source from a directory containing FHIR bundles.
- Parameters:
path – The URI of the directory containing the bundles.
resource_types – A sequence of resource type codes that should be extracted from the bundles.
mime_type – The MIME type of the bundles. Defaults to application/fhir+json.
- Returns:
A DataSource object that can be used to run queries against the data.
- datasets(resources: Dict[str, DataFrame]) DataSource [source]
Creates an immutable, ad-hoc data source from a dictionary of Spark DataFrames indexed with resource type codes.
- Parameters:
resources – A dictionary of Spark DataFrames, where the keys are resource type codes and the values are the data frames containing the resource data.
- Returns:
A DataSource object that can be used to run queries against the data.
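A sketch of building an ad-hoc source from DataFrames produced elsewhere; patient_df and condition_df are hypothetical and assumed to already be in the Pathling-encoded schema (e.g. the output of encode):
```python
from pathling import PathlingContext

pc = PathlingContext.create()

# patient_df and condition_df are assumed to have been created earlier,
# e.g. via pc.encode or a previous read.
data = pc.read.datasets({
    "Patient": patient_df,
    "Condition": condition_df,
})
print(data.resource_types())
```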
- delta(path: str) DataSource [source]
Creates a data source from a directory containing Delta tables, as used by Pathling Server for persistence. Each table must be named according to the name of the resource type that it stores.
- Parameters:
path – The URI of the directory containing the Delta tables.
- Returns:
A DataSource object that can be used to run queries against the data.
- ndjson(path, extension: str | None = None, file_name_mapper: Callable[[str], Sequence[str]] | None = None) DataSource [source]
Creates a data source from a directory containing NDJSON files. The files must be named with the resource type code and must have the “.ndjson” extension, e.g. “Patient.ndjson” or “Observation.ndjson”.
- Parameters:
path – The URI of the directory containing the NDJSON files.
extension – The file extension to use when searching for files. Defaults to “ndjson”.
file_name_mapper – An optional function that maps a filename to the set of resource types that it contains.
- Returns:
A DataSource object that can be used to run queries against the data.
- parquet(path: str) DataSource [source]
Creates a data source from a directory containing Parquet tables. Each table must be named according to the name of the resource type that it stores.
- Parameters:
path – The URI of the directory containing the Parquet tables.
- Returns:
A DataSource object that can be used to run queries against the data.
- tables(schema: str | None = None) DataSource [source]
Creates a data source from a set of Spark tables, where the table names are the resource type codes.
- Parameters:
schema – An optional schema name that should be used to qualify the table names.
- Returns:
A DataSource object that can be used to run queries against the data.
pathling.fhir module
pathling.functions module
- pathling.functions.to_coding(coding_column: Column, system: str, version: str | None = None) Column [source]
Converts a Column containing codes into a Column that contains a Coding struct. The Coding struct Column can be used as an input to terminology functions such as member_of and translate.
- Parameters:
coding_column – the Column containing the codes
system – the URI of the system the codes belong to
version – the version of the code system
- Returns:
a Column containing a Coding struct
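For instance, wrapping a plain code column in a Coding struct before passing it to a terminology function (the DataFrame contents and system URI are illustrative):
```python
from pathling import PathlingContext
from pathling.functions import to_coding
from pathling.udfs import display

pc = PathlingContext.create()
df = pc.spark.createDataFrame([("8867-4",)], ["code"])  # an arbitrary LOINC code

# Wrap the raw code in a Coding struct, then look up its display name.
result = df.select(display(to_coding(df.code, "http://loinc.org")))
```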
- pathling.functions.to_ecl_value_set(ecl: str) str [source]
Converts a SNOMED CT ECL expression into a FHIR ValueSet URI. Can be used with the member_of function.
- Parameters:
ecl – the ECL expression
- Returns:
the ValueSet URI
- pathling.functions.to_loinc_coding(coding_column: Column, version: str | None = None) Column [source]
Converts a Column containing codes into a Column that contains a LOINC Coding struct. The Coding struct Column can be used as an input to terminology functions such as member_of and translate.
- Parameters:
coding_column – the Column containing the codes
version – the version of the code system
- Returns:
a Column containing a Coding struct
- pathling.functions.to_snomed_coding(coding_column: Column, version: str | None = None) Column [source]
Converts a Column containing codes into a Column that contains a SNOMED Coding struct. The Coding struct Column can be used as an input to terminology functions such as member_of and translate.
- Parameters:
coding_column – the Column containing the codes
version – the version of the code system
- Returns:
a Column containing a Coding struct
pathling.spark module
- class pathling.spark.Dfs(spark: SparkSession)[source]
Bases:
object
A class for interacting with the Hadoop Distributed File System (HDFS) in Spark.
- delete(path: str, recursive: bool = False) bool [source]
Delete a file or directory at the specified path.
- Parameters:
path – Path to the file or directory to delete.
recursive – If True, delete directories and their contents recursively.
- Returns:
True if deletion was successful, False otherwise.
- exists(path: str) bool [source]
Check if a given path exists in the filesystem.
- Parameters:
path – Path to check for existence.
- Returns:
True if the path exists, False otherwise.
- get_temp_dir_path(prefix: str = 'tmp-app', qualified=True) str [source]
Returns a unique path for a temporary directory in Spark’s filesystem.
The path is constructed by appending a UUID to the base temporary directory, ensuring uniqueness for each call. The directory itself is not created, only the path is returned.
- Parameters:
prefix – String to insert between the base directory and the UUID (default: “tmp-app”).
qualified – If True, returns a fully qualified Hadoop path; if False, returns a raw path string.
- Returns:
String representing the unique temporary directory path.
pathling.udfs module
- class pathling.udfs.Equivalence[source]
Bases:
object
Concept map equivalences, see https://www.hl7.org/fhir/R4/valueset-concept-map-equivalence.html.
- DISJOINT = 'disjoint'
- EQUAL = 'equal'
- EQUIVALENT = 'equivalent'
- INEXACT = 'inexact'
- NARROWER = 'narrower'
- RELATEDTO = 'relatedto'
- SPECIALIZES = 'specializes'
- SUBSUMES = 'subsumes'
- UNMATCHED = 'unmatched'
- WIDER = 'wider'
- class pathling.udfs.PropertyType[source]
Bases:
object
Allowed property types.
- BOOLEAN = 'boolean'
- CODE = 'code'
- CODING = 'Coding'
- DATETIME = 'dateTime'
- DECIMAL = 'decimal'
- INTEGER = 'integer'
- STRING = 'string'
- pathling.udfs.designation(coding: Column | str | Coding, use: Column | str | Coding | None = None, language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the values of designations (strings) for this coding, for the specified use and language. If the language is not provided (is null), then all designations with the specified use are returned, regardless of their language.
- Parameters:
coding – a Column containing a struct representation of a Coding
use – the code with the use of the designations
language – the language of the designations
- Returns:
the Column containing the result of the operation (array of strings with designation values)
- pathling.udfs.display(coding: Column | str | Coding, accept_language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the canonical display name associated with the given code.
- Parameters:
coding – a Column containing a struct representation of a Coding.
accept_language – the optional language preferences for the returned display name. Overrides the parameter accept_language in
PathlingContext.create
.
- Returns:
a Column containing the result of the operation (String).
- pathling.udfs.member_of(coding: Column | str | Coding, value_set_uri: str) Column [source]
Takes a Coding or an array of Codings column as its input. Returns a Column containing a Boolean value, indicating whether any of the input Codings is a member of the specified FHIR ValueSet.
- Parameters:
coding – a Column containing a struct representation of a Coding or an array of such structs.
value_set_uri – an identifier for a FHIR ValueSet
- Returns:
a Column containing the result of the operation.
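A sketch combining member_of with an ECL-derived value set; the SNOMED CT code and ECL expression are arbitrary examples:
```python
from pathling import PathlingContext
from pathling.functions import to_snomed_coding, to_ecl_value_set
from pathling.udfs import member_of

pc = PathlingContext.create()
df = pc.spark.createDataFrame([("386661006",)], ["code"])  # arbitrary SNOMED CT code

# True when a code is a (sub)type of 404684003 |Clinical finding|.
result = df.select(
    member_of(
        to_snomed_coding(df.code),
        to_ecl_value_set("<< 404684003"),
    )
)
```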
- pathling.udfs.property_of(coding: Column | str | Coding, property_code: str, property_type: str = 'string', accept_language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the values of properties for this coding with the specified names and types. The type of the result column depends on the types of the properties. Primitive FHIR types are mapped to their corresponding SQL primitives. Complex types are mapped to their corresponding structs. The allowed property types are: code | Coding | string | integer | boolean | dateTime | decimal. See also
PropertyType
.
- Parameters:
coding – a Column containing a struct representation of a Coding
property_code – the code of the property to retrieve.
property_type – the type of the property to retrieve.
accept_language – the optional language preferences for the returned property values. Overrides the parameter accept_language in
PathlingContext.create
.
- Returns:
the Column containing the result of the operation (array of property values)
- pathling.udfs.subsumed_by(left_coding: Column | str | Coding, right_coding: Column | str | Coding) Column [source]
Takes two Coding columns as input. Returns a Column containing a Boolean value, indicating whether the left Coding is subsumed by the right Coding.
- Parameters:
left_coding – a Column containing a struct representation of a Coding or an array of Codings.
right_coding – a Column containing a struct representation of a Coding or an array of Codings.
- Returns:
a Column containing the result of the operation (boolean).
- pathling.udfs.subsumes(left_coding: Column | str | Coding, right_coding: Column | str | Coding) Column [source]
Takes two Coding columns as input. Returns a Column containing a Boolean value, indicating whether the left Coding subsumes the right Coding.
- Parameters:
left_coding – a Column containing a struct representation of a Coding or an array of Codings.
right_coding – a Column containing a struct representation of a Coding or an array of Codings.
- Returns:
a Column containing the result of the operation (boolean).
- pathling.udfs.translate(coding: Column | str | Coding, concept_map_uri: str, reverse: bool = False, equivalences: str | Collection[str] | None = None, target: str | None = None) Column [source]
Takes a Coding column as input. Returns a Column containing an array of Coding values with translation targets from the specified FHIR ConceptMap. There may be more than one target concept for each input concept. Only translations with the specified equivalences are returned. See also
Equivalence
.
- Parameters:
coding – a Column containing a struct representation of a Coding
concept_map_uri – an identifier for a FHIR ConceptMap
reverse – the direction to traverse the map: False results in “source to target” mappings, while True results in “target to source”
equivalences – a value or a collection of values from the ConceptMapEquivalence ValueSet
target – identifies the value set in which a translation is sought. If there’s no target specified, the server should return all known translations.
- Returns:
a Column containing the result of the operation (an array of Coding structs).
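For example, translating codings through a ConceptMap while keeping only equivalent matches; the code is an arbitrary SNOMED CT example, and the ConceptMap URI is a placeholder:
```python
from pathling import PathlingContext
from pathling.functions import to_snomed_coding
from pathling.udfs import translate, Equivalence

pc = PathlingContext.create()
df = pc.spark.createDataFrame([("386661006",)], ["code"])  # arbitrary SNOMED CT code

# Translate via a placeholder ConceptMap URI, keeping only 'equivalent' matches.
result = df.select(
    translate(
        to_snomed_coding(df.code),
        concept_map_uri="http://example.com/fhir/ConceptMap/my-map",  # placeholder
        equivalences=[Equivalence.EQUIVALENT],
    )
)
```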
Module contents
- class pathling.Coding(system: str, code: str, version: str | None = None, display: str | None = None, user_selected: bool | None = None)[source]
Bases:
object
A Coding represents a code in a code system. See: https://hl7.org/fhir/R4/datatypes.html#Coding
- classmethod of_snomed(code: str, version: str | None = None, display: str | None = None, user_selected: bool | None = None) Coding [source]
Creates a SNOMED Coding.
- Parameters:
code – the code
version – a URI that identifies the version of the code system
display – the display text for the Coding
user_selected – an indicator of whether the Coding was chosen directly by the user
- Returns:
a SNOMED coding with given arguments.
- class pathling.DataSource(jds: JavaObject, pc: PathlingContext)[source]
Bases:
SparkConversionsMixin
A data source that can be used to run queries against FHIR data.
- read(resource_code: str) DataFrame [source]
Reads the data for the given resource type from the data source.
- Parameters:
resource_code – A string representing the type of FHIR resource to read data from.
- Returns:
A Spark DataFrame containing the data for the given resource type.
- resource_types()[source]
Returns a list of the resource types that are available in the data source.
- Returns:
A list of strings representing the resource types.
- view(resource: str | None = None, select: Sequence[Dict] | None = None, constants: Sequence[Dict] | None = None, where: Sequence[Dict] | None = None, json: str | None = None) DataFrame [source]
Executes a SQL on FHIR view definition and returns the result as a Spark DataFrame.
- Parameters:
resource – The FHIR resource that the view is based upon, e.g. ‘Patient’ or ‘Observation’.
select – A list of columns and nested selects to include in the view.
constants – A list of constants that can be used in FHIRPath expressions.
where – A list of FHIRPath expressions that can be used to filter the view.
json – A JSON string representing the view definition, as an alternative to providing the parameters as Python objects.
- Returns:
A Spark DataFrame containing the results of the view.
- class pathling.DataSources(pathling: PathlingContext)[source]
Bases:
SparkConversionsMixin
A factory for creating data sources.
- NDJSON_EXTENSION = 'ndjson'
- NDJSON_MIMETYPE = 'application/fhir+ndjson'
- bulk(fhir_endpoint_url: str, output_dir: str | None = None, overwrite: bool = True, group_id: str | None = None, patients: List[str] | None = None, since: datetime | None = None, types: List[str] | None = None, elements: List[str] | None = None, include_associated_data: List[str] | None = None, type_filters: List[str] | None = None, timeout: int | None = None, max_concurrent_downloads: int = 10, auth_config: Dict | None = None) DataSource [source]
Creates a data source from a FHIR Bulk Data Access API endpoint. Currently only supports bulk export in the ndjson format.
- Parameters:
fhir_endpoint_url – The URL of the FHIR server to export from
output_dir – The directory to write the output files to. This should be a valid path in Spark’s filesystem. If set to None, a temporary directory will be used instead.
overwrite – Whether to overwrite the output directory if it already exists. Defaults to True.
group_id – Optional group ID for group-level export
patients – Optional list of patient references for patient-level export
since – Only include resources modified after this timestamp
types – List of FHIR resource types to include
elements – List of FHIR elements to include
include_associated_data – Pre-defined set of FHIR resources to include
type_filters – FHIR search queries to filter resources
timeout – Optional timeout duration in seconds
max_concurrent_downloads – Maximum number of concurrent downloads. Defaults to 10
auth_config – Optional authentication configuration dictionary with the following possible keys:
enabled: Whether authentication is enabled (default: False)
client_id: The client ID to use for authentication
private_key_jwk: The private key in JWK format
client_secret: The client secret to use for authentication
token_endpoint: The token endpoint URL
use_smart: Whether to use SMART authentication (default: True)
use_form_for_basic_auth: Whether to use form-based basic auth (default: False)
scope: The scope to request
token_expiry_tolerance: The token expiry tolerance in seconds (default: 120)
- Returns:
A DataSource object that can be used to run queries against the data
- bundles(path: str, resource_types: Sequence[str], mime_type: str = 'application/fhir+json') DataSource [source]
Creates a data source from a directory containing FHIR bundles.
- Parameters:
path – The URI of the directory containing the bundles.
resource_types – A sequence of resource type codes that should be extracted from the bundles.
mime_type – The MIME type of the bundles. Defaults to application/fhir+json.
- Returns:
A DataSource object that can be used to run queries against the data.
- datasets(resources: Dict[str, DataFrame]) DataSource [source]
Creates an immutable, ad-hoc data source from a dictionary of Spark DataFrames indexed with resource type codes.
- Parameters:
resources – A dictionary of Spark DataFrames, where the keys are resource type codes and the values are the data frames containing the resource data.
- Returns:
A DataSource object that can be used to run queries against the data.
- delta(path: str) DataSource [source]
Creates a data source from a directory containing Delta tables, as used by Pathling Server for persistence. Each table must be named according to the name of the resource type that it stores.
- Parameters:
path – The URI of the directory containing the Delta tables.
- Returns:
A DataSource object that can be used to run queries against the data.
- ndjson(path, extension: str | None = None, file_name_mapper: Callable[[str], Sequence[str]] | None = None) DataSource [source]
Creates a data source from a directory containing NDJSON files. The files must be named with the resource type code and must have the “.ndjson” extension, e.g. “Patient.ndjson” or “Observation.ndjson”.
- Parameters:
path – The URI of the directory containing the NDJSON files.
extension – The file extension to use when searching for files. Defaults to “ndjson”.
file_name_mapper – An optional function that maps a filename to the set of resource types that it contains.
- Returns:
A DataSource object that can be used to run queries against the data.
- parquet(path: str) DataSource [source]
Creates a data source from a directory containing Parquet tables. Each table must be named according to the name of the resource type that it stores.
- Parameters:
path – The URI of the directory containing the Parquet tables.
- Returns:
A DataSource object that can be used to run queries against the data.
- tables(schema: str | None = None) DataSource [source]
Creates a data source from a set of Spark tables, where the table names are the resource type codes.
- Parameters:
schema – An optional schema name that should be used to qualify the table names.
- Returns:
A DataSource object that can be used to run queries against the data.
- class pathling.Equivalence[source]
Bases:
object
Concept map equivalences, see https://www.hl7.org/fhir/R4/valueset-concept-map-equivalence.html.
- DISJOINT = 'disjoint'
- EQUAL = 'equal'
- EQUIVALENT = 'equivalent'
- INEXACT = 'inexact'
- NARROWER = 'narrower'
- RELATEDTO = 'relatedto'
- SPECIALIZES = 'specializes'
- SUBSUMES = 'subsumes'
- UNMATCHED = 'unmatched'
- WIDER = 'wider'
- class pathling.Expression(expression: str, label: str | None = None)[source]
Bases:
object
Represents a FHIRPath expression that may have an optional name/alias. To make it easier to work with expressions, users can alias this class with their own name, for example ‘exp’ or ‘fp’, at import time, and then use the alias method to create labelled expressions. For example:
```python
from pathling import Expression as fp

fp('some FHIRPath expression').alias('some name')
```
- alias(label: str) Expression [source]
Creates a new Expression object with the specified label/alias.
- Parameters:
label – The label/alias to use for the new Expression object.
- Returns:
A new Expression object with the specified label/alias.
- classmethod as_expression(exp_or_str: Expression | str) Expression [source]
Casts the specified expression or string into an Expression object.
- Parameters:
exp_or_str – The expression or string to cast.
- Returns:
An Expression object.
- classmethod as_expression_sequence(sequence_of_exp_or_str: Sequence[Expression | str]) Sequence[Expression] [source]
Cast a sequence of expressions or strings into a sequence of Expression objects.
- Parameters:
sequence_of_exp_or_str – The sequence of expressions or strings to cast.
- Returns:
A sequence of Expression objects.
- as_tuple() Tuple [source]
Gets a tuple representing the expression and its optional label/alias.
- Returns:
A tuple representing the expression and its optional label/alias.
- property expression: str
Gets the FHIRPath expression.
- Returns:
The FHIRPath expression.
- property label: str | None
Gets the optional label/alias for the expression.
- Returns:
The optional label/alias for the expression.
- class pathling.MimeType[source]
Bases:
object
Constants for FHIR encoding mime types.
- FHIR_JSON: str = 'application/fhir+json'
- FHIR_XML: str = 'application/fhir+xml'
- class pathling.PathlingContext(spark: SparkSession, jpc: JavaObject)[source]
Bases:
object
Main entry point for Pathling API functionality. Should be instantiated with the
PathlingContext.create()
class method. Example use:
```python
pc = PathlingContext.create(spark)
patient_df = pc.encode(spark.read.text('ndjson_resources'), 'Patient')
patient_df.show()
```
- classmethod create(spark: SparkSession | None = None, max_nesting_level: int | None = 3, enable_extensions: bool | None = False, enabled_open_types: Sequence[str] | None = ('boolean', 'code', 'date', 'dateTime', 'decimal', 'integer', 'string', 'Coding', 'CodeableConcept', 'Address', 'Identifier', 'Reference'), enable_terminology: bool | None = True, terminology_server_url: str | None = 'https://tx.ontoserver.csiro.au/fhir', terminology_verbose_request_logging: bool | None = False, terminology_socket_timeout: int | None = 60000, max_connections_total: int | None = 32, max_connections_per_route: int | None = 16, terminology_retry_enabled: bool | None = True, terminology_retry_count: int | None = 2, enable_cache: bool | None = True, cache_max_entries: int | None = 200000, cache_storage_type: str | None = 'memory', cache_storage_path: str | None = None, cache_default_expiry: int | None = 600, cache_override_expiry: int | None = None, token_endpoint: str | None = None, enable_auth: bool | None = False, client_id: str | None = None, client_secret: str | None = None, scope: str | None = None, token_expiry_tolerance: int | None = 120, accept_language: str | None = None, enable_delta=False, enable_remote_debugging: bool | None = False, debug_port: int | None = 5005, debug_suspend: bool | None = True) PathlingContext [source]
Creates a
PathlingContext
with the given configuration options. This should only be done once within a SparkSession - subsequent calls with different configuration may produce an error. If no SparkSession is provided, and there is not one already present in this process, a new SparkSession will be created.
If a SparkSession is not provided, and one is already running within the current process, it will be reused - and it is assumed that the Pathling library API JAR is already on the classpath. If you are running your own cluster, make sure it is on the list of packages.
If a SparkSession is provided, it needs to include the Pathling library API JAR on its classpath. You can get the path for the JAR (which is bundled with the Python package) using the pathling.etc.find_jar method.
- Parameters:
spark – a pre-configured
SparkSession
instance, use this if you need to control the way that the session is set up
max_nesting_level – controls the maximum depth of nested element data that is encoded upon import. This affects certain elements within FHIR resources that contain recursive references, e.g. QuestionnaireResponse.item.
enable_extensions – enables support for FHIR extensions
enabled_open_types – the list of types that are encoded within open types, such as extensions. This default list was taken from the data types that are common to extensions found in widely-used IGs, such as the US and AU base profiles. In general, you will get the best query performance by encoding your data with the shortest possible list.
enable_terminology – enables the use of terminology functions
terminology_server_url – the endpoint of a FHIR terminology service (R4) that the server can use to resolve terminology queries. The default server is suitable for testing purposes only.
terminology_verbose_request_logging – setting this option to True will enable additional logging of the details of requests to the terminology service. Note that logging is subject to the Spark logging level, which you can set using SparkContext.setLogLevel. Verbose request logging is sent to the DEBUG logging level.
terminology_socket_timeout – the maximum period (in milliseconds) that the server should wait for incoming data from the HTTP service
max_connections_total – the maximum total number of connections for the client
max_connections_per_route – the maximum number of connections per route for the client
terminology_retry_enabled – controls whether terminology requests that fail for possibly transient reasons (network connections, DNS problems) should be retried
terminology_retry_count – the number of times to retry failed terminology requests
enable_cache – set this to false to disable caching of terminology requests (not recommended)
cache_max_entries – sets the maximum number of entries that will be held in memory
cache_storage_type – the type of storage to use for the terminology cache. See StorageType.
cache_storage_path – the path on disk to use for the cache, required when cache_storage_type is disk
cache_default_expiry – the default expiry time for cache entries (in seconds), used when the server does not provide an expiry value
cache_override_expiry – if provided, this value overrides the expiry time provided by the terminology server
enable_auth – enables authentication of requests to the terminology server
token_endpoint – an OAuth2 token endpoint for use with the client credentials grant
client_id – a client ID for use with the client credentials grant
client_secret – a client secret for use with the client credentials grant
scope – a scope value for use with the client credentials grant
token_expiry_tolerance – the minimum number of seconds that a token should have before expiry when deciding whether to send it with a terminology request
accept_language – the default value of the Accept-Language HTTP header passed to the terminology server. The value may contain multiple languages, with weighted preferences as defined in https://www.rfc-editor.org/rfc/rfc9110.html#name-accept-language. If not provided, the header is not sent. The server can use the header to return the result in the preferred language if it is able. The actual behaviour may depend on the server implementation and the code systems used.
enable_delta – enables the use of Delta for storage of FHIR data. Only supported when no SparkSession is provided.
enable_remote_debugging – enables remote debugging for the JVM process.
debug_port – the port for the debugger to listen on (default: 5005)
debug_suspend – if true, the JVM will suspend until a debugger is attached
- Returns:
a
PathlingContext
instance initialized with the specified configuration
- encode(df: DataFrame, resource_name: str, input_type: str | None = None, column: str | None = None) DataFrame [source]
Takes a dataframe with string representations of FHIR resources in the given column, and encodes resources of the given type as a Spark DataFrame.
- Parameters:
df – a
DataFrame
containing the resources to encode.
resource_name – the name of the FHIR resource to extract (Condition, Observation, etc.)
input_type – the MIME type of the input string encoding. Defaults to application/fhir+json.
column – the column in which the resources to encode are stored. If ‘None’ then the input dataframe is assumed to have one column of type string.
- Returns:
a
DataFrame
containing the given type of resources encoded into Spark columns
- encode_bundle(df: DataFrame, resource_name: str, input_type: str | None = None, column: str | None = None) DataFrame [source]
Takes a dataframe with string representations of FHIR bundles in the given column, and encodes resources of the given type as a Spark DataFrame.
- Parameters:
df – a
DataFrame
containing the bundles with the resources to encode.
resource_name – the name of the FHIR resource to extract (Condition, Observation, etc.)
input_type – the MIME type of the input string encoding. Defaults to application/fhir+json.
column – the column in which the resources to encode are stored. If ‘None’ then the input dataframe is assumed to have one column of type string.
- Returns:
a
DataFrame
containing the given type of resources encoded into Spark columns
- property read: DataSources
Provides access to the
DataSources
factory instance.
- property spark: SparkSession
Returns the SparkSession associated with this context.
- class pathling.PropertyType[source]
Bases:
object
Allowed property types.
- BOOLEAN = 'boolean'
- CODE = 'code'
- CODING = 'Coding'
- DATETIME = 'dateTime'
- DECIMAL = 'decimal'
- INTEGER = 'integer'
- STRING = 'string'
- class pathling.VariableExpression(expression: str, label: str | None, when_many: str | None = 'array')[source]
Bases:
Expression
Represents a FHIRPath variable expression that has a ‘when many’ attribute, and may have an optional name/alias.
- property when_many: str | None
Gets the optional when many attribute for the expression.
- Returns:
The optional when many attribute for the expression.
- pathling.designation(coding: Column | str | Coding, use: Column | str | Coding | None = None, language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the values of designations (strings) for this coding, for the specified use and language. If the language is not provided (is null), then all designations with the specified use are returned, regardless of their language.
- Parameters:
coding – a Column containing a struct representation of a Coding
use – the code with the use of the designations
language – the language of the designations
- Returns:
the Column containing the result of the operation (array of strings with designation values)
- pathling.display(coding: Column | str | Coding, accept_language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the canonical display name associated with the given code.
- Parameters:
coding – a Column containing a struct representation of a Coding.
accept_language – the optional language preferences for the returned display name. Overrides the parameter accept_language in
PathlingContext.create
.
- Returns:
a Column containing the result of the operation (String).
- pathling.member_of(coding: Column | str | Coding, value_set_uri: str) Column [source]
Takes a Coding or an array of Codings column as its input. Returns a Column containing a Boolean value, indicating whether any of the input Codings is a member of the specified FHIR ValueSet.
- Parameters:
coding – a Column containing a struct representation of a Coding or an array of such structs.
value_set_uri – an identifier for a FHIR ValueSet
- Returns:
a Column containing the result of the operation.
- pathling.property_of(coding: Column | str | Coding, property_code: str, property_type: str = 'string', accept_language: str | None = None) Column [source]
Takes a Coding column as its input. Returns a Column containing the values of properties for this coding with the specified names and types. The type of the result column depends on the types of the properties. Primitive FHIR types are mapped to their corresponding SQL primitives. Complex types are mapped to their corresponding structs. The allowed property types are: code | Coding | string | integer | boolean | dateTime | decimal. See also
PropertyType
.
- Parameters:
coding – a Column containing a struct representation of a Coding
property_code – the code of the property to retrieve.
property_type – the type of the property to retrieve.
accept_language – the optional language preferences for the returned property values. Overrides the parameter accept_language in
PathlingContext.create
.
- Returns:
the Column containing the result of the operation (array of property values)
- pathling.subsumed_by(left_coding: Column | str | Coding, right_coding: Column | str | Coding) Column [source]
Takes two Coding columns as input. Returns a Column containing a Boolean value, indicating whether the left Coding is subsumed by the right Coding.
- Parameters:
left_coding – a Column containing a struct representation of a Coding or an array of Codings.
right_coding – a Column containing a struct representation of a Coding or an array of Codings.
- Returns:
a Column containing the result of the operation (boolean).
- pathling.subsumes(left_coding: Column | str | Coding, right_coding: Column | str | Coding) Column [source]
Takes two Coding columns as input. Returns a Column containing a Boolean value, indicating whether the left Coding subsumes the right Coding.
- Parameters:
left_coding – a Column containing a struct representation of a Coding or an array of Codings.
right_coding – a Column containing a struct representation of a Coding or an array of Codings.
- Returns:
a Column containing the result of the operation (boolean).
- pathling.to_coding(coding_column: Column, system: str, version: str | None = None) Column [source]
Converts a Column containing codes into a Column that contains a Coding struct. The Coding struct Column can be used as an input to terminology functions such as member_of and translate.
- Parameters:
coding_column – the Column containing the codes
system – the URI of the system the codes belong to
version – the version of the code system
- Returns:
a Column containing a Coding struct
- pathling.to_ecl_value_set(ecl: str) str [source]
Converts a SNOMED CT ECL expression into a FHIR ValueSet URI. Can be used with the member_of function.
- Parameters:
ecl – the ECL expression
- Returns:
the ValueSet URI
- pathling.to_snomed_coding(coding_column: Column, version: str | None = None) Column [source]
Converts a Column containing codes into a Column that contains a SNOMED Coding struct. The Coding struct Column can be used as an input to terminology functions such as member_of and translate.
- Parameters:
coding_column – the Column containing the codes
version – the version of the code system
- Returns:
a Column containing a Coding struct
- pathling.translate(coding: Column | str | Coding, concept_map_uri: str, reverse: bool = False, equivalences: str | Collection[str] | None = None, target: str | None = None) Column [source]
Takes a Coding column as input. Returns a Column containing an array of Coding values with translation targets from the specified FHIR ConceptMap. There may be more than one target concept for each input concept. Only translations with the specified equivalences are returned. See also
Equivalence
.
- Parameters:
coding – a Column containing a struct representation of a Coding
concept_map_uri – an identifier for a FHIR ConceptMap
reverse – the direction to traverse the map: False results in “source to target” mappings, while True results in “target to source”
equivalences – a value or a collection of values from the ConceptMapEquivalence ValueSet
target – identifies the value set in which a translation is sought. If there’s no target specified, the server should return all known translations.
- Returns:
a Column containing the result of the operation (an array of Coding structs).