trove/trovedb/fs.py

234 lines
7.9 KiB
Python

import os
import datetime as dt
from pathlib import Path
from typing import Optional, Iterable, Iterator, override
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId
from . import trove as tr
class FSNote(Note):
def __init__(self, trove: 'FSTrove', path: Path):
self._trove: FSTrove = trove
self._fs_path: Path = path.resolve()
if not self._fs_path.is_relative_to(trove.root):
raise ValueError("Path must be relative to the root directory")
self._object_id: str = path.relative_to(trove.root).as_posix()
@property
def object_id(self) -> tr.ObjectId:
return self._object_id
@property
def fs_path(self) -> Path:
return self._fs_path
@property
def mtime(self):
"""Return modification time as datetime."""
stat = self._fs_path.stat()
return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)
@property
def readonly(self) -> bool:
"""Check if the note is readonly based on file permissions."""
return not os.access(self._fs_path, os.W_OK)
@property
def mime(self) -> str:
"""Return MIME type, defaulting to generic binary stream."""
if self._fs_path.is_dir():
return "inode/directory"
return "application/octet-stream"
def get_raw_metadata(self, key: str) -> Optional[bytes]:
# TODO: FIXME
return None
def set_raw_metadata(self, key: str, value: bytes) -> None:
# TODO: FIXME
pass
def read_content(self) -> bytes:
"""Read the raw content of the note."""
if self._fs_path.is_file():
return self._fs_path.read_bytes()
return b""
def write_content(self, data:bytes) -> None:
"""Write the raw content of the note."""
self._fs_path.write_bytes(data)
def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path:
ex_path = self._fs_path / name
try:
ex_path.mkdir(exist_ok=exist_ok)
return ex_path
except FileExistsError:
raise tr.ErrorExists(str(ex_path)) from None
except OSError as e:
raise tr.ErrorWithErrno(e.errno, str(e)) from None
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
"""Create a new child note."""
if content is None:
content = b""
if mime == 'inode/directory':
if content:
raise NotImplementedError("FSNote does not support children")
return FSTreeNote(self._trove, self._new_child_subdir(name, False))
ex_path = self._fs_path / name
ex_path.write_bytes(content)
return FSNote(self._trove, ex_path)
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._fs_path / name
return self._trove.get_raw_note_by_path(target_path)
def rm_child(self, name: str, recurse: bool):
target_path = self._fs_path / name
if not target_path.exists():
raise tr.ErrorNotFound(name)
if target_path.is_dir():
if recurse:
raise NotImplementedError("Recursive deletion not supported")
else:
target_path.rmdir()
else:
target_path.unlink()
# TODO: remove meta directory!
def unlink_(self, name: str):
target_path = self._fs_path / name
if not target_path.exists():
return
if target_path.is_dir():
target_path.rmdir()
else:
target_path.unlink()
def link_(self, name: str, note: Note):
if not isinstance(note, FSNote):
raise BadNoteType("Only blob notes can be linked")
target_path = self._fs_path / name
if target_path.exists():
self.unlink_(name)
note_path = note._fs_path
# If the note is in .working, move it to the new location.
if self._trove.working in note_path.parents:
os.rename(note_path, target_path)
else:
# If it's already linked somewhere, create a hard link if it's a file.
if note_path.is_file():
try:
os.link(note_path, target_path)
except OSError:
# Fallback to rename if link fails (e.g. cross-device, though we assume single FS)
os.rename(note_path, target_path)
else:
# Directories cannot be hardlinked.
# We move it to the new location.
os.rename(note_path, target_path)
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
if not self._fs_path.is_dir():
return
for item in self._fs_path.iterdir():
if item.name == ".trove":
continue
yield TreeEntry(name=item.name, object_id=item.stat().st_ino)
class FSTreeNote(FSNote, TreeNote):
def mkdir(self, name: str) -> 'FSTreeNote':
target_path = self._fs_path / name
target_path.mkdir(exist_ok=True)
return FSTreeNote(self._trove, path=target_path)
def entries(self) -> Iterable[TreeEntry]:
try:
for item in self._fs_path.iterdir():
if item.name == ".trove":
continue
yield TreeEntry(name=item.name, object_id=str(item))
except OSError:
pass
def unlink(self, name: str):
return self.unlink_(name)
def link(self, name: str, note: Note):
return self.link_(name, note)
class FSTrove(Trove):
def __init__(self, root: str | Path):
self.root = Path(root).absolute()
self._root_inode = self.root.stat().st_ino
self.dot_trove = self.root / ".trove"
self.working = self.dot_trove / ".working"
self.dot_trove.mkdir(exist_ok=True)
self.working.mkdir(exist_ok=True)
@classmethod
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
p = Path(path)
if not p.exists():
if not create:
raise FileNotFoundError(f"Root path not found: {p}")
p.mkdir(parents=True)
return cls(p)
def get_raw_note_by_path(self, path: Path) -> Note:
if not path.exists():
raise tr.ErrorNotFound(str(path))
if path.is_dir():
return FSTreeNote(self, path=path)
return FSNote(self, path=path)
def get_raw_note(self, note_id: ObjectId) -> Note:
p = self.root / str(note_id)
if not p.exists():
raise NoteNotFound(note_id)
return self.get_raw_note_by_path(p)
@override
def move(self, src_parent: Note, src_name: str, dst_parent: Note, dst_name: str, overwrite: bool):
"""Move a child note to a new location."""
src_note = src_parent.child(src_name)
if not isinstance(src_note, FSNote):
raise tr.ErrorBadType("not a valid DB note")
if not isinstance(dst_parent, FSNote):
raise tr.ErrorBadType("not a valid DB note")
if not dst_parent._fs_path.is_dir():
raise NotImplementedError("FSTrove promoting during move")
# Remove existing target
if overwrite:
dst_parent.unlink_(dst_name)
# Link to new parent, unlink from old
dst_parent.link_(dst_name, src_note)
src_parent.rm_child(src_name, True)
def create_blob(self, data: bytes | None = None) -> Note:
raise NotImplementedError("FSTrove does not support blobs")
def get_root(self) -> TreeNote:
return FSTreeNote(self, path=self.root)
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
raise NotImplementedError("FSTrove does not support metadata")
def _set_metadata(self, inode: int, key: str, value: bytes):
raise NotImplementedError("FSTrove does not support metadata")
def close(self):
pass