Compare commits

..

No commits in common. "41480a39c909ab26c3fb09d363d6aeaabb0d7b1e" and "01e9780bb896abab131375efd792d0d4a54d5f48" have entirely different histories.

9 changed files with 250 additions and 296 deletions

8
.gitignore vendored
View file

@ -7,14 +7,6 @@ __pycache__/
# C extensions
*.so
# Project specify temporary / test files
*.db
.trove/
ai
tmp/
ui/
# Distribution / packaging
.Python
build/

View file

@ -213,7 +213,6 @@ class Sqlite3Trove:
if object_id is None:
object_id = uuid.uuid4()
sid = _sql_id(object_id)
assert sid is not None
# Preserve created timestamp on update
row = self._con.execute(

View file

@ -2,132 +2,99 @@ import os
import sqlite3
import tempfile
import datetime as dt
from pathlib import Path, PurePosixPath
from typing import Optional, Dict, List, Self, Iterable, Iterator
from pathlib import Path
from typing import Optional, Dict, List, Self, Iterable
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound
from . import fs_util as fsu
from . import trove as tr
class FSNote(Note):
def __init__(self, trove: 'FSTrove', path: Path):
def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
self._trove: FSTrove = trove
self._fs_path: Path = path.resolve()
if not self._fs_path.is_relative_to(trove.root):
raise ValueError("Path must be relative to the root directory")
self._object_id: str = path.relative_to(trove.root).as_posix()
self._fs_path: Path | None = path
self._inode: int | None = inode
if self._fs_path is not None:
inode = self._fs_path.stat().st_ino
if self._inode != inode and self._inode is not None:
raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}")
self._inode = inode
@property
def object_id(self) -> tr.ObjectId:
return self._object_id
@property
def fs_path(self) -> Path:
return self._fs_path
def object_id(self) -> int:
if self._inode is None:
raise ValueError("Note not yet saved to disk")
return self._inode
@property
def mtime(self):
"""Return modification time as datetime."""
stat = self._fs_path.stat()
stat = self._path.stat()
return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)
@property
def readonly(self) -> bool:
"""Check if the note is readonly based on file permissions."""
return not os.access(self._fs_path, os.W_OK)
if self._inode is None:
return False
return not os.access(self._path, os.W_OK)
@property
def mime(self) -> str:
"""Return MIME type, defaulting to generic binary stream."""
if self._fs_path.is_dir():
return "inode/directory"
return "application/octet-stream"
@property
def _path(self) -> Path:
if self._fs_path is not None:
if self._fs_path.exists():
return self._fs_path
self._fs_path = None
if self._inode is None:
raise ValueError("Note not yet saved to disk")
self._fs_path = self._trove.get_path_by_inode(self._inode)
assert self._fs_path is not None
return self._fs_path
def get_raw_metadata(self, key: str) -> Optional[bytes]:
# TODO: FIXME
return None
return self._trove._get_metadata(self._inode, key)
def set_raw_metadata(self, key: str, value: bytes) -> None:
# TODO: FIXME
pass
self._trove._set_metadata(self._inode, key, value)
def read_content(self) -> bytes:
"""Read the raw content of the note."""
if self._fs_path.is_file():
return self._fs_path.read_bytes()
content_file = fsu.get_content_path(self._path)
if content_file.exists():
return content_file.read_bytes()
return b""
def write_content(self, data:bytes) -> None:
"""Write the raw content of the note."""
self._fs_path.write_bytes(data)
def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path:
ex_path = self._fs_path / name
try:
ex_path.mkdir(exist_ok=exist_ok)
return ex_path
except FileExistsError:
raise tr.ErrorExists(str(ex_path)) from None
except OSError as e:
raise tr.ErrorWithErrno(e.errno, str(e)) from None
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
"""Create a new child note."""
if content is None:
content = b""
if mime == 'inode/directory':
if content is not None:
raise NotImplementedError("FSNote does not support children")
return FSTreeNote(self._trove, self._new_child_subdir(name, False))
ex_path = self._fs_path / name
ex_path.write_bytes(content)
return FSNote(self._trove, ex_path)
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._fs_path / name
return self._trove.get_raw_note_by_path(target_path)
def rm_child(self, name: str, recurse: bool):
target_path = self._fs_path / name
if not target_path.exists():
raise tr.ErrorNotFound(name)
if target_path.is_dir():
if recurse:
raise NotImplementedError("Recursive deletion not supported")
else:
target_path.rmdir()
else:
target_path.unlink()
# TODO: remove meta directory!
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
if not self._fs_path.is_dir():
return
for item in self._fs_path.iterdir():
if item.name == ".trove":
continue
yield TreeEntry(name=item.name, object_id=item.stat().st_ino)
content_file = fsu.get_content_path(self._path)
content_file.write_bytes(data)
class FSTreeNote(FSNote, TreeNote):
@property
def mime(self) -> str:
"""Return MIME type for directory/tree nodes."""
return "inode/directory"
def link(self, name: str, note: Note):
if not isinstance(note, FSNote):
raise BadNoteType("Only blob notes can be linked")
target_path = self._fs_path / name
target_path = self._path / name
if target_path.exists():
self.unlink(name)
note_path = note._fs_path
note_path = note._path
# If the note is in .working, move it to the new location.
if self._trove.working in note_path.parents:
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
else:
# If it's already linked somewhere, create a hard link if it's a file.
if note_path.is_file():
@ -141,8 +108,10 @@ class FSTreeNote(FSNote, TreeNote):
# We move it to the new location.
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
def unlink(self, name: str):
target_path = self._fs_path / name
target_path = self._path / name
if not target_path.exists():
return
if target_path.is_dir():
@ -151,20 +120,39 @@ class FSTreeNote(FSNote, TreeNote):
target_path.unlink()
def mkdir(self, name: str) -> 'FSTreeNote':
target_path = self._fs_path / name
target_path = self._path / name
target_path.mkdir(exist_ok=True)
return FSTreeNote(self._trove, path=target_path)
inode = target_path.stat().st_ino
self._trove._update_cache(inode, target_path)
return FSTreeNote(self._trove, inode=inode, path=target_path)
def entries(self) -> Iterable[TreeEntry]:
try:
for item in self._fs_path.iterdir():
for item in self._path.iterdir():
if item.name == ".trove":
continue
yield TreeEntry(name=item.name, object_id=str(item))
inode = item.stat().st_ino
self._trove._update_cache(inode, item)
yield TreeEntry(name=item.name, object_id=inode)
except OSError:
pass
def list(self) -> dict[str, int]:
res = {}
try:
for item in self._path.iterdir():
if item.name == ".trove":
continue
res[item.name] = item.stat().st_ino
self._trove._update_cache(res[item.name], item)
except OSError:
pass
return res
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._path / name
return self._trove.get_raw_note_by_path(target_path)
class FSTrove(Trove):
@ -173,9 +161,14 @@ class FSTrove(Trove):
self._root_inode = self.root.stat().st_ino
self.dot_trove = self.root / ".trove"
self.working = self.dot_trove / ".working"
self.dot_trove.mkdir(exist_ok=True)
self.working.mkdir(exist_ok=True)
db_path = self.dot_trove / "trovecache.db"
self.con = sqlite3.connect(str(db_path))
self._init_db()
@classmethod
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
p = Path(path)
@ -185,30 +178,101 @@ class FSTrove(Trove):
p.mkdir(parents=True)
return cls(p)
def get_raw_note_by_path(self, path: Path) -> Note:
if not path.exists():
raise tr.ErrorNotFound(str(path))
if path.is_dir():
return FSTreeNote(self, path=path)
return FSNote(self, path=path)
def _init_db(self):
self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
self.con.commit()
def get_raw_note(self, note_id: ObjectId) -> Note:
p = self.root / str(note_id)
if not p.exists():
def _update_cache(self, inode: int, path: Path):
try:
rel_path = path.relative_to(self.root)
path_str = str(rel_path)
if path_str == ".":
path_str = ""
except ValueError:
# Path not under root, maybe it's the root itself?
if path == self.root:
path_str = ""
else:
return # Not under root, don't cache
self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
self.con.commit()
def get_path_by_inode(self, inode: int) -> Optional[Path]:
if inode == self._root_inode:
return self.root
row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
if row:
p = self.root / row[0]
try:
if p.exists() and p.stat().st_ino == inode:
return p
except OSError:
pass
# search
for root_dir, dirs, files in os.walk(self.root):
# Skip .trove
if ".trove" in dirs:
dirs.remove(".trove")
for name in dirs + files:
p = Path(root_dir) / name
try:
st = p.stat()
if st.st_ino == inode:
self._update_cache(inode, p)
return p
except OSError:
continue
return None
def get_raw_note_by_path(self, target_path: Path) -> Note:
if not target_path.exists():
raise NoteNotFound(target_path.relative_to(self.root))
note_id = target_path.stat().st_ino
if target_path.is_dir():
return FSTreeNote(self, inode=note_id, path=target_path)
else:
return FSNote(self, inode=note_id, path=target_path)
def get_raw_note(self, note_id: int) -> Note:
p = self.get_path_by_inode(note_id)
if not p:
raise NoteNotFound(note_id)
return self.get_raw_note_by_path(p)
def create_blob(self, data: bytes | None = None) -> Note:
raise NotImplementedError("FSTrove does not support blobs")
fd, temp_path = tempfile.mkstemp(dir=self.working)
try:
if data:
os.write(fd, data)
finally:
os.close(fd)
p = Path(temp_path)
inode = p.stat().st_ino
self._update_cache(inode, p)
return FSNote(self, inode=inode, path=p)
def get_root(self) -> TreeNote:
return FSTreeNote(self, path=self.root)
return FSTreeNote(self, inode=self._root_inode, path=self.root)
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
raise NotImplementedError("FSTrove does not support metadata")
row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
return row[0] if row else None
def _set_metadata(self, inode: int, key: str, value: bytes):
raise NotImplementedError("FSTrove does not support metadata")
self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
self.con.commit()
def close(self):
pass
self.con.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

View file

@ -90,8 +90,8 @@ def get_content_index_name_from_path(path: Path) -> str:
"""Return the name of the index file for a directory."""
# TODO: improve handling and mimetype logic
if not path.suffix:
return 'index.dat'
return f'index{path.suffix}'
return '_index.dat'
return f'_index{path.suffix}'
def get_content_path(path: str | Path) -> Path:
"""Return the path to the content file for a directory or file"""

View file

@ -20,9 +20,7 @@ import pyfuse3
import trio
from pyfuse3 import InodeT, FileHandleT
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, ObjectId, TreeExists
import trovedb.trove as tr
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists
logger = logging.getLogger(__name__)
@ -142,7 +140,7 @@ class TroveFuseOps(pyfuse3.Operations):
raise pyfuse3.FUSEError(errno.ENOTDIR)
try:
note = parent.child(name.decode())
except (KeyError, tr.ErrorNotFound):
except KeyError:
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
raise pyfuse3.FUSEError(errno.ENOENT) from None
ent = self._create_get_ent_from_note(note)
@ -186,8 +184,8 @@ class TroveFuseOps(pyfuse3.Operations):
# Determine basic information
is_tree = True
size = 0
if not hasattr(note, 'mkdir'):
size = len(note.read_content())
if isinstance(note, TroveBlob):
size = len(note.read())
is_tree = False
# Create and fill attr structure
@ -241,13 +239,13 @@ class TroveFuseOps(pyfuse3.Operations):
ent = self._get_ent_from_inode(inode)
note = self._get_ent_note(ent)
if fields.update_size:
if not hasattr(note, 'mkdir'):
current = note.read_content()
if isinstance(note, TroveBlob):
current = note.read()
new_size = attr.st_size
if new_size < len(current):
note.write_content(current[:new_size])
note.write(current[:new_size])
elif new_size > len(current):
note.write_content(current + b"\x00" * (new_size - len(current)))
note.write(current + b"\x00" * (new_size - len(current)))
else:
raise pyfuse3.FUSEError(errno.EINVAL)
return self._get_attr(ent, note)
@ -280,13 +278,13 @@ class TroveFuseOps(pyfuse3.Operations):
if not isinstance(note, TroveTree):
logger.debug("attempted readdir on %d not a tree", fh)
raise pyfuse3.FUSEError(errno.ENOTDIR)
entries = list(note.entries()) # [(name, object_id), ...]
entries = list(note.list().items()) # [(name, object_id), ...]
for idx, entry in enumerate(entries):
for idx, (name, child_id) in enumerate(entries):
if idx < start_id:
continue
child = self._trove.get_raw_note(entry.object_id)
child = self._trove.get_raw_note(child_id)
if child is None:
continue
@ -294,7 +292,7 @@ class TroveFuseOps(pyfuse3.Operations):
attr = self._get_attr(child_ent, child)
self._ref_entry(child_ent)
if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1):
if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1):
break
async def releasedir(self, fh: FileHandleT) -> None:
@ -304,18 +302,19 @@ class TroveFuseOps(pyfuse3.Operations):
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
logger.debug("mkdir inode:%d name:%s", parent_inode, name)
# Grab parent note, verify is tree
parent = self._get_inode_note(parent_inode)
# TODO: consider implications here, maybe look at ext on dir for mime?
if not isinstance(parent, TreeNote):
raise pyfuse3.FUSEError(errno.ENOTDIR)
# Create new directory in note
try:
note = tr.new_child(parent, name.decode(), mime='inode/directory')
except tr.ErrorWithErrno as e:
raise pyfuse3.FUSEError(e.errno) from None
new_tree: TreeNote = parent.mkdir(name.decode())
except TreeExists:
raise pyfuse3.FUSEError(errno.EEXIST) from None
# Grab entity for kernel
ent = self._create_get_ent_from_note(note)
ent = self._create_get_ent_from_note(new_tree)
self._ref_entry(ent)
return self._get_attr(ent, note)
return self._get_attr(ent, new_tree)
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
@ -342,35 +341,37 @@ class TroveFuseOps(pyfuse3.Operations):
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
logger.debug("create inode:%d name:%s", parent_inode, name)
parent = self._get_inode_note(parent_inode)
# TODO: handle mode
# TODO: handle flags
if not isinstance(parent, TroveTree):
raise pyfuse3.FUSEError(errno.ENOTDIR)
name_str = name.decode()
note = tr.new_child(parent, name_str)
ent = self._create_get_ent_from_note(note)
if name_str in parent.list():
raise pyfuse3.FUSEError(errno.EEXIST)
blob = self._trove.create_blob(b"")
parent.link(name_str, blob)
ent = self._create_get_ent_from_note(blob)
self._ref_entry(ent)
handle = self._open_handle(ent.sys_inode)
attr = self._get_attr(ent, note)
attr = self._get_attr(ent, blob)
return pyfuse3.FileInfo(fh=handle.handle_id), attr
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
handle = self._get_handle(fh)
note = handle.note
if not hasattr(note, 'mkdir'):
return note.read_content()[offset:offset + length]
if isinstance(note, TroveBlob):
return note.read()[offset:offset + length]
raise pyfuse3.FUSEError(errno.EBADF)
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
handle = self._get_handle(fh)
note = handle.note
if not hasattr(note, 'mkdir'):
existing = note.read_content()
if isinstance(note, TroveBlob):
existing = note.read()
if offset > len(existing):
existing = existing + b"\x00" * (offset - len(existing))
note.write_content(existing[:offset] + data + existing[offset + len(data):])
note.write(existing[:offset] + data + existing[offset + len(data):])
return len(data)
async def release(self, fh: FileHandleT) -> None:
@ -379,8 +380,12 @@ class TroveFuseOps(pyfuse3.Operations):
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
parent_note = self._get_inode_note(parent_inode)
if not isinstance(parent_note, TroveTree):
raise pyfuse3.FUSEError(errno.ENOTDIR)
name_str = name.decode()
parent_note.rm_child(name_str, False)
if name_str not in parent_note.list():
raise pyfuse3.FUSEError(errno.ENOENT)
parent_note.unlink(name.decode())
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
# Decode / validate names

View file

@ -22,20 +22,20 @@ class TroveMainWindow(QMainWindow):
self.setWindowTitle("Trove")
# ── Toolbar ──
# toolbar = QToolBar("Main")
# toolbar.setObjectName("maintoolbar")
# toolbar.setMovable(False)
# self.addToolBar(toolbar)
#
# new_action = QAction("New", self)
# new_action.setShortcut(QKeySequence.StandardKey.New)
# toolbar.addAction(new_action)
#
# save_action = QAction("Save", self)
# save_action.setShortcut(QKeySequence.StandardKey.Save)
# toolbar.addAction(save_action)
#
# toolbar.addSeparator()
toolbar = QToolBar("Main")
toolbar.setObjectName("maintoolbar")
toolbar.setMovable(False)
self.addToolBar(toolbar)
new_action = QAction("New", self)
new_action.setShortcut(QKeySequence.StandardKey.New)
toolbar.addAction(new_action)
save_action = QAction("Save", self)
save_action.setShortcut(QKeySequence.StandardKey.Save)
toolbar.addAction(save_action)
toolbar.addSeparator()
# ── Central layout ──
central = QWidget()

View file

@ -1,6 +1,5 @@
"""Tool Supporting Basic Editor Functions"""
from typing import cast, Protocol
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QTextEdit, QVBoxLayout
import trovedb.trove as tr
@ -17,33 +16,7 @@ class ToolBasicEditor(Tool):
self._text_edit = QTextEdit()
layout.addWidget(self._text_edit)
self._content_dirty = False
self._auto_save_timer = QTimer(self)
self._auto_save_timer.setSingleShot(True)
self._auto_save_timer.setInterval(2000) # 2 seconds after typing stops
self._auto_save_timer.timeout.connect(self._perform_auto_save)
self._text_edit.textChanged.connect(self._schedule_auto_save)
self.refresh()
def refresh(self):
self._text_edit.setPlainText(self.note.read_content().decode("utf-8"))
self._content_dirty = False
def _schedule_auto_save(self):
self._content_dirty = True
self._auto_save_timer.stop()
self._auto_save_timer.start()
def _perform_auto_save(self):
if self._content_dirty:
content = self._text_edit.toPlainText().encode("utf-8")
self.note.write_content(content)
self._content_dirty = False
def closeEvent(self, event):
if self._content_dirty:
self._perform_auto_save()
super().closeEvent(event)

View file

@ -1,39 +1,14 @@
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict, Iterator
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict
from uuid import UUID
from pathlib import PurePosixPath
import datetime as dt
import errno
type ObjectId = int | str | UUID
class TroveError(Exception):
"""Base class for all Trove errors."""
class ErrorWithErrno(TroveError):
"""Raised when an error occurs with an errno."""
def __init__(self, error: int, *args):
super().__init__(*args)
self.errno = error
class ErrorExists(ErrorWithErrno):
"""Raised when a note already exists."""
def __init__(self, *args):
super().__init__(errno.EEXIST, *args)
class ErrorNotFound(ErrorWithErrno):
"""Raised when a note is not found."""
def __init__(self, *args):
super().__init__(errno.ENOENT, *args)
class ErrorNotEmpty(ErrorWithErrno):
"""Raised when a directory is not empty."""
def __init__(self, *args):
super().__init__(errno.ENOTEMPTY, *args)
class BadNoteType(TypeError):
"""Raised when an invalid note type is encountered."""
@ -46,12 +21,6 @@ class NoteNotFound(KeyError):
class OpenArguments(TypedDict):
create: bool
class TreeEntry(NamedTuple):
name: str
object_id: ObjectId
DEFAULT_MIME = "application/octet-stream"
@runtime_checkable
class Note(Protocol):
"""
@ -95,30 +64,10 @@ class Note(Protocol):
"""Write the raw content of the note."""
...
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
...
def child(self, name: str) -> 'Note':
"""Retrieve a child note by name."""
...
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> 'Note':
"""Create a new child note."""
...
def rm_child(self, name: str, recurse: bool):
"""Remove a child note."""
...
def has_children(self) -> bool:
"""Check if note has children."""
return next(self.children(), None) is not None
def new_child(note: Note, name: str, mime: str = DEFAULT_MIME, content: bytes | None = None, executable: bool = False, hidden: bool = False) -> Note:
return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden)
class TreeEntry(NamedTuple):
name: str
object_id: ObjectId
@runtime_checkable
class Tree(Protocol):
@ -138,10 +87,17 @@ class Tree(Protocol):
"""Remove a directory from the tree."""
...
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
...
def entries(self) -> Iterable[TreeEntry]:
"""Return all entries in the directory"""
...
def list(self) -> dict[str, int]:
"""Return all entries as {name: object_id}."""
...
@runtime_checkable
class TreeNote(Note, Tree, Protocol):

View file

@ -5,10 +5,9 @@ Implements BlobNote, TreeNote, and Trove protocols defined in trove.py.
Depends on db.py (Sqlite3Trove) for storage.
"""
from typing import Optional, Iterator
from typing import Optional
from pathlib import Path
import datetime as dt
import uuid
from .db import Sqlite3Trove, NOTE_ROOT_ID
@ -21,19 +20,9 @@ class NoteImpl(Note):
"""Concrete note implementation."""
def __init__(self, parent: 'TroveImpl', object_id: ObjectId):
if not isinstance(object_id, uuid.UUID):
object_id = uuid.UUID(str(object_id))
assert isinstance(object_id, uuid.UUID)
self._parent = parent
self._db = parent.db
self._object_id: uuid.UUID = object_id
@staticmethod
def get_impl_id(note: Note) -> uuid.UUID:
if not isinstance(note.object_id, uuid.UUID):
raise TypeError("Note not compatible with NoteImpl")
return note.object_id
self._object_id = object_id
# Note protocol
@property
@ -47,8 +36,7 @@ class NoteImpl(Note):
@property
def mtime(self) -> dt.datetime:
"""Return modification time as UTC datetime."""
mtime = self._db.get_mtime(self._object_id)
return mtime if mtime is not None else dt.datetime.now(tz=dt.timezone.utc)
return self._db.get_mtime(self._object_id)
@property
def mime(self) -> str:
@ -69,39 +57,6 @@ class NoteImpl(Note):
def write_content(self, data: bytes) -> None:
self._db.write_content(self._object_id, data)
def children(self) -> Iterator[TreeEntry]:
"""Get all children of this note."""
for name, object_id in self._db.list_tree(self._object_id).items():
yield TreeEntry(name, object_id)
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
"""Create a new child note."""
content = content if content is not None else b""
object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden)
# TODO fix this
if mime == 'inode/directory':
return TreeNoteImpl(self._parent, object_id)
return NoteImpl(self._parent, object_id)
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
entries = self._db.list_tree(self._object_id)
if name not in entries:
raise tr.ErrorNotFound(name)
child_id = entries[name]
value = self._parent.get_raw_note(child_id)
if value is None:
raise tr.ErrorNotFound("dangling child link") # FIXME: better errors
return value
def rm_child(self, name: str, recurse: bool) -> None:
"""Remove a child note."""
note = self.child(name)
if note.has_children() and not recurse:
raise tr.ErrorNotEmpty(name)
self._db.unlink(self._object_id, name)
class TreeNoteImpl(NoteImpl, TreeNote):
"""Concrete TreeNote: a tree object backed by the tree_entries table."""
@ -109,7 +64,7 @@ class TreeNoteImpl(NoteImpl, TreeNote):
# Tree protocol
def link(self, name: str, note: Note) -> None:
"""Link name to an existing note."""
self._db.link(self._object_id, name, NoteImpl.get_impl_id(note))
self._db.link(self._object_id, name, note.object_id)
def unlink(self, name: str) -> None:
"""Remove an entry by name."""
@ -126,13 +81,25 @@ class TreeNoteImpl(NoteImpl, TreeNote):
"""Remove a directory from the tree."""
self.unlink(name)
def child(self, name: str) -> Note:
"""Retrieve a child note by name."""
entries = self._db.list_tree(self._object_id)
if name not in entries:
raise KeyError(f"Entry '{name}' not found")
child_id = entries[name]
value = self._parent.get_raw_note(child_id)
if value is None:
raise KeyError(f"Entry '{name}' has no value")
return value
def entries(self):
"""Return all entries as an iterable of TreeEntry."""
for name, object_id in self._db.list_tree(self._object_id).items():
yield TreeEntry(name, object_id)
def list(self) -> dict[str, ObjectId]:
"""Return all entries as {name: object_id}."""
return self._db.list_tree(self._object_id)
# ---------------------------------------------------------------------------
@ -170,8 +137,6 @@ class TroveImpl:
# Trove protocol
def get_raw_note(self, note_id: ObjectId) -> Note:
"""Return a BlobNote or TreeNote for the given id, or None if not found."""
if not isinstance(note_id, uuid.UUID):
note_id = uuid.UUID(str(note_id))
info = self._db.get_info(note_id)
if info is None:
raise NoteNotFound(note_id)