import os import sqlite3 import tempfile import datetime as dt from pathlib import Path from typing import Optional, Dict, List, Self, Iterable from .trove import Note, Trove, TreeNote, BlobNote, Blob, Tree, BadNoteType, TreeEntry, NoteNotFound class FSNote(Note): def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None): self._trove: FSTrove = trove self._fs_path: Path | None = path self._inode: int | None = inode if self._fs_path is not None: inode = self._fs_path.stat().st_ino if self._inode != inode and self._inode is not None: raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}") self._inode = inode @property def object_id(self) -> int: if self._inode is None: raise ValueError("Note not yet saved to disk") return self._inode @property def mtime(self): """Return modification time as datetime.""" stat = self._path.stat() return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc) @property def readonly(self) -> bool: """Check if the note is readonly based on file permissions.""" if self._inode is None: return False return not os.access(self._path, os.W_OK) @property def mime(self) -> str: """Return MIME type, defaulting to generic binary stream.""" return "application/octet-stream" @property def _path(self) -> Path: if self._fs_path is not None: if self._fs_path.exists(): return self._fs_path self._fs_path = None if self._inode is None: raise ValueError("Note not yet saved to disk") self._fs_path = self._trove.get_path_by_inode(self._inode) assert self._fs_path is not None return self._fs_path def get_raw_metadata(self, key: str) -> Optional[bytes]: return self._trove._get_metadata(self._inode, key) def set_raw_metadata(self, key: str, value: bytes) -> None: self._trove._set_metadata(self._inode, key, value) class FSBlobNote(FSNote, BlobNote): def read(self) -> bytes: if self._inode is None: return b"" return self._path.read_bytes() def write(self, data: bytes) -> None: self._path.write_bytes(data) # Update cache just in case inode changed (some editors do this) try: new_inode = self._path.stat().st_ino if new_inode != self._inode: self._trove._update_cache(new_inode, self._path) self._inode = new_inode except OSError: pass class FSTreeNote(FSNote, TreeNote): @property def mime(self) -> str: """Return MIME type for directory/tree nodes.""" return "inode/directory" def link(self, name: str, note: Note): if not isinstance(note, FSBlobNote): raise BadNoteType("Only blob notes can be linked") target_path = self._path / name if target_path.exists(): self.unlink(name) note_path = note._path # If the note is in .working, move it to the new location. if self._trove.working in note_path.parents: os.rename(note_path, target_path) self._trove._update_cache(note.object_id, target_path) else: # If it's already linked somewhere, create a hard link if it's a file. if note_path.is_file(): try: os.link(note_path, target_path) except OSError: # Fallback to rename if link fails (e.g. cross-device, though we assume single FS) os.rename(note_path, target_path) else: # Directories cannot be hardlinked. # We move it to the new location. os.rename(note_path, target_path) self._trove._update_cache(note.object_id, target_path) def unlink(self, name: str): target_path = self._path / name if not target_path.exists(): return if target_path.is_dir(): target_path.rmdir() else: target_path.unlink() def mkdir(self, name: str) -> 'FSTreeNote': target_path = self._path / name target_path.mkdir(exist_ok=True) inode = target_path.stat().st_ino self._trove._update_cache(inode, target_path) return FSTreeNote(self._trove, inode=inode, path=target_path) def entries(self) -> Iterable[TreeEntry]: try: for item in self._path.iterdir(): if item.name == ".trove": continue inode = item.stat().st_ino self._trove._update_cache(inode, item) yield TreeEntry(name=item.name, object_id=inode) except OSError: pass def list(self) -> dict[str, int]: res = {} try: for item in self._path.iterdir(): if item.name == ".trove": continue res[item.name] = item.stat().st_ino self._trove._update_cache(res[item.name], item) except OSError: pass return res def child(self, name: str) -> Note: """Retrieve a child not by name.""" target_path = self._path / name return self._trove.get_raw_note_by_path(target_path) class FSTrove(Trove): def __init__(self, root: str | Path): self.root = Path(root).absolute() self._root_inode = self.root.stat().st_ino self.dot_trove = self.root / ".trove" self.working = self.dot_trove / ".working" self.dot_trove.mkdir(exist_ok=True) self.working.mkdir(exist_ok=True) db_path = self.dot_trove / "trovecache.db" self.con = sqlite3.connect(str(db_path)) self._init_db() @classmethod def open(cls, path: str | Path, create: bool = False) -> 'FSTrove': p = Path(path) if not p.exists(): if not create: raise FileNotFoundError(f"Root path not found: {p}") p.mkdir(parents=True) return cls(p) def _init_db(self): self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)") self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))") self.con.commit() def _update_cache(self, inode: int, path: Path): try: rel_path = path.relative_to(self.root) path_str = str(rel_path) if path_str == ".": path_str = "" except ValueError: # Path not under root, maybe it's the root itself? if path == self.root: path_str = "" else: return # Not under root, don't cache self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str)) self.con.commit() def get_path_by_inode(self, inode: int) -> Optional[Path]: if inode == self._root_inode: return self.root row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone() if row: p = self.root / row[0] try: if p.exists() and p.stat().st_ino == inode: return p except OSError: pass # search for root_dir, dirs, files in os.walk(self.root): # Skip .trove if ".trove" in dirs: dirs.remove(".trove") for name in dirs + files: p = Path(root_dir) / name try: st = p.stat() if st.st_ino == inode: self._update_cache(inode, p) return p except OSError: continue return None def get_raw_note_by_path(self, target_path: Path) -> Note: if not target_path.exists(): raise NoteNotFound(target_path.relative_to(self.root)) note_id = target_path.stat().st_ino if target_path.is_dir(): return FSTreeNote(self, inode=note_id, path=target_path) else: return FSBlobNote(self, inode=note_id, path=target_path) def get_raw_note(self, note_id: int) -> Note: p = self.get_path_by_inode(note_id) if not p: raise NoteNotFound(note_id) return self.get_raw_note_by_path(p) def create_blob(self, data: bytes | None = None) -> BlobNote: fd, temp_path = tempfile.mkstemp(dir=self.working) try: if data: os.write(fd, data) finally: os.close(fd) p = Path(temp_path) inode = p.stat().st_ino self._update_cache(inode, p) return FSBlobNote(self, inode=inode, path=p) def get_root(self) -> TreeNote: return FSTreeNote(self, inode=self._root_inode, path=self.root) def _get_metadata(self, inode: int, key: str) -> Optional[bytes]: row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone() return row[0] if row else None def _set_metadata(self, inode: int, key: str, value: bytes): self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value)) self.con.commit() def close(self): self.con.close() def __enter__(self): return self def __exit__(self, *args): self.close()