# Copyright 2023 Commonwealth Scientific and Industrial Research
# Organisation (CSIRO) ABN 41 687 119 230.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Callable, Sequence, Tuple, Optional, Union
from py4j.java_collections import SetConverter
from py4j.java_gateway import JavaObject
from pyspark.sql import DataFrame, SparkSession
[docs]class Function:
"""
Wraps a Python lambda function so that it can be passed to Java functions
that expect a `java.util.function.Function` object.
:param lambda_function: A Python lambda function that takes one argument.
:param spark: A `pyspark.sql.SparkSession` object.
"""
def __init__(self, lambda_function, spark):
self._lambda_function = lambda_function
self._jvm = spark._jvm
def _wrap_result(self, result: Any) -> Any:
# we need to manually convert the python object result to the java object
# result, so that it can be returned to the java side and py4j does not seem to be doing
# it automatically. So we just call the identity() to force the conversion.
return self._jvm.java.util.function.Function.identity().apply(result)
[docs] def apply(self, arg):
"""
Invokes the wrapped lambda function with the given argument.
:param arg: The argument to pass to the lambda function.
:return: The result of the lambda function, converted to a Java object.
"""
return self._wrap_result(self._lambda_function(arg))
[docs] class Java:
implements = ["java.util.function.Function"]
[docs]class SparkConversionsMixin:
"""
A mixin that provides access to the Spark session and a number for utility methods for
converting between Python and Java objects.
"""
def __init__(self, spark: SparkSession):
self._spark = spark
@property
def spark(self) -> SparkSession:
return self._spark
def _wrap_df(self, jdf: JavaObject) -> DataFrame:
#
# Before Spark v3.3 Dataframes were constructs with SQLContext, which was available
# in `_wrapped` attribute of SparkSession.
# Since v3.3 Dataframes are constructed with SparkSession instance directly.
#
return DataFrame(
jdf,
self._spark._wrapped if hasattr(self._spark, "_wrapped") else self._spark,
)
def _lambda_to_function(self, lambda_function: Callable) -> Function:
return Function(lambda_function, self._spark)
ExpOrStr = Union["Expression", str]
[docs]class Expression:
"""
Represents an FHIRPath expression that may have an optional name/alias.
To make it easier to work with expressions, uses can alias this class with their own name,
for example: 'exp' or 'fp' using import and then use the alias method to create labeled
expressions. For example:
```
from pathling import Expression as fp
fp('some FHIRPath expression').alias('some name')
```
"""
def __init__(self, expression: str, label: Optional[str] = None):
"""
Initializes a new instance of the Expression class.
:param expression: The FHIRPath expression.
:param label: The optional label/alias for the expression.
"""
self._expression = expression
self._label = label
@property
def expression(self) -> str:
"""
Gets the FHIRPath expression.
:return: The FHIRPath expression.
"""
return self._expression
@property
def label(self) -> Optional[str]:
"""
Gets the optional label/alias for the expression.
:return: The optional label/alias for the expression.
"""
return self._label
[docs] def alias(self, label: str) -> "Expression":
"""
Creates a new Expression object with the specified label/alias.
:param label: The label/alias to use for the new Expression object.
:return: A new Expression object with the specified label/alias.
"""
return Expression(self.expression, label)
[docs] def as_tuple(self) -> Tuple:
"""
Gets a tuple representing the expression and its optional label/alias.
:return: A tuple representing the expression and its optional label/alias.
"""
return (self.expression, self.label) if self.label else (self.expression,)
[docs] @classmethod
def as_expression(cls, exp_or_str: ExpOrStr) -> "Expression":
"""
Casts the specified expression or string into an Expression object.
:param exp_or_str: The expression or string to cast.
:return: An Expression object.
"""
if isinstance(exp_or_str, Expression):
return exp_or_str
return Expression(exp_or_str)
[docs] @classmethod
def as_expression_sequence(
cls, sequence_of_exp_or_str: Sequence[ExpOrStr]
) -> "Sequence[Expression]":
"""
Cast a sequence of expressions or strings into a sequence of Expression objects.
:param sequence_of_exp_or_str: The sequence of expressions or strings to cast.
:return: A sequence of Expression objects.
"""
return tuple(
cls.as_expression(exp_or_str) for exp_or_str in sequence_of_exp_or_str
)
[docs]class StringMapper:
"""
A wrapper for a Python lambda that can be passed as a Java lambda for mapping a string value to
another string value.
"""
def __init__(self, gateway, fn):
self._gateway = gateway
self._fn = fn
[docs] def apply(self, arg):
return self._fn(arg)
[docs] class Java:
implements = ["java.util.function.UnaryOperator"]
[docs]class StringToStringSetMapper:
"""
A wrapper for a Python lambda that can be passed as a Java lambda for mapping a string value
to a list of string values.
"""
def __init__(self, gateway, fn):
self._gateway = gateway
self._fn = fn
[docs] def apply(self, arg):
return SetConverter().convert(self._fn(arg), self._gateway)
[docs] class Java:
implements = ["java.util.function.Function"]