#
# Copyright © 2018-2026 Commonwealth Scientific and Industrial Research
# Organisation (CSIRO) ABN 41 687 119 230.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Lazy Spark session and Pathling context creation for the CLI.
The Spark session is created only when a command actually needs it, behind a
status spinner on stderr. Spark and JVM logging is suppressed by default by
pointing the driver JVM at a packaged log4j2 configuration before launch, and
lowering the log level once the context exists; ``--verbose`` leaves logging at
its defaults.
Author: John Grimes.
"""
import atexit
import os
import sys
from contextlib import ExitStack
from importlib.resources import as_file, files
from typing import Optional
from rich.console import Console
from pathling.cli.config import CliConfig
from pathling.cli.render import progress_status, stderr_console
# The packaged log4j2 configuration that silences Spark and JVM logging. It is
# supplied to the driver JVM before launch so that startup noise, and
# task-failure stack traces, are suppressed (the CLI surfaces its own concise
# error message instead). The level is raised again by ``--verbose``.
_QUIET_LOG4J2_RESOURCE = "quiet-log4j2.properties"
# Keeps the materialised packaged resource path valid for the process lifetime.
# For a directory-installed wheel the resource is a real file and needs no
# cleanup; for a zipped install it is a temporary extraction removed when the
# stack closes at interpreter exit. Either way no per-run temporary file is left
# behind, unlike the previous NamedTemporaryFile approach (FR-017).
_RESOURCE_STACK = ExitStack()
atexit.register(_RESOURCE_STACK.close)
[docs]def quiet_log4j2_path() -> str:
"""Resolves a filesystem path to the packaged quiet log4j2 configuration.
The configuration ships as static package data, so no per-run temporary file
is created. The materialised path is kept valid for the process lifetime so
the driver JVM can read it during and after session start.
:return: the filesystem path to the quiet log4j2 properties file.
"""
resource = files("pathling.cli").joinpath("resources", _QUIET_LOG4J2_RESOURCE)
path = _RESOURCE_STACK.enter_context(as_file(resource))
return str(path)
def _build_quiet_spark(config: CliConfig):
"""Builds a Spark session with logging suppressed before JVM launch.
The Pathling and Delta package configuration is shared with
:func:`pathling.context._build_spark_session`; this function only adds the
CLI-specific quiet-logging options on top.
:param config: the resolved CLI configuration.
:return: a configured :class:`SparkSession`.
"""
from pathling.context import _build_spark_session
# Pin Spark's Python workers to the interpreter running the CLI. Without
# this, Spark launches workers using whatever ``python3`` is first on the
# PATH, which may be a different minor version from the driver and causes a
# PYTHON_VERSION_MISMATCH error on any operation that uses Python workers.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
extra_configs = {
# Enable Arrow-based columnar transfer so that any ``toPandas`` call in
# the interactive console collects via Arrow record batches rather than
# pickling rows one at a time. File output no longer collects to the
# driver - Spark writes it directly - so this setting now serves only
# the interactive console. Set before the user overlay below so an
# explicit --spark-conf value still wins.
"spark.sql.execution.arrow.pyspark.enabled": "true",
}
if not config.verbose:
log4j2_path = quiet_log4j2_path()
extra_configs["spark.driver.extraJavaOptions"] = (
f"-Dlog4j2.configurationFile=file:{log4j2_path}"
)
extra_configs["spark.ui.showConsoleProgress"] = "false"
# Overlay the user-supplied Spark configuration last so that a user value
# wins over the CLI's quiet-logging options for the same key (FR-012). A
# user-set spark.driver.extraJavaOptions therefore replaces the quiet
# log4j2 option, leaving Spark logging at its defaults.
extra_configs.update(config.spark_conf)
return _build_spark_session(extra_configs)
def _create_pathling_context(config: CliConfig):
"""Builds the Spark session and Pathling context from the configuration.
:param config: the resolved CLI configuration.
:return: a configured :class:`PathlingContext`.
"""
from pathling import PathlingContext
spark = _build_quiet_spark(config)
if not config.verbose:
# In local mode the driver JVM is launched before builder-level Java
# options apply, so log suppression relies on lowering the level here.
# OFF (rather than ERROR) also hides task-failure stack traces, which
# the CLI surfaces as its own concise message.
spark.sparkContext.setLogLevel("OFF")
auth = config.tx_auth
return PathlingContext.create(
spark,
terminology_server_url=config.tx_server,
enable_auth=bool(auth and auth.enabled),
token_endpoint=auth.token_endpoint if auth else None,
client_id=auth.client_id if auth else None,
client_secret=auth.client_secret if auth else None,
scope=auth.scope if auth else None,
)
[docs]def create_context(config: CliConfig, console: Optional[Console] = None):
"""Creates a :class:`PathlingContext` configured from the CLI settings.
In the default (non-verbose) mode the JVM launcher's startup banner and Ivy
dependency-resolution report - which Spark prints to file descriptor 2
before any log configuration can take effect in local mode - are swallowed
by redirecting that descriptor for the duration of session creation, while
the status spinner is routed to a preserved copy of the real stderr so that
progress remains visible.
:param config: the resolved CLI configuration.
:param console: the stderr console for the status spinner; created when
None.
:return: a configured :class:`PathlingContext`.
"""
console = console or stderr_console()
if config.verbose:
with progress_status(console, "Starting Spark session...", True):
return _create_pathling_context(config)
# Preserve the real stderr for the spinner, then point fd 2 at /dev/null so
# the launcher's banner is discarded during session creation.
saved_fd = os.dup(2)
devnull_fd = os.open(os.devnull, os.O_WRONLY)
spinner_stream = os.fdopen(os.dup(saved_fd), "w")
spinner_console = Console(file=spinner_stream, markup=False, highlight=False)
try:
os.dup2(devnull_fd, 2)
with progress_status(spinner_console, "Starting Spark session...", False):
return _create_pathling_context(config)
finally:
os.dup2(saved_fd, 2)
os.close(saved_fd)
os.close(devnull_fd)
spinner_stream.close()