import os import sqlite3 import tempfile import datetime as dt from pathlib import Path, PurePosixPath from typing import Optional, Dict, List, Self, Iterable, Iterator from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId from . import fs_util as fsu from . import trove as tr class FSNote(Note): def __init__(self, trove: 'FSTrove', path: Path): self._trove: FSTrove = trove self._fs_path: Path = path.resolve() if not self._fs_path.is_relative_to(trove.root): raise ValueError("Path must be relative to the root directory") self._object_id: str = path.relative_to(trove.root).as_posix() @property def object_id(self) -> tr.ObjectId: return self._object_id @property def fs_path(self) -> Path: return self._fs_path @property def mtime(self): """Return modification time as datetime.""" stat = self._fs_path.stat() return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc) @property def readonly(self) -> bool: """Check if the note is readonly based on file permissions.""" return not os.access(self._fs_path, os.W_OK) @property def mime(self) -> str: """Return MIME type, defaulting to generic binary stream.""" if self._fs_path.is_dir(): return "inode/directory" return "application/octet-stream" def get_raw_metadata(self, key: str) -> Optional[bytes]: # TODO: FIXME return None def set_raw_metadata(self, key: str, value: bytes) -> None: # TODO: FIXME pass def read_content(self) -> bytes: """Read the raw content of the note.""" if self._fs_path.is_file(): return self._fs_path.read_bytes() return b"" def write_content(self, data:bytes) -> None: """Write the raw content of the note.""" self._fs_path.write_bytes(data) def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path: ex_path = self._fs_path / name try: ex_path.mkdir(exist_ok=exist_ok) return ex_path except FileExistsError: raise tr.ErrorExists(str(ex_path)) from None except OSError as e: raise tr.ErrorWithErrno(e.errno, str(e)) from None def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note: """Create a new child note.""" if content is None: content = b"" if mime == 'inode/directory': if content is not None: raise NotImplementedError("FSNote does not support children") return FSTreeNote(self._trove, self._new_child_subdir(name, False)) ex_path = self._fs_path / name ex_path.write_bytes(content) return FSNote(self._trove, ex_path) def child(self, name: str) -> Note: """Retrieve a child not by name.""" target_path = self._fs_path / name return self._trove.get_raw_note_by_path(target_path) def rm_child(self, name: str, recurse: bool): target_path = self._fs_path / name if not target_path.exists(): raise tr.ErrorNotFound(name) if target_path.is_dir(): if recurse: raise NotImplementedError("Recursive deletion not supported") else: target_path.rmdir() else: target_path.unlink() # TODO: remove meta directory! def children(self) -> Iterator[TreeEntry]: """Get all children of this note.""" if not self._fs_path.is_dir(): return for item in self._fs_path.iterdir(): if item.name == ".trove": continue yield TreeEntry(name=item.name, object_id=item.stat().st_ino) class FSTreeNote(FSNote, TreeNote): def link(self, name: str, note: Note): if not isinstance(note, FSNote): raise BadNoteType("Only blob notes can be linked") target_path = self._fs_path / name if target_path.exists(): self.unlink(name) note_path = note._fs_path # If the note is in .working, move it to the new location. if self._trove.working in note_path.parents: os.rename(note_path, target_path) else: # If it's already linked somewhere, create a hard link if it's a file. if note_path.is_file(): try: os.link(note_path, target_path) except OSError: # Fallback to rename if link fails (e.g. cross-device, though we assume single FS) os.rename(note_path, target_path) else: # Directories cannot be hardlinked. # We move it to the new location. os.rename(note_path, target_path) def unlink(self, name: str): target_path = self._fs_path / name if not target_path.exists(): return if target_path.is_dir(): target_path.rmdir() else: target_path.unlink() def mkdir(self, name: str) -> 'FSTreeNote': target_path = self._fs_path / name target_path.mkdir(exist_ok=True) return FSTreeNote(self._trove, path=target_path) def entries(self) -> Iterable[TreeEntry]: try: for item in self._fs_path.iterdir(): if item.name == ".trove": continue yield TreeEntry(name=item.name, object_id=str(item)) except OSError: pass class FSTrove(Trove): def __init__(self, root: str | Path): self.root = Path(root).absolute() self._root_inode = self.root.stat().st_ino self.dot_trove = self.root / ".trove" self.working = self.dot_trove / ".working" self.dot_trove.mkdir(exist_ok=True) self.working.mkdir(exist_ok=True) @classmethod def open(cls, path: str | Path, create: bool = False) -> 'FSTrove': p = Path(path) if not p.exists(): if not create: raise FileNotFoundError(f"Root path not found: {p}") p.mkdir(parents=True) return cls(p) def get_raw_note_by_path(self, path: Path) -> Note: if not path.exists(): raise tr.ErrorNotFound(str(path)) if path.is_dir(): return FSTreeNote(self, path=path) return FSNote(self, path=path) def get_raw_note(self, note_id: ObjectId) -> Note: p = self.root / str(note_id) if not p.exists(): raise NoteNotFound(note_id) return self.get_raw_note_by_path(p) def create_blob(self, data: bytes | None = None) -> Note: raise NotImplementedError("FSTrove does not support blobs") def get_root(self) -> TreeNote: return FSTreeNote(self, path=self.root) def _get_metadata(self, inode: int, key: str) -> Optional[bytes]: raise NotImplementedError("FSTrove does not support metadata") def _set_metadata(self, inode: int, key: str, value: bytes): raise NotImplementedError("FSTrove does not support metadata") def close(self): pass