Source code for pygeodes.utils.io

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module regroups all functions related to file INPUT/OUTPUT"""
# -----------------------------------------------------------------------------
# Copyright (c) 2024, CNES
#
# REFERENCES:
# https://cnes.fr/
# -----------------------------------------------------------------------------

# stdlib imports -------------------------------------------------------
import json
import os
import hashlib
from time import perf_counter
from difflib import SequenceMatcher as SM
from typing import List
import re
from pathlib import Path
from json import JSONEncoder

# third-party imports -----------------------------------------------

# local imports ---------------------------------------------------
from pygeodes.utils.logger import logger
from pygeodes.utils.consts import (
    MAX_TIME_BEFORE_ABORTING_FOLDER_CHECKSUM,
    FOLDER_CONSIDERED_BIG_SIZE,
)

DEFAULT_FILESEARCH_TIMEOUT = 2


# this piece of code is used to make stac items json serializable
[docs]def wrapped_default(self, obj): return getattr(obj.__class__, "__json__", wrapped_default.default)(obj)
wrapped_default.default = JSONEncoder().default JSONEncoder.original_default = JSONEncoder.default JSONEncoder.default = wrapped_default
[docs]def file_exists(filepath: str, raise_exception: bool = True) -> bool: """This checks if the file exists, and, in function of a boolean parameter, raises an exception if it doesn't Parameters ---------- filepath : str the filepath to check the existence of raise_exception : bool, optional whether to raise an exception, by default True Returns ------- bool whether the file exists Raises ------ FileNotFoundError error raised if the file doesn't exist See Also -------- similar_filenames : to find the most similar filenames to a filename Examples -------- .. code-block:: python name = "file.txt" exists = file_exists(name) """ filepath = os.path.abspath(filepath) exists = os.path.exists(filepath) if exists: return True else: if raise_exception: raise FileNotFoundError(f"The file {filepath} doesn't exist") else: return False
[docs]def similar_filenames( filename: str, other_filenames: List[str], nb: int = 10 ) -> List[str]: """This function returns the ``nb`` most resembling filenames to the filename provided in the list of filenames Args: `filename` (``str``): the filename `other_filenames` (``List[str]``): the other filenames to be compared to `nb` (``int``, optional): the number of filenames to keep. Defaults to 10. Returns: ``List[str]``: the most resembling filenames """ similarities = sorted( [ (SM(None, filename, other_filename).ratio(), other_filename) for other_filename in other_filenames ], key=lambda tp: tp[0], reverse=True, ) return [tp[1] for tp in similarities][ :nb ] # on ne retourne que les nb premiers noms de fichiers
[docs]def find_unused_filename(filepath: str) -> str: """This functions finds an unused filename for the filepath provided by adding -{number} after Parameters ---------- filepath : str the original filename Returns ------- str an unused filename """ if file_exists(filepath, raise_exception=False): root, ext = os.path.splitext(filepath) i = 1 filepath = f"{root}-{i}{ext}" while file_exists(filepath, raise_exception=False): i += 1 filepath = f"{root}-{i}{ext}" return filepath else: return filepath
[docs]def write_json(content: dict, filepath: str) -> None: """This functions dumps a dict at a filepath Parameters ---------- content : dict the dict to dump filepath : str the filepath to dump to See also -------- load_json : to read a JSON from a file """ with open(filepath, "w") as file: json.dump(content, file, indent=4)
[docs]def load_json(filepath: str) -> dict: """This functions loads a JSON into a Python dict from a filepath Parameters ---------- filepath : str the file to load the dict from Returns ------- dict the dict loaded Raises ------ Exception an Exception if the JSON is not a valid JSON See also -------- write_json : to dump a Python dict into a JSON file """ try: with open(filepath, "r") as file: content = json.load(file) return content except json.decoder.JSONDecodeError: raise Exception(f"The JSON file {filepath} is not a valid JSON")
[docs]def get_homedir() -> Path: """To get the home directory path of the current user Returns ------- Path the homedir """ # maybe only works on unix systems ? return Path(os.path.expanduser("~"))
[docs]def compute_md5(filepath: str) -> str: """Computes md5sum of the contents of the given filepath""" logger.debug(f"checking md5 for {filepath}") begin_time = perf_counter() hash_md5 = hashlib.md5() with open(filepath, "rb") as file: for chunk in iter(lambda: file.read(4096), b""): hash_md5.update(chunk) end_time = perf_counter() logger.debug(f"checked md5 checksum in {end_time - begin_time} seconds") return hash_md5.hexdigest()
[docs]def check_if_folder_already_contains_file(filepath: str, file_checksum: str): """The goal of this function is to check if, in the folder of the filepath provided, it already exists a file with the same checksum as the one provided, with in aim not to download several times the same file. As it's not something required but more of an help for the user, we assume that if the operation takes more than 5 seconds, we abort it.""" begin_time = perf_counter() folder = os.path.dirname(filepath) filename = os.path.basename(filepath) listdir = [ name for name in os.listdir(folder) if not os.path.isdir(os.path.join(folder, name)) ] # que les fichiers logger.debug( f"checking if folder {folder} ({len(listdir)} files) already contains file {filepath}" ) if ( len(listdir) > FOLDER_CONSIDERED_BIG_SIZE ): # too big folder to test all checksums sim_filenames = similar_filenames( filename, listdir ) # que les noms de fichiers ressemblant, qui ont plus de chance d'avoir le même contenu logger.debug( f"{folder} folder is too big, checking only files : {sim_filenames}" ) listdir = sim_filenames for filename in listdir: filepath = os.path.join(folder, filename) current_time = perf_counter() time_since_begin = current_time - begin_time if time_since_begin > MAX_TIME_BEFORE_ABORTING_FOLDER_CHECKSUM: return None try: md5 = compute_md5(filepath) if ( md5 == file_checksum ): # if the current file has the same checksum as the target file return filepath except PermissionError: pass return None
[docs]def filename_in_folder(name: str, folder_path: str) -> bool: """This functions checks if a filename is in a folder Parameters ---------- name : str the filename folder_path : str the folder Returns ------- bool wether the filename exists in the folder """ filename = os.path.join(folder_path, name) return file_exists(filename, False)
[docs]def filenames_respecting_regex(filenames: List[str], regex: str) -> List[str]: """This functions find all the filenames in a list of filenames matching a given regex Parameters ---------- filenames : List[str] the list of filenames regex : str the regex Returns ------- List[str] the filenames matching the regex """ pattern = re.compile(regex) return [ filename for filename in filenames if re.fullmatch(pattern, filename) ]