Compare commits

...

4 commits

Author SHA1 Message Date
41480a39c9 Start API refactor to remove separate Tree
We want trees to just be notes. We want to reference notes
by Path instead of object ids. Stop thinking about the
fs implementation and the fuse service with the same terms -
they will be different backends.
2026-03-27 12:44:35 -05:00
abbef64bbc Switch to using index instead of _index 2026-03-26 19:02:20 -05:00
0b3025b878 Add some project specific ignores 2026-03-26 16:11:11 -05:00
a67b9c48c2 Add trove auto save function 2026-03-26 16:08:10 -05:00
9 changed files with 296 additions and 250 deletions

8
.gitignore vendored
View file

@ -7,6 +7,14 @@ __pycache__/
# C extensions
*.so
# Project specify temporary / test files
*.db
.trove/
ai
tmp/
ui/
# Distribution / packaging
.Python
build/

View file

@ -213,6 +213,7 @@ class Sqlite3Trove:
if object_id is None:
object_id = uuid.uuid4()
sid = _sql_id(object_id)
assert sid is not None
# Preserve created timestamp on update
row = self._con.execute(

View file

@ -2,99 +2,132 @@ import os
import sqlite3
import tempfile
import datetime as dt
from pathlib import Path
from typing import Optional, Dict, List, Self, Iterable
from pathlib import Path, PurePosixPath
from typing import Optional, Dict, List, Self, Iterable, Iterator
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId
from . import fs_util as fsu
from . import trove as tr
class FSNote(Note):
def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
def __init__(self, trove: 'FSTrove', path: Path):
self._trove: FSTrove = trove
self._fs_path: Path | None = path
self._inode: int | None = inode
if self._fs_path is not None:
inode = self._fs_path.stat().st_ino
if self._inode != inode and self._inode is not None:
raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}")
self._inode = inode
self._fs_path: Path = path.resolve()
if not self._fs_path.is_relative_to(trove.root):
raise ValueError("Path must be relative to the root directory")
self._object_id: str = path.relative_to(trove.root).as_posix()
@property
def object_id(self) -> int:
if self._inode is None:
raise ValueError("Note not yet saved to disk")
return self._inode
def object_id(self) -> tr.ObjectId:
return self._object_id
@property
def fs_path(self) -> Path:
return self._fs_path
@property
def mtime(self):
"""Return modification time as datetime."""
stat = self._path.stat()
stat = self._fs_path.stat()
return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)
@property
def readonly(self) -> bool:
"""Check if the note is readonly based on file permissions."""
if self._inode is None:
return False
return not os.access(self._path, os.W_OK)
return not os.access(self._fs_path, os.W_OK)
@property
def mime(self) -> str:
"""Return MIME type, defaulting to generic binary stream."""
if self._fs_path.is_dir():
return "inode/directory"
return "application/octet-stream"
@property
def _path(self) -> Path:
if self._fs_path is not None:
if self._fs_path.exists():
return self._fs_path
self._fs_path = None
if self._inode is None:
raise ValueError("Note not yet saved to disk")
self._fs_path = self._trove.get_path_by_inode(self._inode)
assert self._fs_path is not None
return self._fs_path
def get_raw_metadata(self, key: str) -> Optional[bytes]:
return self._trove._get_metadata(self._inode, key)
# TODO: FIXME
return None
def set_raw_metadata(self, key: str, value: bytes) -> None:
self._trove._set_metadata(self._inode, key, value)
# TODO: FIXME
pass
def read_content(self) -> bytes:
"""Read the raw content of the note."""
content_file = fsu.get_content_path(self._path)
if content_file.exists():
return content_file.read_bytes()
if self._fs_path.is_file():
return self._fs_path.read_bytes()
return b""
def write_content(self, data:bytes) -> None:
"""Write the raw content of the note."""
content_file = fsu.get_content_path(self._path)
content_file.write_bytes(data)
self._fs_path.write_bytes(data)
def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path:
ex_path = self._fs_path / name
try:
ex_path.mkdir(exist_ok=exist_ok)
return ex_path
except FileExistsError:
raise tr.ErrorExists(str(ex_path)) from None
except OSError as e:
raise tr.ErrorWithErrno(e.errno, str(e)) from None
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
"""Create a new child note."""
if content is None:
content = b""
if mime == 'inode/directory':
if content is not None:
raise NotImplementedError("FSNote does not support children")
return FSTreeNote(self._trove, self._new_child_subdir(name, False))
ex_path = self._fs_path / name
ex_path.write_bytes(content)
return FSNote(self._trove, ex_path)
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._fs_path / name
return self._trove.get_raw_note_by_path(target_path)
def rm_child(self, name: str, recurse: bool):
target_path = self._fs_path / name
if not target_path.exists():
raise tr.ErrorNotFound(name)
if target_path.is_dir():
if recurse:
raise NotImplementedError("Recursive deletion not supported")
else:
target_path.rmdir()
else:
target_path.unlink()
# TODO: remove meta directory!
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
if not self._fs_path.is_dir():
return
for item in self._fs_path.iterdir():
if item.name == ".trove":
continue
yield TreeEntry(name=item.name, object_id=item.stat().st_ino)
class FSTreeNote(FSNote, TreeNote):
@property
def mime(self) -> str:
"""Return MIME type for directory/tree nodes."""
return "inode/directory"
def link(self, name: str, note: Note):
if not isinstance(note, FSNote):
raise BadNoteType("Only blob notes can be linked")
target_path = self._path / name
target_path = self._fs_path / name
if target_path.exists():
self.unlink(name)
note_path = note._path
note_path = note._fs_path
# If the note is in .working, move it to the new location.
if self._trove.working in note_path.parents:
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
else:
# If it's already linked somewhere, create a hard link if it's a file.
if note_path.is_file():
@ -108,10 +141,8 @@ class FSTreeNote(FSNote, TreeNote):
# We move it to the new location.
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
def unlink(self, name: str):
target_path = self._path / name
target_path = self._fs_path / name
if not target_path.exists():
return
if target_path.is_dir():
@ -120,39 +151,20 @@ class FSTreeNote(FSNote, TreeNote):
target_path.unlink()
def mkdir(self, name: str) -> 'FSTreeNote':
target_path = self._path / name
target_path = self._fs_path / name
target_path.mkdir(exist_ok=True)
inode = target_path.stat().st_ino
self._trove._update_cache(inode, target_path)
return FSTreeNote(self._trove, inode=inode, path=target_path)
return FSTreeNote(self._trove, path=target_path)
def entries(self) -> Iterable[TreeEntry]:
try:
for item in self._path.iterdir():
for item in self._fs_path.iterdir():
if item.name == ".trove":
continue
inode = item.stat().st_ino
self._trove._update_cache(inode, item)
yield TreeEntry(name=item.name, object_id=inode)
yield TreeEntry(name=item.name, object_id=str(item))
except OSError:
pass
def list(self) -> dict[str, int]:
res = {}
try:
for item in self._path.iterdir():
if item.name == ".trove":
continue
res[item.name] = item.stat().st_ino
self._trove._update_cache(res[item.name], item)
except OSError:
pass
return res
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._path / name
return self._trove.get_raw_note_by_path(target_path)
class FSTrove(Trove):
@ -161,14 +173,9 @@ class FSTrove(Trove):
self._root_inode = self.root.stat().st_ino
self.dot_trove = self.root / ".trove"
self.working = self.dot_trove / ".working"
self.dot_trove.mkdir(exist_ok=True)
self.working.mkdir(exist_ok=True)
db_path = self.dot_trove / "trovecache.db"
self.con = sqlite3.connect(str(db_path))
self._init_db()
@classmethod
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
p = Path(path)
@ -178,101 +185,30 @@ class FSTrove(Trove):
p.mkdir(parents=True)
return cls(p)
def _init_db(self):
self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
self.con.commit()
def get_raw_note_by_path(self, path: Path) -> Note:
if not path.exists():
raise tr.ErrorNotFound(str(path))
if path.is_dir():
return FSTreeNote(self, path=path)
return FSNote(self, path=path)
def _update_cache(self, inode: int, path: Path):
try:
rel_path = path.relative_to(self.root)
path_str = str(rel_path)
if path_str == ".":
path_str = ""
except ValueError:
# Path not under root, maybe it's the root itself?
if path == self.root:
path_str = ""
else:
return # Not under root, don't cache
self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
self.con.commit()
def get_path_by_inode(self, inode: int) -> Optional[Path]:
if inode == self._root_inode:
return self.root
row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
if row:
p = self.root / row[0]
try:
if p.exists() and p.stat().st_ino == inode:
return p
except OSError:
pass
# search
for root_dir, dirs, files in os.walk(self.root):
# Skip .trove
if ".trove" in dirs:
dirs.remove(".trove")
for name in dirs + files:
p = Path(root_dir) / name
try:
st = p.stat()
if st.st_ino == inode:
self._update_cache(inode, p)
return p
except OSError:
continue
return None
def get_raw_note_by_path(self, target_path: Path) -> Note:
if not target_path.exists():
raise NoteNotFound(target_path.relative_to(self.root))
note_id = target_path.stat().st_ino
if target_path.is_dir():
return FSTreeNote(self, inode=note_id, path=target_path)
else:
return FSNote(self, inode=note_id, path=target_path)
def get_raw_note(self, note_id: int) -> Note:
p = self.get_path_by_inode(note_id)
if not p:
def get_raw_note(self, note_id: ObjectId) -> Note:
p = self.root / str(note_id)
if not p.exists():
raise NoteNotFound(note_id)
return self.get_raw_note_by_path(p)
def create_blob(self, data: bytes | None = None) -> Note:
fd, temp_path = tempfile.mkstemp(dir=self.working)
try:
if data:
os.write(fd, data)
finally:
os.close(fd)
p = Path(temp_path)
inode = p.stat().st_ino
self._update_cache(inode, p)
return FSNote(self, inode=inode, path=p)
raise NotImplementedError("FSTrove does not support blobs")
def get_root(self) -> TreeNote:
return FSTreeNote(self, inode=self._root_inode, path=self.root)
return FSTreeNote(self, path=self.root)
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
return row[0] if row else None
raise NotImplementedError("FSTrove does not support metadata")
def _set_metadata(self, inode: int, key: str, value: bytes):
self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
self.con.commit()
raise NotImplementedError("FSTrove does not support metadata")
def close(self):
self.con.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
pass

View file

@ -90,8 +90,8 @@ def get_content_index_name_from_path(path: Path) -> str:
"""Return the name of the index file for a directory."""
# TODO: improve handling and mimetype logic
if not path.suffix:
return '_index.dat'
return f'_index{path.suffix}'
return 'index.dat'
return f'index{path.suffix}'
def get_content_path(path: str | Path) -> Path:
"""Return the path to the content file for a directory or file"""

View file

@ -20,7 +20,9 @@ import pyfuse3
import trio
from pyfuse3 import InodeT, FileHandleT
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, ObjectId, TreeExists
import trovedb.trove as tr
logger = logging.getLogger(__name__)
@ -140,7 +142,7 @@ class TroveFuseOps(pyfuse3.Operations):
raise pyfuse3.FUSEError(errno.ENOTDIR)
try:
note = parent.child(name.decode())
except KeyError:
except (KeyError, tr.ErrorNotFound):
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
raise pyfuse3.FUSEError(errno.ENOENT) from None
ent = self._create_get_ent_from_note(note)
@ -184,8 +186,8 @@ class TroveFuseOps(pyfuse3.Operations):
# Determine basic information
is_tree = True
size = 0
if isinstance(note, TroveBlob):
size = len(note.read())
if not hasattr(note, 'mkdir'):
size = len(note.read_content())
is_tree = False
# Create and fill attr structure
@ -239,13 +241,13 @@ class TroveFuseOps(pyfuse3.Operations):
ent = self._get_ent_from_inode(inode)
note = self._get_ent_note(ent)
if fields.update_size:
if isinstance(note, TroveBlob):
current = note.read()
if not hasattr(note, 'mkdir'):
current = note.read_content()
new_size = attr.st_size
if new_size < len(current):
note.write(current[:new_size])
note.write_content(current[:new_size])
elif new_size > len(current):
note.write(current + b"\x00" * (new_size - len(current)))
note.write_content(current + b"\x00" * (new_size - len(current)))
else:
raise pyfuse3.FUSEError(errno.EINVAL)
return self._get_attr(ent, note)
@ -278,13 +280,13 @@ class TroveFuseOps(pyfuse3.Operations):
if not isinstance(note, TroveTree):
logger.debug("attempted readdir on %d not a tree", fh)
raise pyfuse3.FUSEError(errno.ENOTDIR)
entries = list(note.list().items()) # [(name, object_id), ...]
entries = list(note.entries()) # [(name, object_id), ...]
for idx, (name, child_id) in enumerate(entries):
for idx, entry in enumerate(entries):
if idx < start_id:
continue
child = self._trove.get_raw_note(child_id)
child = self._trove.get_raw_note(entry.object_id)
if child is None:
continue
@ -292,7 +294,7 @@ class TroveFuseOps(pyfuse3.Operations):
attr = self._get_attr(child_ent, child)
self._ref_entry(child_ent)
if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1):
if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1):
break
async def releasedir(self, fh: FileHandleT) -> None:
@ -302,19 +304,18 @@ class TroveFuseOps(pyfuse3.Operations):
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
logger.debug("mkdir inode:%d name:%s", parent_inode, name)
# Grab parent note, verify is tree
parent = self._get_inode_note(parent_inode)
if not isinstance(parent, TreeNote):
raise pyfuse3.FUSEError(errno.ENOTDIR)
# Create new directory in note
# TODO: consider implications here, maybe look at ext on dir for mime?
try:
new_tree: TreeNote = parent.mkdir(name.decode())
except TreeExists:
raise pyfuse3.FUSEError(errno.EEXIST) from None
note = tr.new_child(parent, name.decode(), mime='inode/directory')
except tr.ErrorWithErrno as e:
raise pyfuse3.FUSEError(e.errno) from None
# Grab entity for kernel
ent = self._create_get_ent_from_note(new_tree)
ent = self._create_get_ent_from_note(note)
self._ref_entry(ent)
return self._get_attr(ent, new_tree)
return self._get_attr(ent, note)
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
@ -341,37 +342,35 @@ class TroveFuseOps(pyfuse3.Operations):
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
logger.debug("create inode:%d name:%s", parent_inode, name)
parent = self._get_inode_note(parent_inode)
if not isinstance(parent, TroveTree):
raise pyfuse3.FUSEError(errno.ENOTDIR)
name_str = name.decode()
if name_str in parent.list():
raise pyfuse3.FUSEError(errno.EEXIST)
blob = self._trove.create_blob(b"")
parent.link(name_str, blob)
ent = self._create_get_ent_from_note(blob)
# TODO: handle mode
# TODO: handle flags
name_str = name.decode()
note = tr.new_child(parent, name_str)
ent = self._create_get_ent_from_note(note)
self._ref_entry(ent)
handle = self._open_handle(ent.sys_inode)
attr = self._get_attr(ent, blob)
attr = self._get_attr(ent, note)
return pyfuse3.FileInfo(fh=handle.handle_id), attr
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
handle = self._get_handle(fh)
note = handle.note
if isinstance(note, TroveBlob):
return note.read()[offset:offset + length]
if not hasattr(note, 'mkdir'):
return note.read_content()[offset:offset + length]
raise pyfuse3.FUSEError(errno.EBADF)
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
handle = self._get_handle(fh)
note = handle.note
if isinstance(note, TroveBlob):
existing = note.read()
if not hasattr(note, 'mkdir'):
existing = note.read_content()
if offset > len(existing):
existing = existing + b"\x00" * (offset - len(existing))
note.write(existing[:offset] + data + existing[offset + len(data):])
note.write_content(existing[:offset] + data + existing[offset + len(data):])
return len(data)
async def release(self, fh: FileHandleT) -> None:
@ -380,12 +379,8 @@ class TroveFuseOps(pyfuse3.Operations):
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
parent_note = self._get_inode_note(parent_inode)
if not isinstance(parent_note, TroveTree):
raise pyfuse3.FUSEError(errno.ENOTDIR)
name_str = name.decode()
if name_str not in parent_note.list():
raise pyfuse3.FUSEError(errno.ENOENT)
parent_note.unlink(name.decode())
parent_note.rm_child(name_str, False)
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
# Decode / validate names

View file

@ -22,20 +22,20 @@ class TroveMainWindow(QMainWindow):
self.setWindowTitle("Trove")
# ── Toolbar ──
toolbar = QToolBar("Main")
toolbar.setObjectName("maintoolbar")
toolbar.setMovable(False)
self.addToolBar(toolbar)
new_action = QAction("New", self)
new_action.setShortcut(QKeySequence.StandardKey.New)
toolbar.addAction(new_action)
save_action = QAction("Save", self)
save_action.setShortcut(QKeySequence.StandardKey.Save)
toolbar.addAction(save_action)
toolbar.addSeparator()
# toolbar = QToolBar("Main")
# toolbar.setObjectName("maintoolbar")
# toolbar.setMovable(False)
# self.addToolBar(toolbar)
#
# new_action = QAction("New", self)
# new_action.setShortcut(QKeySequence.StandardKey.New)
# toolbar.addAction(new_action)
#
# save_action = QAction("Save", self)
# save_action.setShortcut(QKeySequence.StandardKey.Save)
# toolbar.addAction(save_action)
#
# toolbar.addSeparator()
# ── Central layout ──
central = QWidget()

View file

@ -1,5 +1,6 @@
"""Tool Supporting Basic Editor Functions"""
from typing import cast, Protocol
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QTextEdit, QVBoxLayout
import trovedb.trove as tr
@ -16,7 +17,33 @@ class ToolBasicEditor(Tool):
self._text_edit = QTextEdit()
layout.addWidget(self._text_edit)
self._content_dirty = False
self._auto_save_timer = QTimer(self)
self._auto_save_timer.setSingleShot(True)
self._auto_save_timer.setInterval(2000) # 2 seconds after typing stops
self._auto_save_timer.timeout.connect(self._perform_auto_save)
self._text_edit.textChanged.connect(self._schedule_auto_save)
self.refresh()
def refresh(self):
self._text_edit.setPlainText(self.note.read_content().decode("utf-8"))
self._content_dirty = False
def _schedule_auto_save(self):
self._content_dirty = True
self._auto_save_timer.stop()
self._auto_save_timer.start()
def _perform_auto_save(self):
if self._content_dirty:
content = self._text_edit.toPlainText().encode("utf-8")
self.note.write_content(content)
self._content_dirty = False
def closeEvent(self, event):
if self._content_dirty:
self._perform_auto_save()
super().closeEvent(event)

View file

@ -1,14 +1,39 @@
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict, Iterator
from uuid import UUID
from pathlib import PurePosixPath
import datetime as dt
import errno
type ObjectId = int | str | UUID
class TroveError(Exception):
"""Base class for all Trove errors."""
class ErrorWithErrno(TroveError):
"""Raised when an error occurs with an errno."""
def __init__(self, error: int, *args):
super().__init__(*args)
self.errno = error
class ErrorExists(ErrorWithErrno):
"""Raised when a note already exists."""
def __init__(self, *args):
super().__init__(errno.EEXIST, *args)
class ErrorNotFound(ErrorWithErrno):
"""Raised when a note is not found."""
def __init__(self, *args):
super().__init__(errno.ENOENT, *args)
class ErrorNotEmpty(ErrorWithErrno):
"""Raised when a directory is not empty."""
def __init__(self, *args):
super().__init__(errno.ENOTEMPTY, *args)
class BadNoteType(TypeError):
"""Raised when an invalid note type is encountered."""
@ -21,6 +46,12 @@ class NoteNotFound(KeyError):
class OpenArguments(TypedDict):
create: bool
class TreeEntry(NamedTuple):
name: str
object_id: ObjectId
DEFAULT_MIME = "application/octet-stream"
@runtime_checkable
class Note(Protocol):
"""
@ -64,10 +95,30 @@ class Note(Protocol):
"""Write the raw content of the note."""
...
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
...
def child(self, name: str) -> 'Note':
"""Retrieve a child note by name."""
...
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> 'Note':
"""Create a new child note."""
...
def rm_child(self, name: str, recurse: bool):
"""Remove a child note."""
...
def has_children(self) -> bool:
"""Check if note has children."""
return next(self.children(), None) is not None
def new_child(note: Note, name: str, mime: str = DEFAULT_MIME, content: bytes | None = None, executable: bool = False, hidden: bool = False) -> Note:
return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden)
class TreeEntry(NamedTuple):
name: str
object_id: ObjectId
@runtime_checkable
class Tree(Protocol):
@ -87,17 +138,10 @@ class Tree(Protocol):
"""Remove a directory from the tree."""
...
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
...
def entries(self) -> Iterable[TreeEntry]:
"""Return all entries in the directory"""
...
def list(self) -> dict[str, int]:
"""Return all entries as {name: object_id}."""
...
@runtime_checkable
class TreeNote(Note, Tree, Protocol):

View file

@ -5,9 +5,10 @@ Implements BlobNote, TreeNote, and Trove protocols defined in trove.py.
Depends on db.py (Sqlite3Trove) for storage.
"""
from typing import Optional
from typing import Optional, Iterator
from pathlib import Path
import datetime as dt
import uuid
from .db import Sqlite3Trove, NOTE_ROOT_ID
@ -20,9 +21,19 @@ class NoteImpl(Note):
"""Concrete note implementation."""
def __init__(self, parent: 'TroveImpl', object_id: ObjectId):
if not isinstance(object_id, uuid.UUID):
object_id = uuid.UUID(str(object_id))
assert isinstance(object_id, uuid.UUID)
self._parent = parent
self._db = parent.db
self._object_id = object_id
self._object_id: uuid.UUID = object_id
@staticmethod
def get_impl_id(note: Note) -> uuid.UUID:
if not isinstance(note.object_id, uuid.UUID):
raise TypeError("Note not compatible with NoteImpl")
return note.object_id
# Note protocol
@property
@ -36,7 +47,8 @@ class NoteImpl(Note):
@property
def mtime(self) -> dt.datetime:
"""Return modification time as UTC datetime."""
return self._db.get_mtime(self._object_id)
mtime = self._db.get_mtime(self._object_id)
return mtime if mtime is not None else dt.datetime.now(tz=dt.timezone.utc)
@property
def mime(self) -> str:
@ -57,6 +69,39 @@ class NoteImpl(Note):
def write_content(self, data: bytes) -> None:
self._db.write_content(self._object_id, data)
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
for name, object_id in self._db.list_tree(self._object_id).items():
yield TreeEntry(name, object_id)
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
"""Create a new child note."""
content = content if content is not None else b""
object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden)
# TODO fix this
if mime == 'inode/directory':
return TreeNoteImpl(self._parent, object_id)
return NoteImpl(self._parent, object_id)
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
entries = self._db.list_tree(self._object_id)
if name not in entries:
raise tr.ErrorNotFound(name)
child_id = entries[name]
value = self._parent.get_raw_note(child_id)
if value is None:
raise tr.ErrorNotFound("dangling child link") # FIXME: better errors
return value
def rm_child(self, name: str, recurse: bool) -> None:
"""Remove a child note."""
note = self.child(name)
if note.has_children() and not recurse:
raise tr.ErrorNotEmpty(name)
self._db.unlink(self._object_id, name)
class TreeNoteImpl(NoteImpl, TreeNote):
"""Concrete TreeNote: a tree object backed by the tree_entries table."""
@ -64,7 +109,7 @@ class TreeNoteImpl(NoteImpl, TreeNote):
# Tree protocol
def link(self, name: str, note: Note) -> None:
"""Link name to an existing note."""
self._db.link(self._object_id, name, note.object_id)
self._db.link(self._object_id, name, NoteImpl.get_impl_id(note))
def unlink(self, name: str) -> None:
"""Remove an entry by name."""
@ -81,25 +126,13 @@ class TreeNoteImpl(NoteImpl, TreeNote):
"""Remove a directory from the tree."""
self.unlink(name)
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
entries = self._db.list_tree(self._object_id)
if name not in entries:
raise KeyError(f"Entry '{name}' not found")
child_id = entries[name]
value = self._parent.get_raw_note(child_id)
if value is None:
raise KeyError(f"Entry '{name}' has no value")
return value
def entries(self):
"""Return all entries as an iterable of TreeEntry."""
for name, object_id in self._db.list_tree(self._object_id).items():
yield TreeEntry(name, object_id)
def list(self) -> dict[str, ObjectId]:
"""Return all entries as {name: object_id}."""
return self._db.list_tree(self._object_id)
# ---------------------------------------------------------------------------
@ -137,6 +170,8 @@ class TroveImpl:
# Trove protocol
def get_raw_note(self, note_id: ObjectId) -> Note:
"""Return a BlobNote or TreeNote for the given id, or None if not found."""
if not isinstance(note_id, uuid.UUID):
note_id = uuid.UUID(str(note_id))
info = self._db.get_info(note_id)
if info is None:
raise NoteNotFound(note_id)