import os import sqlite3 import tempfile from pathlib import Path from typing import Optional, Dict, List, Self, Iterable from .trove import NODE_ROOT_ID, Note, Trove, TreeNote, BlobNote, Blob, Tree, BadNoteType, TreeEntry, NoteNotFound class FSNote(Note): def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None): self._trove: FSTrove = trove self._fs_path: Path | None = path self._inode: int | None = inode if self._fs_path is not None: inode = self._fs_path.stat().st_ino if self._inode != inode and self._inode is not None and self._inode != NODE_ROOT_ID: raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}") self._inode = inode @property def object_id(self) -> int: if self._inode is None: raise ValueError("Note not yet saved to disk") return self._inode @property def _path(self) -> Path: if self._fs_path is not None: if self._fs_path.exists(): return self._fs_path self._fs_path = None if self._inode is None: raise ValueError("Note not yet saved to disk") self._fs_path = self._trove.get_path_by_inode(self._inode) assert self._fs_path is not None return self._fs_path def get_raw_metadata(self, key: str) -> Optional[bytes]: return self._trove._get_metadata(self._inode, key) def set_raw_metadata(self, key: str, value: bytes) -> None: self._trove._set_metadata(self._inode, key, value) class FSBlobNote(FSNote, BlobNote): def read(self) -> bytes: if self._inode is None: return b"" return self._path.read_bytes() def write(self, data: bytes) -> None: self._path.write_bytes(data) # Update cache just in case inode changed (some editors do this) try: new_inode = self._path.stat().st_ino if new_inode != self._inode: self._trove._update_cache(new_inode, self._path) self._inode = new_inode except OSError: pass class FSTreeNote(FSNote, TreeNote): def link(self, name: str, note: Note): if not isinstance(note, FSBlobNote): raise BadNoteType("Only blob notes can be linked") target_path = self._path / name if target_path.exists(): self.unlink(name) note_path = note._path # If the note is in .working, move it to the new location. if self._trove.working in note_path.parents: os.rename(note_path, target_path) self._trove._update_cache(note.object_id, target_path) else: # If it's already linked somewhere, create a hard link if it's a file. if note_path.is_file(): try: os.link(note_path, target_path) except OSError: # Fallback to rename if link fails (e.g. cross-device, though we assume single FS) os.rename(note_path, target_path) else: # Directories cannot be hardlinked. # We move it to the new location. os.rename(note_path, target_path) self._trove._update_cache(note.object_id, target_path) def unlink(self, name: str): target_path = self._path / name if not target_path.exists(): return if target_path.is_dir(): target_path.rmdir() else: target_path.unlink() def mkdir(self, name: str) -> 'FSTreeNote': target_path = self._path / name target_path.mkdir(exist_ok=True) inode = target_path.stat().st_ino self._trove._update_cache(inode, target_path) return FSTreeNote(self._trove, inode=inode, path=target_path) def entries(self) -> Iterable[TreeEntry]: try: for item in self._path.iterdir(): if item.name == ".trove": continue inode = item.stat().st_ino self._trove._update_cache(inode, item) yield TreeEntry(name=item.name, object_id=inode) except OSError: pass def list(self) -> dict[str, int]: res = {} try: for item in self._path.iterdir(): if item.name == ".trove": continue res[item.name] = item.stat().st_ino self._trove._update_cache(res[item.name], item) except OSError: pass return res def child(self, name: str) -> Note: """Retrieve a child not by name.""" target_path = self._path / name return self._trove.get_raw_note_by_path(target_path) class FSTrove(Trove): def __init__(self, root: str | Path): self.root = Path(root).absolute() self._root_inode = self.root.stat().st_ino self.dot_trove = self.root / ".trove" self.working = self.dot_trove / ".working" self.dot_trove.mkdir(exist_ok=True) self.working.mkdir(exist_ok=True) db_path = self.dot_trove / "trovecache.db" self.con = sqlite3.connect(str(db_path)) self._init_db() # Ensure root mapping. self._update_cache(NODE_ROOT_ID, self.root) @classmethod def open(cls, path: str | Path, create: bool = False) -> 'FSTrove': p = Path(path) if not p.exists(): if not create: raise FileNotFoundError(f"Root path not found: {p}") p.mkdir(parents=True) return cls(p) def _init_db(self): self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)") self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))") self.con.commit() def _update_cache(self, inode: int, path: Path): try: rel_path = path.relative_to(self.root) path_str = str(rel_path) if path_str == ".": path_str = "" except ValueError: # Path not under root, maybe it's the root itself? if path == self.root: path_str = "" else: return # Not under root, don't cache self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str)) self.con.commit() def get_path_by_inode(self, inode: int) -> Optional[Path]: if inode == self._root_inode: return self.root row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone() if row: p = self.root / row[0] try: if p.exists() and p.stat().st_ino == inode: return p except OSError: pass # search for root_dir, dirs, files in os.walk(self.root): # Skip .trove if ".trove" in dirs: dirs.remove(".trove") for name in dirs + files: p = Path(root_dir) / name try: st = p.stat() if st.st_ino == inode: self._update_cache(inode, p) return p except OSError: continue return None def get_raw_note_by_path(self, target_path: Path) -> Note: if not target_path.exists(): raise NoteNotFound(target_path.relative_to(self.root)) note_id = target_path.stat().st_ino if target_path.is_dir(): return FSTreeNote(self, inode=note_id, path=target_path) else: return FSBlobNote(self, inode=note_id, path=target_path) def get_raw_note(self, note_id: int) -> Note: p = self.get_path_by_inode(note_id) if not p: raise NoteNotFound(note_id) return self.get_raw_note_by_path(p) def create_blob(self, data: bytes | None = None) -> BlobNote: fd, temp_path = tempfile.mkstemp(dir=self.working) try: if data: os.write(fd, data) finally: os.close(fd) p = Path(temp_path) inode = p.stat().st_ino self._update_cache(inode, p) return FSBlobNote(self, inode=inode, path=p) def get_root(self) -> TreeNote: return FSTreeNote(self, inode=self._root_inode, path=self.root) def _get_metadata(self, inode: int, key: str) -> Optional[bytes]: row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone() return row[0] if row else None def _set_metadata(self, inode: int, key: str, value: bytes): self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value)) self.con.commit() def close(self): self.con.close() def __enter__(self): return self def __exit__(self, *args): self.close()