#
# Copyright © 2018-2026 Commonwealth Scientific and Industrial Research
# Organisation (CSIRO) ABN 41 687 119 230.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Configuration resolution for the Pathling command line interface.
A single config file is selected by the precedence explicit ``--config`` >
project-local ``pathling.toml`` (in the current working directory) > user-level
``${XDG_CONFIG_HOME:-~/.config}/pathling/config.toml`` > none. Files are never
merged: keys absent from the chosen file fall back to built-in defaults, never
to another file. The chosen file's values then resolve with the precedence flag
> config file > built-in default. When a project-local file is discovered, a
one-line notice naming it is emitted via the ``on_notice`` callback. Secret
values for authentication may be supplied as a literal, a ``@/path/to/file``
reference, or via an environment variable.
Author: John Grimes.
"""
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional
# Built-in terminology server endpoint. It is duplicated from the library so
# that the CLI can name it in error messages without a Spark session.
DEFAULT_TX_SERVER = "https://tx.ontoserver.csiro.au/fhir"

# FHIR version assumed when neither a flag nor the config file sets one.
DEFAULT_FHIR_VERSION = "R4"

# Every FHIR version code the CLI is prepared to accept.
SUPPORTED_FHIR_VERSIONS = ("R4",)

# File name looked up in the current working directory for project-local
# configuration. It intentionally differs from the user-level ``config.toml``
# so that the two kinds of file cannot be mistaken for each other.
PROJECT_CONFIG_FILENAME = "pathling.toml"

# Top-level keys that a config file is allowed to contain.
VALID_CONFIG_KEYS = frozenset(
    (
        "bulk-auth",
        "fhir-version",
        "spark",
        "terminology-auth",
        "tx-server",
    )
)

# Keys allowed inside the [terminology-auth] and [bulk-auth] tables.
VALID_AUTH_KEYS = frozenset(
    (
        "client-id",
        "client-secret",
        "private-key-jwk",
        "scope",
        "token-endpoint",
    )
)
@dataclass
class TxAuth:
    """Authentication settings for the terminology server.

    :param client_id: the OAuth2 client identifier.
    :param client_secret: the resolved client secret, or None.
    :param token_endpoint: the OAuth2 token endpoint.
    :param scope: an optional OAuth2 scope.
    """

    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    token_endpoint: Optional[str] = None
    scope: Optional[str] = None

    @property
    def enabled(self) -> bool:
        """Reports whether authentication can be attempted.

        :return: True only when both a client identifier and a token endpoint
            have been supplied.
        """
        # Without a client identifier the token endpoint is irrelevant.
        if not self.client_id:
            return False
        return bool(self.token_endpoint)
@dataclass
class BulkAuth:
    """Authentication settings used for SMART backend services bulk export.

    :param client_id: the OAuth2 client identifier.
    :param token_endpoint: the OAuth2 token endpoint.
    :param private_key_jwk: the resolved private key JWK, or None.
    :param client_secret: the resolved client secret, or None.
    :param scope: an optional OAuth2 scope.
    """

    client_id: str
    token_endpoint: Optional[str] = None
    private_key_jwk: Optional[str] = None
    client_secret: Optional[str] = None
    scope: Optional[str] = None

    @property
    def mechanism(self) -> str:
        """Names the credential mechanism in use, for error messages.

        :return: a human-readable description of the credential type.
        """
        # Listed in precedence order: a private key JWK is reported ahead of a
        # client secret when both happen to be set.
        candidates = (
            (self.private_key_jwk, "a private key JWK"),
            (self.client_secret, "a client secret"),
        )
        for credential, label in candidates:
            if credential:
                return label
        return "no credential"
@dataclass
class CliConfig:
    """The global configuration resolved for one CLI invocation.

    :param tx_server: the terminology server URL.
    :param tx_auth: terminology authentication settings, or None.
    :param fhir_version: the FHIR version code.
    :param verbose: whether verbose logging and stack traces are enabled.
    :param config_path: the path the config file was read from, or None.
    :param spark_conf: the resolved, validated, and merged Spark configuration
        map to apply when a session is built; empty when nothing is set.
    :param bulk_auth_table: the parsed ``[bulk-auth]`` table from the chosen
        config file, or None when absent. It is carried along so that
        ``export`` can resolve bulk credentials from the config that has
        already been loaded instead of reading a file again.
    """

    # Terminology settings.
    tx_server: str = DEFAULT_TX_SERVER
    tx_auth: Optional[TxAuth] = None

    # General settings.
    fhir_version: str = DEFAULT_FHIR_VERSION
    verbose: bool = False
    config_path: Optional[Path] = None

    # A factory is used so that instances never share one mutable dict.
    spark_conf: dict = field(default_factory=dict)

    # Raw, unresolved [bulk-auth] table.
    bulk_auth_table: Optional[dict] = None
def _load_toml(path: Path) -> dict:
    """Reads and parses a TOML file using the available TOML library.

    :param path: the path to the TOML file.
    :return: the parsed contents as a dict.
    :raises CliError: if the file cannot be read, is not valid UTF-8, or
        cannot be parsed as TOML.
    """
    # Prefer the standard library on Python 3.11+, fall back to tomli.
    try:
        import tomllib as toml_lib
    except ModuleNotFoundError:  # pragma: no cover - exercised on Python < 3.11.
        import tomli as toml_lib

    from pathling.cli.errors import CliError

    try:
        with open(path, "rb") as handle:
            return toml_lib.load(handle)
    except (toml_lib.TOMLDecodeError, UnicodeDecodeError) as exc:
        # The file is opened in binary mode and the bytes are decoded as UTF-8
        # before parsing. A file saved in another encoding therefore fails with
        # UnicodeDecodeError, which is not a TOMLDecodeError; it is reported
        # the same way so the user sees a file-naming message, not a traceback.
        raise CliError(
            f"Could not parse the config file at {path}: {exc}. "
            "Check that it is valid TOML."
        ) from exc
    except OSError as exc:
        # A directory or an unreadable file surfaces a clear, file-naming error
        # rather than a raw OSError or a silent fallback to another config file.
        raise CliError(
            f"Could not read the config file at {path}: {exc}. "
            "Check that it is a readable file."
        ) from exc
def default_config_path() -> Path:
    """Works out where the user-level config file lives.

    ``XDG_CONFIG_HOME`` is used as the base directory when it is set to a
    non-empty value; otherwise ``~/.config`` is used.

    :return: the path to the default config file location.
    """
    config_home = os.environ.get("XDG_CONFIG_HOME")
    if config_home:
        root = Path(config_home)
    else:
        # Unset and empty are treated alike: both fall back to ~/.config.
        root = Path.home() / ".config"
    return root.joinpath("pathling", "config.toml")
def resolve_secret(
    value: Optional[str],
    env_var: Optional[str] = None,
    env: Optional[dict] = None,
) -> Optional[str]:
    """Turns a secret setting into the secret itself.

    Three sources are supported. A plain string is returned unchanged. A
    string starting with ``@`` names a file, and the file's contents are
    returned with surrounding whitespace stripped. When ``value`` is None and
    ``env_var`` is given, the named environment variable is looked up instead.

    :param value: the literal value, ``@path`` reference, or None.
    :param env_var: the name of a fallback environment variable, or None.
    :param env: the environment mapping to read from; defaults to ``os.environ``.
    :return: the resolved secret, or None when nothing is available.
    :raises CliError: if a ``@file`` reference cannot be read.
    """
    if value is None:
        # Nothing was supplied directly, so only the environment can help.
        if env_var is None:
            return None
        source = os.environ if env is None else env
        return source.get(env_var)

    if not value.startswith("@"):
        return value

    # Everything after the leading "@" is the path of the secret file.
    file_path = Path(value[1:])
    from pathling.cli.errors import CliError

    try:
        contents = file_path.read_text(encoding="utf-8")
    except OSError as exc:
        raise CliError(
            f"Could not read the secret file at {file_path}: {exc}. "
            "Check the path and permissions."
        ) from exc
    return contents.strip()
def load_config_file(
    path: Path,
    on_warning: Optional[Callable[[str], None]] = None,
) -> dict:
    """Parses a config file and warns about any key it does not recognise.

    Unknown keys are reported but left in the returned data. Top-level keys
    are checked first, then the keys of the ``[terminology-auth]`` and
    ``[bulk-auth]`` tables, in that order.

    :param path: the config file path.
    :param on_warning: an optional callback invoked with each warning message;
        defaults to writing to stderr.
    :return: the parsed config as a dict, or an empty dict when the file is
        absent.
    """
    if not path.exists():
        return {}

    emit = on_warning or (lambda message: print(message, file=sys.stderr))
    parsed = _load_toml(path)

    # Top-level keys.
    known_top_level = ", ".join(sorted(VALID_CONFIG_KEYS))
    for name in parsed:
        if name in VALID_CONFIG_KEYS:
            continue
        emit(
            f"Ignoring unknown config key '{name}' in {path}. "
            f"Valid keys are: {known_top_level}."
        )

    # Keys inside the two authentication tables. A value that is not a table
    # is skipped here without any warning.
    known_auth = ", ".join(sorted(VALID_AUTH_KEYS))
    for section in ("terminology-auth", "bulk-auth"):
        contents = parsed.get(section)
        if not isinstance(contents, dict):
            continue
        for name in contents:
            if name in VALID_AUTH_KEYS:
                continue
            emit(
                f"Ignoring unknown config key '{section}.{name}' in "
                f"{path}. Valid keys are: {known_auth}."
            )

    return parsed
def _resolve_tx_auth(
    file_data: dict,
    client_id: Optional[str],
    client_secret: Optional[str],
    token_endpoint: Optional[str],
    scope: Optional[str],
    env: Optional[dict],
) -> Optional[TxAuth]:
    """Combines terminology auth settings from flags with the config file.

    A flag value is used when it is truthy; otherwise the matching key of the
    ``[terminology-auth]`` table is used.

    :param file_data: the parsed config file contents.
    :param client_id: the ``--tx-client-id`` flag value, or None.
    :param client_secret: the ``--tx-client-secret`` flag value, or None.
    :param token_endpoint: the ``--tx-token-endpoint`` flag value, or None.
    :param scope: the ``--tx-scope`` flag value, or None.
    :param env: the environment mapping for secret resolution.
    :return: a populated :class:`TxAuth`, or None when no auth is configured.
    """
    section = file_data.get("terminology-auth") or {}

    settings = {
        "client_id": client_id or section.get("client-id"),
        "token_endpoint": token_endpoint or section.get("token-endpoint"),
        "scope": scope or section.get("scope"),
    }
    # No environment variable fallback applies to the terminology secret; only
    # a literal or an @file reference is resolved.
    settings["client_secret"] = resolve_secret(
        client_secret or section.get("client-secret"), None, env
    )

    if not any(settings.values()):
        return None
    return TxAuth(**settings)
def resolve_bulk_auth(
    file_bulk_auth: Optional[dict],
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    private_key_jwk: Optional[str] = None,
    token_endpoint: Optional[str] = None,
    scope: Optional[str] = None,
    env: Optional[dict] = None,
) -> Optional[BulkAuth]:
    """Works out the bulk export authentication from flags and config.

    A flag value is used when it is truthy; otherwise the matching key of the
    ``[bulk-auth]`` table is used. The two secrets may each be given as a
    literal, a ``@file`` reference, or through the ``PATHLING_CLIENT_SECRET``
    / ``PATHLING_PRIVATE_KEY_JWK`` environment variables. Once any input is
    present, a client identifier, a token endpoint, and exactly one credential
    are all required.

    :param file_bulk_auth: the parsed ``[bulk-auth]`` table, or None.
    :param client_id: the ``--client-id`` flag value, or None.
    :param client_secret: the ``--client-secret`` flag value, or None.
    :param private_key_jwk: the ``--private-key-jwk`` flag value, or None.
    :param token_endpoint: the ``--token-endpoint`` flag value, or None.
    :param scope: the ``--scope`` flag value, or None.
    :param env: the environment mapping for secret resolution.
    :return: a populated :class:`BulkAuth`, or None when no auth input is given.
    :raises CliError: when the auth configuration is incomplete or ambiguous.
    """
    section = file_bulk_auth or {}

    settings = {
        "client_id": client_id or section.get("client-id"),
        "token_endpoint": token_endpoint or section.get("token-endpoint"),
        "scope": scope or section.get("scope"),
    }
    settings["client_secret"] = resolve_secret(
        client_secret or section.get("client-secret"), "PATHLING_CLIENT_SECRET", env
    )
    settings["private_key_jwk"] = resolve_secret(
        private_key_jwk or section.get("private-key-jwk"),
        "PATHLING_PRIVATE_KEY_JWK",
        env,
    )

    from pathling.cli.errors import EXIT_USAGE, CliError

    # No auth input whatsoever is valid: the export simply runs unauthenticated.
    if not any(settings.values()):
        return None

    has_secret = bool(settings["client_secret"])
    has_jwk = bool(settings["private_key_jwk"])

    # The checks run in a fixed order and only the first failure is reported.
    # A missing client ID is an error rather than a silent fall-through to an
    # unauthenticated export (FR-004).
    problem = None
    if not settings["client_id"]:
        problem = (
            "Bulk export authentication requires a client ID. Add --client-id, "
            "or set client-id in the [bulk-auth] config table."
        )
    elif not settings["token_endpoint"]:
        problem = (
            "Bulk export authentication requires a token endpoint. "
            "Add --token-endpoint, or set it in the [bulk-auth] config table."
        )
    elif has_secret and has_jwk:
        problem = (
            "Provide exactly one of --client-secret or --private-key-jwk, not both."
        )
    elif not (has_secret or has_jwk):
        problem = (
            "Bulk export authentication requires a credential. Provide one of "
            "--client-secret or --private-key-jwk."
        )

    if problem is not None:
        raise CliError(problem, exit_code=EXIT_USAGE)
    return BulkAuth(**settings)
def resolve_config_source(
    config_path: Optional[Path],
    cwd: Path,
) -> tuple[Optional[Path], str]:
    """Picks the one config file to load and says where it came from.

    The candidates are tried in order: an explicit ``--config`` path, then a
    project-local ``pathling.toml`` in ``cwd``, then the user-level config
    file. The first match wins and values are never merged across files.

    :param config_path: an explicit ``--config`` path, or None.
    :param cwd: the directory searched for a project-local ``pathling.toml``.
    :return: a tuple of the chosen path and an origin tag, one of
        ``"explicit"``, ``"project"``, ``"user"``, or ``"none"``. For
        ``"none"`` the returned path is the user-level path, which does not
        exist in that case.
    """
    # An explicit path is returned as given; its existence is not checked here.
    if config_path is not None:
        return config_path, "explicit"

    candidate = cwd / PROJECT_CONFIG_FILENAME
    if candidate.exists():
        return candidate, "project"

    fallback = default_config_path()
    origin = "user" if fallback.exists() else "none"
    return fallback, origin
def resolve_config(
    tx_server: Optional[str] = None,
    tx_client_id: Optional[str] = None,
    tx_client_secret: Optional[str] = None,
    tx_token_endpoint: Optional[str] = None,
    tx_scope: Optional[str] = None,
    fhir_version: Optional[str] = None,
    spark_conf_flags: Optional[dict] = None,
    verbose: bool = False,
    config_path: Optional[Path] = None,
    cwd: Optional[Path] = None,
    env: Optional[dict] = None,
    on_warning: Optional[Callable[[str], None]] = None,
    on_notice: Optional[Callable[[str], None]] = None,
) -> CliConfig:
    """Builds the global configuration from flags, one config file, and defaults.

    One config file is chosen by the precedence explicit ``--config`` >
    project-local ``pathling.toml`` (in ``cwd``) > user-level config file >
    none. Each setting then resolves as flag > config file > built-in default.
    Config files are never merged, so a key missing from the chosen file falls
    back to the built-in default and never to a different file.

    :param tx_server: the ``--tx-server`` flag value, or None.
    :param tx_client_id: the ``--tx-client-id`` flag value, or None.
    :param tx_client_secret: the ``--tx-client-secret`` flag value, or None.
    :param tx_token_endpoint: the ``--tx-token-endpoint`` flag value, or None.
    :param tx_scope: the ``--tx-scope`` flag value, or None.
    :param fhir_version: the ``--fhir-version`` flag value, or None.
    :param spark_conf_flags: the parsed ``--spark-conf`` flag map, or None; flag
        values override the ``[spark]`` table for the same key.
    :param verbose: the ``--verbose`` flag value.
    :param config_path: an explicit config file path, or None for discovery.
    :param cwd: the directory searched for a project-local ``pathling.toml``;
        defaults to the current working directory.
    :param env: the environment mapping for secret resolution.
    :param on_warning: an optional warning callback passed to the file loader and
        used to surface the managed Spark-package version-override warning;
        defaults to writing to stderr so warnings appear even in quiet mode.
    :param on_notice: an optional callback invoked with a one-line notice when a
        project-local ``pathling.toml`` is discovered and used.
    :return: the resolved :class:`CliConfig`.
    :raises CliError: if an explicit config path does not exist, or if the
        resolved FHIR version is unsupported.
    """
    # Warnings raised directly by this function go to the caller's callback
    # when there is one, and to stderr otherwise.
    warn = on_warning or (lambda message: print(message, file=sys.stderr))

    # Choose the single file to read.
    base_dir = Path.cwd() if cwd is None else cwd
    path, origin = resolve_config_source(config_path, base_dir)

    # A missing explicit --config file is a hard error: continuing would mean
    # silently using some other file's credentials (FR-002).
    explicit_missing = origin == "explicit" and not path.exists()
    if explicit_missing:
        from pathling.cli.errors import EXIT_USAGE, CliError

        raise CliError(
            f"Config file does not exist: {path}. Check the --config path.",
            exit_code=EXIT_USAGE,
        )

    file_data = load_config_file(path, on_warning)

    # Announce a discovered project-local file so that it never overrides the
    # user-level config unnoticed. Explicit and user-level files are not
    # announced.
    if origin == "project" and on_notice is not None:
        user_path = default_config_path()
        notice = (
            f"Using project config {path} (overrides {user_path})."
            if user_path.exists()
            else f"Using project config {path}."
        )
        on_notice(notice)

    def pick(flag_value, key, default):
        # Precedence for a simple setting: flag, then config file, then default.
        return flag_value or file_data.get(key) or default

    chosen_tx_server = pick(tx_server, "tx-server", DEFAULT_TX_SERVER)
    chosen_fhir_version = pick(fhir_version, "fhir-version", DEFAULT_FHIR_VERSION)

    if chosen_fhir_version not in SUPPORTED_FHIR_VERSIONS:
        from pathling.cli.errors import CliError

        supported = ", ".join(SUPPORTED_FHIR_VERSIONS)
        raise CliError(
            f"Unsupported FHIR version '{chosen_fhir_version}'. "
            f"Supported versions are: {supported}.",
            exit_code=2,
        )

    tx_auth = _resolve_tx_auth(
        file_data,
        tx_client_id,
        tx_client_secret,
        tx_token_endpoint,
        tx_scope,
        env,
    )

    # Partial terminology auth input cannot authenticate, because both a client
    # ID and a token endpoint are needed. Say so instead of disabling it
    # quietly (FR-005).
    if tx_auth is not None and not tx_auth.enabled:
        warn(
            "Terminology authentication is incomplete and will be disabled: a "
            "client ID and a token endpoint are both required. Provide "
            "--tx-client-id and --tx-token-endpoint."
        )

    # Build the effective Spark configuration from the [spark] table and the
    # --spark-conf flags (a flag wins over the table), then merge it with
    # Pathling's managed defaults. An invalid key or value aborts at this
    # point, before any Spark session is started. The managed-version-override
    # warning is surfaced even in quiet mode.
    from pathling.cli.sparkconf import merge_spark_conf, resolve_spark_conf

    spark_table = file_data.get("spark") or {}
    requested_spark = resolve_spark_conf(spark_table, spark_conf_flags, env)
    effective_spark = merge_spark_conf(requested_spark, on_warning=warn)

    # The recorded path is None when the chosen path does not exist.
    loaded_from = path if path.exists() else None

    return CliConfig(
        tx_server=chosen_tx_server,
        tx_auth=tx_auth,
        fhir_version=chosen_fhir_version,
        verbose=verbose,
        config_path=loaded_from,
        spark_conf=effective_spark,
        bulk_auth_table=file_data.get("bulk-auth"),
    )