Source code for pathling.context

#  Copyright © 2018-2025 Commonwealth Scientific and Industrial Research
#  Organisation (CSIRO) ABN 41 687 119 230.
# 
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# noinspection PyPackageRequirements

from py4j.java_gateway import JavaObject
from pyspark.sql import DataFrame, SparkSession
from typing import Optional, Sequence, TYPE_CHECKING

from pathling._version import (
    __java_version__,
    __scala_version__,
    __delta_version__,
    __hadoop_version__,
)
from pathling.fhir import MimeType

if TYPE_CHECKING:
    from .datasource import DataSources

__all__ = ["PathlingContext"]


class StorageType:
    """
    String codes naming the available storage back-ends.

    ``MEMORY`` is the default value of the ``cache_storage_type`` argument of
    :func:`PathlingContext.create`.
    """

    # Code for in-memory storage.
    MEMORY: str = "memory"

    # Code for on-disk storage.
    DISK: str = "disk"
# noinspection PyProtectedMember
class PathlingContext:
    """
    Main entry point for Pathling API functionality.

    Instances are meant to be obtained through the
    :func:`PathlingContext.create` class method rather than by calling the
    constructor directly.

    Example use::

        pc = PathlingContext.create(spark)
        patient_df = pc.encode(spark.read.text('ndjson_resources'), 'Patient')
        patient_df.show()
    """

    def __init__(self, spark: SparkSession, jpc: JavaObject) -> None:
        """
        Stores the Spark session and the Java-side Pathling context.

        :param spark: the :class:`SparkSession` this context operates on
        :param jpc: the Java `PathlingContext` object that calls are delegated to
        """
        self._spark: SparkSession = spark
        self._jpc: JavaObject = jpc

    @property
    def spark(self) -> SparkSession:
        """
        The SparkSession associated with this context.
        """
        return self._spark

    @property
    def read(self) -> "DataSources":
        """
        Provides access to the instance of :class:`DataSource` factory.
        """
        # Imported at call time rather than at module level; the module-level
        # import of this name is guarded by TYPE_CHECKING.
        from pathling.datasource import DataSources

        return DataSources(self)

    @classmethod
    def create(
        cls,
        spark: Optional[SparkSession] = None,
        max_nesting_level: Optional[int] = 3,
        enable_extensions: Optional[bool] = False,
        enabled_open_types: Optional[Sequence[str]] = (
            "boolean",
            "code",
            "date",
            "dateTime",
            "decimal",
            "integer",
            "string",
            "Coding",
            "CodeableConcept",
            "Address",
            "Identifier",
            "Reference",
        ),
        enable_terminology: Optional[bool] = True,
        terminology_server_url: Optional[str] = "https://tx.ontoserver.csiro.au/fhir",
        terminology_verbose_request_logging: Optional[bool] = False,
        terminology_socket_timeout: Optional[int] = 60_000,
        max_connections_total: Optional[int] = 32,
        max_connections_per_route: Optional[int] = 16,
        terminology_retry_enabled: Optional[bool] = True,
        terminology_retry_count: Optional[int] = 2,
        enable_cache: Optional[bool] = True,
        cache_max_entries: Optional[int] = 200_000,
        cache_storage_type: Optional[str] = StorageType.MEMORY,
        cache_storage_path: Optional[str] = None,
        cache_default_expiry: Optional[int] = 600,
        cache_override_expiry: Optional[int] = None,
        token_endpoint: Optional[str] = None,
        enable_auth: Optional[bool] = False,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        scope: Optional[str] = None,
        token_expiry_tolerance: Optional[int] = 120,
        accept_language: Optional[str] = None,
        enable_delta=False,
        enable_remote_debugging: Optional[bool] = False,
        debug_port: Optional[int] = 5005,
        debug_suspend: Optional[bool] = True,
    ) -> "PathlingContext":
        """
        Creates a :class:`PathlingContext` with the given configuration options.

        This should only be done once within a SparkSession - subsequent calls
        with different configuration may produce an error.

        The SparkSession is chosen as follows:

        * If a SparkSession is provided, it is used. It needs to include the
          Pathling library API JAR on its classpath. You can get the path for
          the JAR (which is bundled with the Python package) using the
          `pathling.etc.find_jar` method.
        * Otherwise, if a SparkSession is already running within the current
          process, it is reused - and it is assumed that the Pathling library
          API JAR is already on the classpath. If you are running your own
          cluster, make sure it is on the list of packages.
        * Otherwise, a new SparkSession is created.

        :param spark: a pre-configured :class:`SparkSession` instance, use this
            if you need to control the way that the session is set up
        :param max_nesting_level: controls the maximum depth of nested element
            data that is encoded upon import. This affects certain elements
            within FHIR resources that contain recursive references, e.g.
            `QuestionnaireResponse.item
            <https://hl7.org/fhir/R4/questionnaireresponse.html>`_.
        :param enable_extensions: enables support for FHIR extensions
        :param enabled_open_types: the list of types that are encoded within
            open types, such as extensions. This default list was taken from the
            data types that are common to extensions found in widely-used IGs,
            such as the US and AU base profiles. In general, you will get the
            best query performance by encoding your data with the shortest
            possible list.
        :param enable_terminology: enables the use of terminology functions
        :param terminology_server_url: the endpoint of a FHIR terminology
            service (R4) that the server can use to resolve terminology queries.
            The default server is suitable for testing purposes only.
        :param terminology_verbose_request_logging: setting this option to
            `True` will enable additional logging of the details of requests to
            the terminology service. Note that logging is subject to the Spark
            logging level, which you can set using `SparkContext.setLogLevel`.
            Verbose request logging is sent to the `DEBUG` logging level.
        :param terminology_socket_timeout: the maximum period (in milliseconds)
            that the server should wait for incoming data from the HTTP service
        :param max_connections_total: the maximum total number of connections
            for the client
        :param max_connections_per_route: the maximum number of connections per
            route for the client
        :param terminology_retry_enabled: controls whether terminology requests
            that fail for possibly transient reasons (network connections, DNS
            problems) should be retried
        :param terminology_retry_count: the number of times to retry failed
            terminology requests
        :param enable_cache: set this to false to disable caching of terminology
            requests (not recommended)
        :param cache_max_entries: sets the maximum number of entries that will
            be held in memory
        :param cache_storage_type: the type of storage to use for the
            terminology cache. See `StorageType`.
        :param cache_storage_path: the path on disk to use for the cache,
            required when `cache_storage_type` is `disk`
        :param cache_default_expiry: the default expiry time for cache entries
            (in seconds), used when the server does not provide an expiry value
        :param cache_override_expiry: if provided, this value overrides the
            expiry time provided by the terminology server
        :param token_endpoint: an OAuth2 token endpoint for use with the client
            credentials grant
        :param enable_auth: enables authentication of requests to the
            terminology server
        :param client_id: a client ID for use with the client credentials grant
        :param client_secret: a client secret for use with the client
            credentials grant
        :param scope: a scope value for use with the client credentials grant
        :param token_expiry_tolerance: the minimum number of seconds that a
            token should have before expiry when deciding whether to send it
            with a terminology request
        :param accept_language: the default value of the Accept-Language HTTP
            header passed to the terminology server. The value may contain
            multiple languages, with weighted preferences as defined in
            https://www.rfc-editor.org/rfc/rfc9110.html#name-accept-language.
            If not provided, the header is not sent. The server can use the
            header to return the result in the preferred language if it is able.
            The actual behaviour may depend on the server implementation and the
            code systems used.
        :param enable_delta: enables the use of Delta for storage of FHIR data.
            Only supported when no SparkSession is provided.
            NOTE(review): this argument is not read anywhere in this method;
            the Delta package and session settings are applied whenever a new
            session is created - confirm the intended behaviour.
        :param enable_remote_debugging: enables remote debugging for the JVM
            process. Only applied when this method creates a new session.
        :param debug_port: the port for the debugger to listen on (default:
            5005)
        :param debug_suspend: if true, the JVM will suspend until a debugger is
            attached
        :return: a :class:`PathlingContext` instance initialized with the
            specified configuration
        """

        def _start_session() -> SparkSession:
            # Settings are applied to the builder in the order listed here.
            settings = [
                (
                    "spark.jars.packages",
                    f"au.csiro.pathling:library-runtime:{__java_version__},"
                    f"io.delta:delta-spark_{__scala_version__}:{__delta_version__},",
                ),
                (
                    "spark.sql.extensions",
                    "io.delta.sql.DeltaSparkSessionExtension",
                ),
                (
                    "spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
                ),
            ]

            # Add remote debugging configuration if enabled
            if enable_remote_debugging:
                if debug_suspend:
                    suspend_option = "y"
                else:
                    suspend_option = "n"
                debug_options = f"-agentlib:jdwp=transport=dt_socket,server=y,suspend={suspend_option},address={debug_port}"
                settings.append(("spark.driver.extraJavaOptions", debug_options))

            builder = SparkSession.builder
            for key, value in settings:
                builder = builder.config(key, value)
            return builder.getOrCreate()

        # Prefer the caller's session, then the active one, and only create a
        # new session as a last resort.
        spark = spark or SparkSession.getActiveSession()
        if not spark:
            spark = _start_session()

        jvm = spark._jvm
        config_pkg = jvm.au.csiro.pathling.config

        # Encoding options.
        encoding_cfg = (
            config_pkg.EncodingConfiguration.builder()
            .maxNestingLevel(max_nesting_level)
            .enableExtensions(enable_extensions)
            .openTypes(jvm.java.util.HashSet(enabled_open_types))
            .build()
        )

        # HTTP client options for the terminology client.
        http_client_cfg = (
            config_pkg.HttpClientConfiguration.builder()
            .socketTimeout(terminology_socket_timeout)
            .maxConnectionsTotal(max_connections_total)
            .maxConnectionsPerRoute(max_connections_per_route)
            .retryEnabled(terminology_retry_enabled)
            .retryCount(terminology_retry_count)
            .build()
        )

        # Terminology cache options; the storage type code is first converted
        # to its Java-side value via `fromCode`.
        storage_type_value = config_pkg.HttpClientCachingStorageType.fromCode(
            cache_storage_type
        )
        caching_cfg = (
            config_pkg.HttpClientCachingConfiguration.builder()
            .enabled(enable_cache)
            .maxEntries(cache_max_entries)
            .storageType(storage_type_value)
            .storagePath(cache_storage_path)
            .defaultExpiry(cache_default_expiry)
            .overrideExpiry(cache_override_expiry)
            .build()
        )

        # Terminology authentication options.
        authentication_cfg = (
            config_pkg.TerminologyAuthConfiguration.builder()
            .enabled(enable_auth)
            .tokenEndpoint(token_endpoint)
            .clientId(client_id)
            .clientSecret(client_secret)
            .scope(scope)
            .tokenExpiryTolerance(token_expiry_tolerance)
            .build()
        )

        # Overall terminology configuration, combining the three objects above.
        terminology_cfg = (
            config_pkg.TerminologyConfiguration.builder()
            .enabled(enable_terminology)
            .serverUrl(terminology_server_url)
            .verboseLogging(terminology_verbose_request_logging)
            .client(http_client_cfg)
            .cache(caching_cfg)
            .authentication(authentication_cfg)
            .acceptLanguage(accept_language)
            .build()
        )

        java_context: JavaObject = jvm.au.csiro.pathling.library.PathlingContext.create(
            spark._jsparkSession, encoding_cfg, terminology_cfg
        )
        return PathlingContext(spark, java_context)

    def _wrap_df(self, jdf: JavaObject) -> DataFrame:
        """
        Wraps a Java dataframe reference in a Python :class:`DataFrame`.

        :param jdf: the Java dataframe object to wrap
        :return: a :class:`DataFrame` backed by the given Java object
        """
        #
        # Before Spark v3.3 Dataframes were constructed with SQLContext, which was
        # available in the `_wrapped` attribute of SparkSession.
        # Since v3.3 Dataframes are constructed with the SparkSession instance
        # directly.
        #
        sql_context = getattr(self._spark, "_wrapped", self._spark)
        return DataFrame(jdf, sql_context)

    def version(self):
        """
        :return: The version of the Pathling library.
        """
        return self._jpc.getVersion()

    def encode(
        self,
        df: DataFrame,
        resource_name: str,
        input_type: Optional[str] = None,
        column: Optional[str] = None,
    ) -> DataFrame:
        """
        Takes a dataframe with string representations of FHIR resources in the
        given column and encodes the resources of the given type as a Spark
        dataframe.

        :param df: a :class:`DataFrame` containing the resources to encode.
        :param resource_name: the name of the FHIR resource to extract
            (Condition, Observation, etc.)
        :param input_type: the MIME type of the input string encoding. Defaults
            to `application/fhir+json`.
        :param column: the column in which the resources to encode are stored.
            If 'None' then the input dataframe is assumed to have one column of
            type string.
        :return: a :class:`DataFrame` containing the given type of resources
            encoded into Spark columns
        """
        mime_type = input_type or MimeType.FHIR_JSON
        encoded = self._jpc.encode(df._jdf, resource_name, mime_type, column)
        return self._wrap_df(encoded)

    def encode_bundle(
        self,
        df: DataFrame,
        resource_name: str,
        input_type: Optional[str] = None,
        column: Optional[str] = None,
    ) -> DataFrame:
        """
        Takes a dataframe with string representations of FHIR bundles in the
        given column and encodes the resources of the given type as a Spark
        dataframe.

        :param df: a :class:`DataFrame` containing the bundles with the
            resources to encode.
        :param resource_name: the name of the FHIR resource to extract
            (Condition, Observation, etc.)
        :param input_type: the MIME type of the input string encoding. Defaults
            to `application/fhir+json`.
        :param column: the column in which the resources to encode are stored.
            If 'None' then the input dataframe is assumed to have one column of
            type string.
        :return: a :class:`DataFrame` containing the given type of resources
            encoded into Spark columns
        """
        mime_type = input_type or MimeType.FHIR_JSON
        encoded = self._jpc.encodeBundle(df._jdf, resource_name, mime_type, column)
        return self._wrap_df(encoded)