trove/trovedb/fs_util.py

201 lines
7.1 KiB
Python
Raw Permalink Normal View History

2026-03-26 00:33:29 -05:00
"""
Filesystem utilities for interacting with Trove directory structures
"""
import ctypes
import ctypes.util
import errno
import os
import platform
import tempfile
import logging
from pathlib import Path
logger = logging.getLogger(__name__)

# Separator placed between the parts of temporary names built in this module.
FS_LOCAL_SPECIAL = ':'

# ---------------------------------------------------------------------------
# renameat2(2) ctypes binding — Linux only
# ---------------------------------------------------------------------------
# Values passed straight through to the renameat2 call.
_AT_FDCWD = -100
_RENAME_EXCHANGE = 2

# errnos that mean "not supported here" rather than a real failure
_NOT_SUPPORTED = frozenset({errno.EINVAL, errno.ENOSYS, errno.EOPNOTSUPP})
_HARDLINK_UNAVAIL = _NOT_SUPPORTED | {errno.EXDEV, errno.EPERM}

# Stays None unless libc could be loaded and exposes renameat2.
_renameat2 = None
if platform.system() == "Linux":
    try:
        _libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
        _renameat2_func = _libc.renameat2
    except (OSError, AttributeError):
        # libc could not be loaded or has no renameat2 symbol; leave the
        # binding unset so callers take their fallback path.
        pass
    else:
        _renameat2_func.argtypes = [
            ctypes.c_int,     # olddirfd
            ctypes.c_char_p,  # oldpath
            ctypes.c_int,     # newdirfd
            ctypes.c_char_p,  # newpath
            ctypes.c_uint,    # flags
        ]
        _renameat2_func.restype = ctypes.c_int
        _renameat2 = _renameat2_func
# ---------------------------------------------------------------------------
# Low-level primitives
# ---------------------------------------------------------------------------
class _OpUnavailable(Exception):
    """Signals that an optional filesystem operation cannot be used here."""
def _rename_exchange(old: str | Path, new: str | Path) -> None:
    """Atomically swap two filesystem entries.

    Both paths must exist. Relative paths are passed with ``_AT_FDCWD`` as
    the directory fd for both arguments.

    Args:
        old: First entry to swap.
        new: Second entry to swap.

    Raises:
        _OpUnavailable: renameat2 is not bound on this platform, or the
            call failed with an errno listed in ``_NOT_SUPPORTED``.
        OSError: Any other failure reported by the call; ``filename`` and
            ``filename2`` carry *old* and *new*.
    """
    if _renameat2 is None:
        raise _OpUnavailable("renameat2 not available on this platform")
    ret = _renameat2(
        _AT_FDCWD, os.fsencode(str(old)),
        _AT_FDCWD, os.fsencode(str(new)),
        _RENAME_EXCHANGE,
    )
    if ret == 0:
        return
    # errno is only meaningful after a failed call; it is captured by ctypes
    # because the library was loaded with use_errno=True.
    err = ctypes.get_errno()
    if err in _NOT_SUPPORTED:
        raise _OpUnavailable("renameat2 exchange not supported on filesystem")
    raise OSError(err, os.strerror(err), str(old), None, str(new))
def _mklink_checked(src: Path, dest: Path) -> bool:
    """Try to hard-link *src* at *dest*.

    Returns:
        True when the link was created, False when the failure errno is
        one of ``_HARDLINK_UNAVAIL``.

    Raises:
        OSError: The link failed with any other errno.
    """
    try:
        os.link(str(src), str(dest))
    except OSError as exc:
        if exc.errno not in _HARDLINK_UNAVAIL:
            raise
        return False
    else:
        return True
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def get_content_index_name_from_path(path: Path) -> str:
    """Build the index-file name used for the content of *path*.

    The name is ``index`` followed by the final suffix of *path*; a path
    without a suffix falls back to ``.dat``.
    """
    # TODO: improve handling and mimetype logic
    extension = path.suffix or '.dat'
    return f'index{extension}'
2026-03-26 00:33:29 -05:00
def get_content_path(path: str | Path) -> Path:
"""Return the path to the content file for a directory or file"""
path = Path(path).resolve()
if path.is_dir():
return path / get_content_index_name_from_path(path)
return path
def promote_path(path: str | Path) -> Path:
"""Promote a regular file to a directory, preserving content as an index file.
Transforms ``/some/dir/notes.md`` into::
/some/dir/notes.md/
_index.md original content (suffix preserved)
Atomicity depends on platform and filesystem capabilities, with
automatic fallback through three tiers:
1. hardlink + renameat2(RENAME_EXCHANGE) observers see either
the old file or the new directory, never absence. Requires
Linux >=3.15 and a supporting filesystem (ext4, xfs, tmpfs).
A temporary entry briefly appears in the parent directory.
2. hardlink + unlink + rename data is preserved via hard link
so content is never at risk, but observers see a brief window
where the original name is absent.
3. move + rename works on any POSIX system. Same brief
absence window.
A temporary directory (``<name>:<random>:trove_promote_tmp``) is
created in the parent for the duration of the operation. On success
it is removed (or, in tier 1, contains only the original file's
hard link and is unlinked). If the final rename fails in tier 3,
the temp directory is left in place for recovery later.
Args:
path: Regular file to promote. Must exist and be a regular file.
Returns:
Path to the index file inside the resulting directory
(e.g., ``path / _index.md``).
Raises:
ValueError: *path* is not a regular file.
OSError: Promotion failed. In tiers 1-2, the original file is
restored. In tier 3, the temp directory may remain for
manual or automated recovery.
"""
path = Path(path).resolve()
if not path.is_file():
raise ValueError(f"not a regular file: {path}")
# Create a temporary directory
target_path_tmp = Path(tempfile.mkdtemp(dir=str(path.parent.absolute()), prefix=f"{path.name}{FS_LOCAL_SPECIAL}", suffix=f"{FS_LOCAL_SPECIAL}trove_promote_tmp"))
target_name = get_content_index_name_from_path(path)
target_path = path / target_name
# Attempt to preserve original file during operation via hardlink
# This is the 'mostly' safe method
try:
hardlink_created = _mklink_checked(path, target_path_tmp / target_name)
except OSError:
try:
os.rmdir(target_path_tmp)
except OSError:
logger.warning("Failed to remove temporary file: %s", target_path_tmp)
raise
if hardlink_created:
try:
_rename_exchange(path, target_path_tmp)
except _OpUnavailable:
# Exchanging isn't supported, unlink and rename instead
# This results in a 'blip' to observers of the filesystems
os.unlink(path)
os.rename(target_path_tmp, path)
return target_path
try:
os.unlink(target_path_tmp)
except OSError:
# Failed to delete hard link to the original file. THis means the temporary
# path still exists, but we can't do much so ignore.
logger.error("Failed to remove temporary file: %s", target_path_tmp)
return target_path
# Hard linking isn't an option, try dual move
# If the first step fails, the original file is still there and we attempt to
# delete the temporary directory before passing along the error.
try:
os.rename(path, target_path_tmp / target_name)
except OSError:
try:
os.rmdir(target_path_tmp)
except OSError:
logger.warning("Failed to remove temporary file: %s", target_path_tmp)
raise
# Move back to the original path now
# If this fails, trove scan of the path will be able to tell what happened
# and present user an option for recovery. Failure here indicates something
# _very_ wrong - we created the new tmp path and removed the old path
# without error. We exit intentionally to avoid further damage.
os.rename(target_path_tmp, path)
return target_path