# Source code for pathling.spark

#  Copyright © 2018-2025 Commonwealth Scientific and Industrial Research
#  Organisation (CSIRO) ABN 41 687 119 230.
# 
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


import uuid

from py4j.java_gateway import JavaObject, JVMView
from pyspark import SparkContext
from pyspark.sql import SparkSession


class Dfs:
    """A class for interacting with the Hadoop Distributed File System (HDFS) in Spark.

    Each operation resolves the Hadoop ``FileSystem`` that owns the given path,
    so both scheme-less paths (which use the default filesystem, ``fs.defaultFS``)
    and fully qualified paths on other filesystems are supported.
    """

    def __init__(self, spark: SparkSession):
        """
        Initialize the Dfs class with a SparkSession.

        :param spark: SparkSession instance
        :raises ValueError: If ``spark`` is not provided.
        """
        if not spark:
            raise ValueError("SparkSession must be provided")
        sc: SparkContext = spark.sparkContext
        self._jvm: JVMView = sc._jvm
        self._hadoop_conf: JavaObject = sc._jsc.hadoopConfiguration()
        # The default filesystem. It is retained as an attribute for backward
        # compatibility; per-path operations resolve their own filesystem.
        self._fs: JavaObject = self._jvm.org.apache.hadoop.fs.FileSystem.get(
            self._hadoop_conf
        )

    def _to_path(self, path: str) -> JavaObject:
        """Build a Hadoop ``Path`` object from a path string."""
        return self._jvm.org.apache.hadoop.fs.Path(path)

    def _fs_for(self, hadoop_path: JavaObject) -> JavaObject:
        """Return the Hadoop ``FileSystem`` responsible for ``hadoop_path``."""
        # Path.getFileSystem() picks the filesystem from the path's scheme and
        # authority, falling back to the default filesystem for scheme-less
        # paths. Using the default filesystem for everything would make
        # qualified paths on other schemes fail with "Wrong FS".
        return hadoop_path.getFileSystem(self._hadoop_conf)

    def get_temp_dir_path(self, prefix: str = "tmp-app", qualified: bool = True) -> str:
        """
        Returns a unique path for a temporary directory in Spark's filesystem.

        The path is constructed by appending a UUID to the base temporary directory
        (``hadoop.tmp.dir``), ensuring uniqueness for each call. The directory itself
        is not created, only the path is returned.

        :param prefix: String to insert between the base directory and the UUID
            (default: "tmp-app").
        :param qualified: If True, returns a fully qualified Hadoop path (qualified
            against the filesystem that owns the base directory); if False, returns
            a raw path string.
        :return: String representing the unique temporary directory path.
        :raises ValueError: If ``hadoop.tmp.dir`` is not set in the Hadoop
            configuration.
        """
        base_tmp_dir = self._hadoop_conf.get("hadoop.tmp.dir")
        if not base_tmp_dir:
            raise ValueError("`hadoop.tmp.dir` must be set in Hadoop configuration.")
        uuid_suffix = str(uuid.uuid4())
        base_tmp_path = self._to_path(base_tmp_dir)
        tmp_path = self._jvm.org.apache.hadoop.fs.Path(
            base_tmp_path, f"{prefix}-{uuid_suffix}"
        )
        if not qualified:
            return tmp_path.toString()
        return self._fs_for(tmp_path).makeQualified(tmp_path).toString()

    def exists(self, path: str) -> bool:
        """
        Check if a given path exists in the filesystem.

        :param path: Path to check for existence.
        :return: True if the path exists, False otherwise.
        """
        hadoop_path = self._to_path(path)
        return self._fs_for(hadoop_path).exists(hadoop_path)

    def delete(self, path: str, recursive: bool = False) -> bool:
        """
        Delete a file or directory at the specified path.

        :param path: Path to the file or directory to delete.
        :param recursive: If True, delete directories and their contents recursively.
        :return: True if deletion was successful, False otherwise.
        """
        hadoop_path = self._to_path(path)
        return self._fs_for(hadoop_path).delete(hadoop_path, recursive)

    def mkdirs(self, path: str) -> bool:
        """
        Create a directory at the specified path.

        Hadoop's ``FileSystem.mkdirs`` also creates any missing parent directories.

        :param path: Path to the directory to create.
        :return: True if the directory was created successfully, False otherwise.
        """
        hadoop_path = self._to_path(path)
        return self._fs_for(hadoop_path).mkdirs(hadoop_path)