Source code for pathling.cli.run

#
# Copyright © 2018-2026 Commonwealth Scientific and Industrial Research
# Organisation (CSIRO) ABN 41 687 119 230.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""The ``pathling run`` command.

Executes user-supplied Python code - from a script file, standard input, or an
inline ``-c`` option - with ``spark`` (the Spark session) and ``pathling`` (the
configured Pathling context) bound in the code's global scope, reproducing
Python interpreter script semantics (``sys.argv``, ``__main__``, ``__file__``,
``sys.path``, traceback fidelity, and ``SystemExit`` propagation).

Author: John Grimes.
"""

import os
import sys
import traceback
from dataclasses import dataclass
from typing import Optional, Tuple

import click

from pathling.cli import session
from pathling.cli.errors import EXIT_RUNTIME

# The context key under which the raw command arguments are recorded.
_RAW_ARGS_KEY = "pathling.run.raw_args"


[docs]@dataclass class CodeSource: """A resolved source of program text for execution. :param text: the program source code. :param filename: the filename to compile under, which appears in tracebacks and syntax errors (the script path, ``<stdin>``, or ``<string>``). :param argv0: the value for ``sys.argv[0]``, following Python interpreter conventions (the script path, ``-``, or ``-c``). :param path_entry: the entry to prepend to ``sys.path`` (the script's directory for files, ``""`` for stdin and inline code). :param file_attr: the value for ``__file__`` in the program's globals, or None to leave it unset (stdin and inline code). """ text: str filename: str argv0: str path_entry: str file_attr: Optional[str]
[docs]class RunCommand(click.Command): """A command that records its raw argument list before parsing. The raw arguments are needed to distinguish ``run script.py -c CODE`` (a usage error: two code sources) from ``run -c CODE a b`` (inline code with trailing arguments), which parse to the same option values. """
[docs] def parse_args(self, ctx, args): """Stores the raw arguments on the context, then parses as normal. :param ctx: the Click context. :param args: the raw argument list for this command. :return: the remaining arguments after parsing. """ ctx.meta[_RAW_ARGS_KEY] = list(args) return super().parse_args(ctx, args)
def _positional_precedes_code_flag(raw_args) -> bool: """Determines whether a positional argument appears before ``-c``. A positional (a script path or ``-``) before the inline-code flag means the user supplied two code sources, which is a usage error. A positional after the flag is simply an argument to the inline program. :param raw_args: the raw argument list as typed on the command line. :return: True when a script positional precedes the ``-c`` flag. """ for token in raw_args: if token == "-c" or token == "--code" or token.startswith("--code="): return False if token == "--": return False if token == "-" or not token.startswith("-"): return True return False def _resolve_source(ctx, script, code, args) -> Tuple[CodeSource, list]: """Validates the code-source rules and reads the program text. Exactly one source is required: a script path, ``-`` (stdin), or ``-c CODE``. All validation happens here, before the Spark session is started. :param ctx: the Click context. :param script: the script positional, or None. :param code: the ``-c`` option value, or None. :param args: the trailing arguments tuple. :return: the resolved :class:`CodeSource` and the program's argument list (``sys.argv[1:]``). :raises click.UsageError: when the code-source rules are violated or the script file cannot be read. """ if code is not None: if _positional_precedes_code_flag(ctx.meta.get(_RAW_ARGS_KEY, [])): raise click.UsageError( "Cannot use both a script and -c; supply exactly one code source." ) # Any positional that Click captured belongs to the program's argv. trailing = ([script] if script is not None else []) + list(args) return ( CodeSource( text=code, filename="<string>", argv0="-c", path_entry="", file_attr=None, ), trailing, ) if script is None: raise click.UsageError( "Supply a code source: a script path, '-' for stdin, or -c CODE." ) if script == "-": return ( CodeSource( text=sys.stdin.read(), filename="<stdin>", argv0="-", path_entry="", file_attr=None, ), list(args), ) try: with open(script, encoding="utf-8") as handle: text = handle.read() except OSError as exc: raise click.UsageError(f"Cannot read script '{script}': {exc}") from exc return ( CodeSource( text=text, filename=script, argv0=script, path_entry=os.path.dirname(os.path.abspath(script)), file_attr=script, ), list(args), ) def _execute(source: CodeSource, program_args, namespace) -> None: """Compiles and executes the program with interpreter semantics. ``sys.argv`` and ``sys.path`` are set for the duration of execution and restored afterwards. Uncaught exceptions print a standard traceback with the CLI's own frames removed and exit 1; ``SystemExit`` propagates untouched so its status becomes the process exit code. :param source: the resolved code source. :param program_args: the program's arguments (``sys.argv[1:]``). :param namespace: the extra globals to bind (``spark`` and ``pathling``). """ try: code_object = compile(source.text, source.filename, "exec") except SyntaxError as exc: # Pass no traceback: the interpreter prints only the error excerpt and # the SyntaxError line for a syntax error, with no traceback header or # frames. traceback.print_exception(type(exc), exc, None, file=sys.stderr) # Flush explicitly: the stream may be block-buffered (for example the # capture stream used by Click's test runner) and the process exits # immediately after. sys.stderr.flush() sys.exit(EXIT_RUNTIME) program_globals = {"__name__": "__main__", **namespace} if source.file_attr is not None: program_globals["__file__"] = source.file_attr saved_argv = sys.argv saved_path = list(sys.path) sys.argv = [source.argv0] + list(program_args) sys.path.insert(0, source.path_entry) try: exec(code_object, program_globals) except Exception as exc: # Drop the CLI's own frame (this function's exec call) so the # traceback starts at the user's code, exactly as the interpreter # would print it. tb = exc.__traceback__.tb_next if exc.__traceback__ else None traceback.print_exception(type(exc), exc, tb, file=sys.stderr) # See the flush comment in the syntax-error path above. sys.stderr.flush() sys.exit(EXIT_RUNTIME) finally: sys.argv = saved_argv sys.path[:] = saved_path @click.command( name="run", cls=RunCommand, context_settings={"ignore_unknown_options": True}, ) @click.argument("script", required=False) @click.option("-c", "--code", "code", help="Inline Python code to execute.") @click.argument("args", nargs=-1, type=click.UNPROCESSED) @click.pass_context def run(ctx, script, code, args): """Run Python code with the Pathling environment ready. Executes a script file (or '-' for standard input, or inline code via -c) with two variables already in scope: spark (the Spark session) and pathling (the configured Pathling context). Trailing arguments are passed to the code as sys.argv, following Python interpreter conventions. \b See the Pathling Python API reference: https://pathling.csiro.au/docs/python/pathling.html Example - project a tabular view of patients, then summarise it with SQL. Save this as summary.py and run "pathling run summary.py": \b patients = pathling.read.ndjson("data").view( "Patient", select=[{"column": [{"path": "gender", "name": "gender"}]}], ) patients.createOrReplaceTempView("patient") spark.sql("SELECT gender, count(*) AS count " "FROM patient GROUP BY gender").show() """ source, program_args = _resolve_source(ctx, script, code, args) obj = ctx.obj pc = session.create_context(obj.config, obj.console) namespace = {"spark": pc.spark, "pathling": pc} _execute(source, program_args, namespace)