""" Filesystem utilities for interactive with Trove directory structures """ import ctypes import ctypes.util import errno import os import platform import tempfile import logging from pathlib import Path logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # renameat2(2) ctypes binding — Linux only # --------------------------------------------------------------------------- FS_LOCAL_SPECIAL = ':' _renameat2 = None if platform.system() == "Linux": try: _libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) _renameat2_func = _libc.renameat2 _renameat2_func.argtypes = [ ctypes.c_int, # olddirfd ctypes.c_char_p, # oldpath ctypes.c_int, # newdirfd ctypes.c_char_p, # newpath ctypes.c_uint, # flags ] _renameat2_func.restype = ctypes.c_int _renameat2 = _renameat2_func except (OSError, AttributeError): pass _AT_FDCWD = -100 _RENAME_EXCHANGE = 2 # errnos that mean "not supported here" rather than a real failure _NOT_SUPPORTED = frozenset((errno.EINVAL, errno.ENOSYS, errno.EOPNOTSUPP)) _HARDLINK_UNAVAIL = _NOT_SUPPORTED | frozenset((errno.EXDEV, errno.EPERM)) # --------------------------------------------------------------------------- # Low-level primitives # --------------------------------------------------------------------------- class _OpUnavailable(Exception): pass def _rename_exchange(old: str | Path, new: str | Path) -> None: """Atomically swap two filesystem entries. Both paths must exist. Raises RuntimeError if renameat2 is not available, OSError on kernel/fs failure. """ if _renameat2 is None: raise _OpUnavailable("renameat2 not available on this platform") ret = _renameat2( _AT_FDCWD, os.fsencode(str(old)), _AT_FDCWD, os.fsencode(str(new)), _RENAME_EXCHANGE, ) if ret != 0: err = ctypes.get_errno() if err in _NOT_SUPPORTED: raise _OpUnavailable(f"renameat2 exchange not supported on filesystem") raise OSError(err, os.strerror(err), str(old), None, str(new)) def _mklink_checked(src: Path, dest: Path) -> bool: # Step 1 → 2: hardlink try: os.link(str(src), str(dest)) return True except OSError as e: if e.errno in _HARDLINK_UNAVAIL: return False raise # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def get_content_index_name_from_path(path: Path) -> str: """Return the name of the index file for a directory.""" # TODO: improve handling and mimetype logic if not path.suffix: return 'index.dat' return f'index{path.suffix}' def get_content_path(path: str | Path) -> Path: """Return the path to the content file for a directory or file""" path = Path(path).resolve() if path.is_dir(): return path / get_content_index_name_from_path(path) return path def promote_path(path: str | Path) -> Path: """Promote a regular file to a directory, preserving content as an index file. Transforms ``/some/dir/notes.md`` into:: /some/dir/notes.md/ _index.md ← original content (suffix preserved) Atomicity depends on platform and filesystem capabilities, with automatic fallback through three tiers: 1. hardlink + renameat2(RENAME_EXCHANGE) — observers see either the old file or the new directory, never absence. Requires Linux >=3.15 and a supporting filesystem (ext4, xfs, tmpfs). A temporary entry briefly appears in the parent directory. 2. hardlink + unlink + rename — data is preserved via hard link so content is never at risk, but observers see a brief window where the original name is absent. 3. move + rename — works on any POSIX system. Same brief absence window. A temporary directory (``::trove_promote_tmp``) is created in the parent for the duration of the operation. On success it is removed (or, in tier 1, contains only the original file's hard link and is unlinked). If the final rename fails in tier 3, the temp directory is left in place for recovery later. Args: path: Regular file to promote. Must exist and be a regular file. Returns: Path to the index file inside the resulting directory (e.g., ``path / _index.md``). Raises: ValueError: *path* is not a regular file. OSError: Promotion failed. In tiers 1-2, the original file is restored. In tier 3, the temp directory may remain for manual or automated recovery. """ path = Path(path).resolve() if not path.is_file(): raise ValueError(f"not a regular file: {path}") # Create a temporary directory target_path_tmp = Path(tempfile.mkdtemp(dir=str(path.parent.absolute()), prefix=f"{path.name}{FS_LOCAL_SPECIAL}", suffix=f"{FS_LOCAL_SPECIAL}trove_promote_tmp")) target_name = get_content_index_name_from_path(path) target_path = path / target_name # Attempt to preserve original file during operation via hardlink # This is the 'mostly' safe method try: hardlink_created = _mklink_checked(path, target_path_tmp / target_name) except OSError: try: os.rmdir(target_path_tmp) except OSError: logger.warning("Failed to remove temporary file: %s", target_path_tmp) raise if hardlink_created: try: _rename_exchange(path, target_path_tmp) except _OpUnavailable: # Exchanging isn't supported, unlink and rename instead # This results in a 'blip' to observers of the filesystems os.unlink(path) os.rename(target_path_tmp, path) return target_path try: os.unlink(target_path_tmp) except OSError: # Failed to delete hard link to the original file. THis means the temporary # path still exists, but we can't do much so ignore. logger.error("Failed to remove temporary file: %s", target_path_tmp) return target_path # Hard linking isn't an option, try dual move # If the first step fails, the original file is still there and we attempt to # delete the temporary directory before passing along the error. try: os.rename(path, target_path_tmp / target_name) except OSError: try: os.rmdir(target_path_tmp) except OSError: logger.warning("Failed to remove temporary file: %s", target_path_tmp) raise # Move back to the original path now # If this fails, trove scan of the path will be able to tell what happened # and present user an option for recovery. Failure here indicates something # _very_ wrong - we created the new tmp path and removed the old path # without error. We exit intentionally to avoid further damage. os.rename(target_path_tmp, path) return target_path