diff --git a/.gitignore b/.gitignore index 5549b63..64a185a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,14 +7,6 @@ __pycache__/ # C extensions *.so -# Project specify temporary / test files -*.db -.trove/ -ai -tmp/ -ui/ - - # Distribution / packaging .Python build/ diff --git a/trovedb/db.py b/trovedb/db.py index 3156f15..762f4f2 100644 --- a/trovedb/db.py +++ b/trovedb/db.py @@ -213,7 +213,6 @@ class Sqlite3Trove: if object_id is None: object_id = uuid.uuid4() sid = _sql_id(object_id) - assert sid is not None # Preserve created timestamp on update row = self._con.execute( diff --git a/trovedb/fs.py b/trovedb/fs.py index 3b97e1b..97ba565 100644 --- a/trovedb/fs.py +++ b/trovedb/fs.py @@ -2,132 +2,99 @@ import os import sqlite3 import tempfile import datetime as dt -from pathlib import Path, PurePosixPath -from typing import Optional, Dict, List, Self, Iterable, Iterator +from pathlib import Path +from typing import Optional, Dict, List, Self, Iterable -from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId +from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound from . import fs_util as fsu -from . import trove as tr + class FSNote(Note): - def __init__(self, trove: 'FSTrove', path: Path): + def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None): self._trove: FSTrove = trove - self._fs_path: Path = path.resolve() - if not self._fs_path.is_relative_to(trove.root): - raise ValueError("Path must be relative to the root directory") - self._object_id: str = path.relative_to(trove.root).as_posix() + self._fs_path: Path | None = path + self._inode: int | None = inode + + if self._fs_path is not None: + inode = self._fs_path.stat().st_ino + if self._inode != inode and self._inode is not None: + raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}") + self._inode = inode @property - def object_id(self) -> tr.ObjectId: - return self._object_id - - @property - def fs_path(self) -> Path: - return self._fs_path + def object_id(self) -> int: + if self._inode is None: + raise ValueError("Note not yet saved to disk") + return self._inode @property def mtime(self): """Return modification time as datetime.""" - stat = self._fs_path.stat() + stat = self._path.stat() return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc) @property def readonly(self) -> bool: """Check if the note is readonly based on file permissions.""" - return not os.access(self._fs_path, os.W_OK) + if self._inode is None: + return False + return not os.access(self._path, os.W_OK) @property def mime(self) -> str: """Return MIME type, defaulting to generic binary stream.""" - if self._fs_path.is_dir(): - return "inode/directory" return "application/octet-stream" + @property + def _path(self) -> Path: + if self._fs_path is not None: + if self._fs_path.exists(): + return self._fs_path + self._fs_path = None + if self._inode is None: + raise ValueError("Note not yet saved to disk") + self._fs_path = self._trove.get_path_by_inode(self._inode) + assert self._fs_path is not None + return self._fs_path + def get_raw_metadata(self, key: str) -> Optional[bytes]: - # TODO: FIXME - return None + return self._trove._get_metadata(self._inode, key) def set_raw_metadata(self, key: str, value: bytes) -> None: - # TODO: FIXME - pass + self._trove._set_metadata(self._inode, key, value) def read_content(self) -> bytes: """Read the raw content of the note.""" - if self._fs_path.is_file(): - return self._fs_path.read_bytes() + content_file = fsu.get_content_path(self._path) + if content_file.exists(): + return content_file.read_bytes() return b"" def write_content(self, data:bytes) -> None: """Write the raw content of the note.""" - self._fs_path.write_bytes(data) - - def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path: - ex_path = self._fs_path / name - try: - ex_path.mkdir(exist_ok=exist_ok) - return ex_path - except FileExistsError: - raise tr.ErrorExists(str(ex_path)) from None - except OSError as e: - raise tr.ErrorWithErrno(e.errno, str(e)) from None - - def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note: - """Create a new child note.""" - - if content is None: - content = b"" - - if mime == 'inode/directory': - if content is not None: - raise NotImplementedError("FSNote does not support children") - return FSTreeNote(self._trove, self._new_child_subdir(name, False)) - - ex_path = self._fs_path / name - ex_path.write_bytes(content) - return FSNote(self._trove, ex_path) - - def child(self, name: str) -> Note: - """Retrieve a child not by name.""" - target_path = self._fs_path / name - return self._trove.get_raw_note_by_path(target_path) - - def rm_child(self, name: str, recurse: bool): - target_path = self._fs_path / name - if not target_path.exists(): - raise tr.ErrorNotFound(name) - if target_path.is_dir(): - if recurse: - raise NotImplementedError("Recursive deletion not supported") - else: - target_path.rmdir() - else: - target_path.unlink() - # TODO: remove meta directory! - - - def children(self) -> Iterator[TreeEntry]: - """Get all children of this note.""" - if not self._fs_path.is_dir(): - return - for item in self._fs_path.iterdir(): - if item.name == ".trove": - continue - yield TreeEntry(name=item.name, object_id=item.stat().st_ino) + content_file = fsu.get_content_path(self._path) + content_file.write_bytes(data) class FSTreeNote(FSNote, TreeNote): + @property + def mime(self) -> str: + """Return MIME type for directory/tree nodes.""" + return "inode/directory" + def link(self, name: str, note: Note): if not isinstance(note, FSNote): raise BadNoteType("Only blob notes can be linked") - target_path = self._fs_path / name + target_path = self._path / name if target_path.exists(): self.unlink(name) - note_path = note._fs_path + note_path = note._path # If the note is in .working, move it to the new location. if self._trove.working in note_path.parents: os.rename(note_path, target_path) + self._trove._update_cache(note.object_id, target_path) else: # If it's already linked somewhere, create a hard link if it's a file. if note_path.is_file(): @@ -140,9 +107,11 @@ class FSTreeNote(FSNote, TreeNote): # Directories cannot be hardlinked. # We move it to the new location. os.rename(note_path, target_path) + + self._trove._update_cache(note.object_id, target_path) def unlink(self, name: str): - target_path = self._fs_path / name + target_path = self._path / name if not target_path.exists(): return if target_path.is_dir(): @@ -151,20 +120,39 @@ class FSTreeNote(FSNote, TreeNote): target_path.unlink() def mkdir(self, name: str) -> 'FSTreeNote': - target_path = self._fs_path / name + target_path = self._path / name target_path.mkdir(exist_ok=True) - return FSTreeNote(self._trove, path=target_path) + inode = target_path.stat().st_ino + self._trove._update_cache(inode, target_path) + return FSTreeNote(self._trove, inode=inode, path=target_path) def entries(self) -> Iterable[TreeEntry]: try: - for item in self._fs_path.iterdir(): + for item in self._path.iterdir(): if item.name == ".trove": continue - yield TreeEntry(name=item.name, object_id=str(item)) + inode = item.stat().st_ino + self._trove._update_cache(inode, item) + yield TreeEntry(name=item.name, object_id=inode) except OSError: pass + def list(self) -> dict[str, int]: + res = {} + try: + for item in self._path.iterdir(): + if item.name == ".trove": + continue + res[item.name] = item.stat().st_ino + self._trove._update_cache(res[item.name], item) + except OSError: + pass + return res + def child(self, name: str) -> Note: + """Retrieve a child not by name.""" + target_path = self._path / name + return self._trove.get_raw_note_by_path(target_path) class FSTrove(Trove): @@ -173,9 +161,14 @@ class FSTrove(Trove): self._root_inode = self.root.stat().st_ino self.dot_trove = self.root / ".trove" self.working = self.dot_trove / ".working" + self.dot_trove.mkdir(exist_ok=True) self.working.mkdir(exist_ok=True) + db_path = self.dot_trove / "trovecache.db" + self.con = sqlite3.connect(str(db_path)) + self._init_db() + @classmethod def open(cls, path: str | Path, create: bool = False) -> 'FSTrove': p = Path(path) @@ -185,30 +178,101 @@ class FSTrove(Trove): p.mkdir(parents=True) return cls(p) - def get_raw_note_by_path(self, path: Path) -> Note: - if not path.exists(): - raise tr.ErrorNotFound(str(path)) - if path.is_dir(): - return FSTreeNote(self, path=path) - return FSNote(self, path=path) + def _init_db(self): + self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)") + self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))") + self.con.commit() - def get_raw_note(self, note_id: ObjectId) -> Note: - p = self.root / str(note_id) - if not p.exists(): + def _update_cache(self, inode: int, path: Path): + try: + rel_path = path.relative_to(self.root) + path_str = str(rel_path) + if path_str == ".": + path_str = "" + except ValueError: + # Path not under root, maybe it's the root itself? + if path == self.root: + path_str = "" + else: + return # Not under root, don't cache + + self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str)) + self.con.commit() + + def get_path_by_inode(self, inode: int) -> Optional[Path]: + if inode == self._root_inode: + return self.root + + row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone() + if row: + p = self.root / row[0] + try: + if p.exists() and p.stat().st_ino == inode: + return p + except OSError: + pass + + # search + for root_dir, dirs, files in os.walk(self.root): + # Skip .trove + if ".trove" in dirs: + dirs.remove(".trove") + + for name in dirs + files: + p = Path(root_dir) / name + try: + st = p.stat() + if st.st_ino == inode: + self._update_cache(inode, p) + return p + except OSError: + continue + return None + + def get_raw_note_by_path(self, target_path: Path) -> Note: + if not target_path.exists(): + raise NoteNotFound(target_path.relative_to(self.root)) + note_id = target_path.stat().st_ino + if target_path.is_dir(): + return FSTreeNote(self, inode=note_id, path=target_path) + else: + return FSNote(self, inode=note_id, path=target_path) + + def get_raw_note(self, note_id: int) -> Note: + p = self.get_path_by_inode(note_id) + if not p: raise NoteNotFound(note_id) return self.get_raw_note_by_path(p) + def create_blob(self, data: bytes | None = None) -> Note: - raise NotImplementedError("FSTrove does not support blobs") + fd, temp_path = tempfile.mkstemp(dir=self.working) + try: + if data: + os.write(fd, data) + finally: + os.close(fd) + p = Path(temp_path) + inode = p.stat().st_ino + self._update_cache(inode, p) + return FSNote(self, inode=inode, path=p) def get_root(self) -> TreeNote: - return FSTreeNote(self, path=self.root) + return FSTreeNote(self, inode=self._root_inode, path=self.root) def _get_metadata(self, inode: int, key: str) -> Optional[bytes]: - raise NotImplementedError("FSTrove does not support metadata") + row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone() + return row[0] if row else None def _set_metadata(self, inode: int, key: str, value: bytes): - raise NotImplementedError("FSTrove does not support metadata") + self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value)) + self.con.commit() def close(self): - pass + self.con.close() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/trovedb/fs_util.py b/trovedb/fs_util.py index 83dd673..2654be7 100644 --- a/trovedb/fs_util.py +++ b/trovedb/fs_util.py @@ -90,8 +90,8 @@ def get_content_index_name_from_path(path: Path) -> str: """Return the name of the index file for a directory.""" # TODO: improve handling and mimetype logic if not path.suffix: - return 'index.dat' - return f'index{path.suffix}' + return '_index.dat' + return f'_index{path.suffix}' def get_content_path(path: str | Path) -> Path: """Return the path to the content file for a directory or file""" diff --git a/trovedb/fuse/server.py b/trovedb/fuse/server.py index fea7c92..f933972 100644 --- a/trovedb/fuse/server.py +++ b/trovedb/fuse/server.py @@ -20,9 +20,7 @@ import pyfuse3 import trio from pyfuse3 import InodeT, FileHandleT -from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, ObjectId, TreeExists - -import trovedb.trove as tr +from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists logger = logging.getLogger(__name__) @@ -142,7 +140,7 @@ class TroveFuseOps(pyfuse3.Operations): raise pyfuse3.FUSEError(errno.ENOTDIR) try: note = parent.child(name.decode()) - except (KeyError, tr.ErrorNotFound): + except KeyError: logger.debug("lookup failed: %d -> %s", parent_inode, name.decode()) raise pyfuse3.FUSEError(errno.ENOENT) from None ent = self._create_get_ent_from_note(note) @@ -186,8 +184,8 @@ class TroveFuseOps(pyfuse3.Operations): # Determine basic information is_tree = True size = 0 - if not hasattr(note, 'mkdir'): - size = len(note.read_content()) + if isinstance(note, TroveBlob): + size = len(note.read()) is_tree = False # Create and fill attr structure @@ -241,13 +239,13 @@ class TroveFuseOps(pyfuse3.Operations): ent = self._get_ent_from_inode(inode) note = self._get_ent_note(ent) if fields.update_size: - if not hasattr(note, 'mkdir'): - current = note.read_content() + if isinstance(note, TroveBlob): + current = note.read() new_size = attr.st_size if new_size < len(current): - note.write_content(current[:new_size]) + note.write(current[:new_size]) elif new_size > len(current): - note.write_content(current + b"\x00" * (new_size - len(current))) + note.write(current + b"\x00" * (new_size - len(current))) else: raise pyfuse3.FUSEError(errno.EINVAL) return self._get_attr(ent, note) @@ -280,13 +278,13 @@ class TroveFuseOps(pyfuse3.Operations): if not isinstance(note, TroveTree): logger.debug("attempted readdir on %d not a tree", fh) raise pyfuse3.FUSEError(errno.ENOTDIR) - entries = list(note.entries()) # [(name, object_id), ...] + entries = list(note.list().items()) # [(name, object_id), ...] - for idx, entry in enumerate(entries): + for idx, (name, child_id) in enumerate(entries): if idx < start_id: continue - child = self._trove.get_raw_note(entry.object_id) + child = self._trove.get_raw_note(child_id) if child is None: continue @@ -294,7 +292,7 @@ class TroveFuseOps(pyfuse3.Operations): attr = self._get_attr(child_ent, child) self._ref_entry(child_ent) - if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1): + if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1): break async def releasedir(self, fh: FileHandleT) -> None: @@ -304,18 +302,19 @@ class TroveFuseOps(pyfuse3.Operations): async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes: logger.debug("mkdir inode:%d name:%s", parent_inode, name) - + # Grab parent note, verify is tree parent = self._get_inode_note(parent_inode) - # TODO: consider implications here, maybe look at ext on dir for mime? + if not isinstance(parent, TreeNote): + raise pyfuse3.FUSEError(errno.ENOTDIR) + # Create new directory in note try: - note = tr.new_child(parent, name.decode(), mime='inode/directory') - except tr.ErrorWithErrno as e: - raise pyfuse3.FUSEError(e.errno) from None - + new_tree: TreeNote = parent.mkdir(name.decode()) + except TreeExists: + raise pyfuse3.FUSEError(errno.EEXIST) from None # Grab entity for kernel - ent = self._create_get_ent_from_note(note) + ent = self._create_get_ent_from_note(new_tree) self._ref_entry(ent) - return self._get_attr(ent, note) + return self._get_attr(ent, new_tree) async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None: logger.debug("rmdir inode:%d name:%s", parent_inode, name) @@ -342,35 +341,37 @@ class TroveFuseOps(pyfuse3.Operations): async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple: logger.debug("create inode:%d name:%s", parent_inode, name) parent = self._get_inode_note(parent_inode) - - # TODO: handle mode - # TODO: handle flags - + if not isinstance(parent, TroveTree): + raise pyfuse3.FUSEError(errno.ENOTDIR) name_str = name.decode() - note = tr.new_child(parent, name_str) - ent = self._create_get_ent_from_note(note) + if name_str in parent.list(): + raise pyfuse3.FUSEError(errno.EEXIST) + blob = self._trove.create_blob(b"") + parent.link(name_str, blob) + + ent = self._create_get_ent_from_note(blob) self._ref_entry(ent) handle = self._open_handle(ent.sys_inode) - attr = self._get_attr(ent, note) + attr = self._get_attr(ent, blob) return pyfuse3.FileInfo(fh=handle.handle_id), attr async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes: logger.debug("read fh:%d offset:%d length:%d", fh, offset, length) handle = self._get_handle(fh) note = handle.note - if not hasattr(note, 'mkdir'): - return note.read_content()[offset:offset + length] + if isinstance(note, TroveBlob): + return note.read()[offset:offset + length] raise pyfuse3.FUSEError(errno.EBADF) async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int: handle = self._get_handle(fh) note = handle.note - if not hasattr(note, 'mkdir'): - existing = note.read_content() + if isinstance(note, TroveBlob): + existing = note.read() if offset > len(existing): existing = existing + b"\x00" * (offset - len(existing)) - note.write_content(existing[:offset] + data + existing[offset + len(data):]) + note.write(existing[:offset] + data + existing[offset + len(data):]) return len(data) async def release(self, fh: FileHandleT) -> None: @@ -379,8 +380,12 @@ class TroveFuseOps(pyfuse3.Operations): async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None: parent_note = self._get_inode_note(parent_inode) + if not isinstance(parent_note, TroveTree): + raise pyfuse3.FUSEError(errno.ENOTDIR) name_str = name.decode() - parent_note.rm_child(name_str, False) + if name_str not in parent_note.list(): + raise pyfuse3.FUSEError(errno.ENOENT) + parent_note.unlink(name.decode()) async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx): # Decode / validate names diff --git a/trovedb/qgui/main_window.py b/trovedb/qgui/main_window.py index 11e1ac4..d4cdde4 100644 --- a/trovedb/qgui/main_window.py +++ b/trovedb/qgui/main_window.py @@ -22,20 +22,20 @@ class TroveMainWindow(QMainWindow): self.setWindowTitle("Trove") # ── Toolbar ── - # toolbar = QToolBar("Main") - # toolbar.setObjectName("maintoolbar") - # toolbar.setMovable(False) - # self.addToolBar(toolbar) - # - # new_action = QAction("New", self) - # new_action.setShortcut(QKeySequence.StandardKey.New) - # toolbar.addAction(new_action) - # - # save_action = QAction("Save", self) - # save_action.setShortcut(QKeySequence.StandardKey.Save) - # toolbar.addAction(save_action) - # - # toolbar.addSeparator() + toolbar = QToolBar("Main") + toolbar.setObjectName("maintoolbar") + toolbar.setMovable(False) + self.addToolBar(toolbar) + + new_action = QAction("New", self) + new_action.setShortcut(QKeySequence.StandardKey.New) + toolbar.addAction(new_action) + + save_action = QAction("Save", self) + save_action.setShortcut(QKeySequence.StandardKey.Save) + toolbar.addAction(save_action) + + toolbar.addSeparator() # ── Central layout ── central = QWidget() diff --git a/trovedb/qgui/tool_basic_editor.py b/trovedb/qgui/tool_basic_editor.py index 1f667e5..8c0f408 100644 --- a/trovedb/qgui/tool_basic_editor.py +++ b/trovedb/qgui/tool_basic_editor.py @@ -1,6 +1,5 @@ """Tool Supporting Basic Editor Functions""" from typing import cast, Protocol -from PySide6.QtCore import QTimer from PySide6.QtWidgets import QTextEdit, QVBoxLayout import trovedb.trove as tr @@ -17,33 +16,7 @@ class ToolBasicEditor(Tool): self._text_edit = QTextEdit() layout.addWidget(self._text_edit) - - self._content_dirty = False - self._auto_save_timer = QTimer(self) - self._auto_save_timer.setSingleShot(True) - self._auto_save_timer.setInterval(2000) # 2 seconds after typing stops - self._auto_save_timer.timeout.connect(self._perform_auto_save) - - self._text_edit.textChanged.connect(self._schedule_auto_save) - self.refresh() def refresh(self): self._text_edit.setPlainText(self.note.read_content().decode("utf-8")) - self._content_dirty = False - - def _schedule_auto_save(self): - self._content_dirty = True - self._auto_save_timer.stop() - self._auto_save_timer.start() - - def _perform_auto_save(self): - if self._content_dirty: - content = self._text_edit.toPlainText().encode("utf-8") - self.note.write_content(content) - self._content_dirty = False - - def closeEvent(self, event): - if self._content_dirty: - self._perform_auto_save() - super().closeEvent(event) diff --git a/trovedb/trove.py b/trovedb/trove.py index 06ee331..a3a47f8 100644 --- a/trovedb/trove.py +++ b/trovedb/trove.py @@ -1,39 +1,14 @@ -from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict, Iterator +from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict from uuid import UUID +from pathlib import PurePosixPath import datetime as dt -import errno + type ObjectId = int | str | UUID class TroveError(Exception): """Base class for all Trove errors.""" -class ErrorWithErrno(TroveError): - """Raised when an error occurs with an errno.""" - - def __init__(self, error: int, *args): - super().__init__(*args) - self.errno = error - -class ErrorExists(ErrorWithErrno): - """Raised when a note already exists.""" - - def __init__(self, *args): - super().__init__(errno.EEXIST, *args) - -class ErrorNotFound(ErrorWithErrno): - """Raised when a note is not found.""" - - def __init__(self, *args): - super().__init__(errno.ENOENT, *args) - -class ErrorNotEmpty(ErrorWithErrno): - """Raised when a directory is not empty.""" - - def __init__(self, *args): - super().__init__(errno.ENOTEMPTY, *args) - - class BadNoteType(TypeError): """Raised when an invalid note type is encountered.""" @@ -46,12 +21,6 @@ class NoteNotFound(KeyError): class OpenArguments(TypedDict): create: bool -class TreeEntry(NamedTuple): - name: str - object_id: ObjectId - -DEFAULT_MIME = "application/octet-stream" - @runtime_checkable class Note(Protocol): """ @@ -95,30 +64,10 @@ class Note(Protocol): """Write the raw content of the note.""" ... - def children(self) -> Iterator[TreeEntry]: - """Get all children of this note.""" - ... - - def child(self, name: str) -> 'Note': - """Retrieve a child note by name.""" - ... - - def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> 'Note': - """Create a new child note.""" - ... - - def rm_child(self, name: str, recurse: bool): - """Remove a child note.""" - ... - - def has_children(self) -> bool: - """Check if note has children.""" - return next(self.children(), None) is not None - - -def new_child(note: Note, name: str, mime: str = DEFAULT_MIME, content: bytes | None = None, executable: bool = False, hidden: bool = False) -> Note: - return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden) +class TreeEntry(NamedTuple): + name: str + object_id: ObjectId @runtime_checkable class Tree(Protocol): @@ -138,10 +87,17 @@ class Tree(Protocol): """Remove a directory from the tree.""" ... + def child(self, name: str) -> Note: + """Retrieve a child note by name.""" + ... + def entries(self) -> Iterable[TreeEntry]: """Return all entries in the directory""" ... + def list(self) -> dict[str, int]: + """Return all entries as {name: object_id}.""" + ... @runtime_checkable class TreeNote(Note, Tree, Protocol): diff --git a/trovedb/trovedb.py b/trovedb/trovedb.py index 30dfa6d..77aaa23 100644 --- a/trovedb/trovedb.py +++ b/trovedb/trovedb.py @@ -5,10 +5,9 @@ Implements BlobNote, TreeNote, and Trove protocols defined in trove.py. Depends on db.py (Sqlite3Trove) for storage. """ -from typing import Optional, Iterator +from typing import Optional from pathlib import Path import datetime as dt -import uuid from .db import Sqlite3Trove, NOTE_ROOT_ID @@ -21,19 +20,9 @@ class NoteImpl(Note): """Concrete note implementation.""" def __init__(self, parent: 'TroveImpl', object_id: ObjectId): - if not isinstance(object_id, uuid.UUID): - object_id = uuid.UUID(str(object_id)) - assert isinstance(object_id, uuid.UUID) - self._parent = parent self._db = parent.db - self._object_id: uuid.UUID = object_id - - @staticmethod - def get_impl_id(note: Note) -> uuid.UUID: - if not isinstance(note.object_id, uuid.UUID): - raise TypeError("Note not compatible with NoteImpl") - return note.object_id + self._object_id = object_id # Note protocol @property @@ -47,8 +36,7 @@ class NoteImpl(Note): @property def mtime(self) -> dt.datetime: """Return modification time as UTC datetime.""" - mtime = self._db.get_mtime(self._object_id) - return mtime if mtime is not None else dt.datetime.now(tz=dt.timezone.utc) + return self._db.get_mtime(self._object_id) @property def mime(self) -> str: @@ -69,39 +57,6 @@ class NoteImpl(Note): def write_content(self, data: bytes) -> None: self._db.write_content(self._object_id, data) - def children(self) -> Iterator[TreeEntry]: - """Get all children of this note.""" - for name, object_id in self._db.list_tree(self._object_id).items(): - yield TreeEntry(name, object_id) - - def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note: - """Create a new child note.""" - content = content if content is not None else b"" - object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden) - # TODO fix this - if mime == 'inode/directory': - return TreeNoteImpl(self._parent, object_id) - return NoteImpl(self._parent, object_id) - - def child(self, name: str) -> Note: - """Retrieve a child note by name.""" - entries = self._db.list_tree(self._object_id) - if name not in entries: - raise tr.ErrorNotFound(name) - child_id = entries[name] - value = self._parent.get_raw_note(child_id) - if value is None: - raise tr.ErrorNotFound("dangling child link") # FIXME: better errors - return value - - def rm_child(self, name: str, recurse: bool) -> None: - """Remove a child note.""" - note = self.child(name) - if note.has_children() and not recurse: - raise tr.ErrorNotEmpty(name) - self._db.unlink(self._object_id, name) - - class TreeNoteImpl(NoteImpl, TreeNote): """Concrete TreeNote: a tree object backed by the tree_entries table.""" @@ -109,7 +64,7 @@ class TreeNoteImpl(NoteImpl, TreeNote): # Tree protocol def link(self, name: str, note: Note) -> None: """Link name to an existing note.""" - self._db.link(self._object_id, name, NoteImpl.get_impl_id(note)) + self._db.link(self._object_id, name, note.object_id) def unlink(self, name: str) -> None: """Remove an entry by name.""" @@ -126,13 +81,25 @@ class TreeNoteImpl(NoteImpl, TreeNote): """Remove a directory from the tree.""" self.unlink(name) - + def child(self, name: str) -> Note: + """Retrieve a child note by name.""" + entries = self._db.list_tree(self._object_id) + if name not in entries: + raise KeyError(f"Entry '{name}' not found") + child_id = entries[name] + value = self._parent.get_raw_note(child_id) + if value is None: + raise KeyError(f"Entry '{name}' has no value") + return value def entries(self): """Return all entries as an iterable of TreeEntry.""" for name, object_id in self._db.list_tree(self._object_id).items(): yield TreeEntry(name, object_id) + def list(self) -> dict[str, ObjectId]: + """Return all entries as {name: object_id}.""" + return self._db.list_tree(self._object_id) # --------------------------------------------------------------------------- @@ -170,8 +137,6 @@ class TroveImpl: # Trove protocol def get_raw_note(self, note_id: ObjectId) -> Note: """Return a BlobNote or TreeNote for the given id, or None if not found.""" - if not isinstance(note_id, uuid.UUID): - note_id = uuid.UUID(str(note_id)) info = self._db.get_info(note_id) if info is None: raise NoteNotFound(note_id)