Compare commits
No commits in common. "41480a39c909ab26c3fb09d363d6aeaabb0d7b1e" and "01e9780bb896abab131375efd792d0d4a54d5f48" have entirely different histories.
41480a39c9
...
01e9780bb8
9 changed files with 250 additions and 296 deletions
8
.gitignore
vendored
8
.gitignore
vendored
|
|
@ -7,14 +7,6 @@ __pycache__/
|
||||||
# C extensions
|
# C extensions
|
||||||
*.so
|
*.so
|
||||||
|
|
||||||
# Project specify temporary / test files
|
|
||||||
*.db
|
|
||||||
.trove/
|
|
||||||
ai
|
|
||||||
tmp/
|
|
||||||
ui/
|
|
||||||
|
|
||||||
|
|
||||||
# Distribution / packaging
|
# Distribution / packaging
|
||||||
.Python
|
.Python
|
||||||
build/
|
build/
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,6 @@ class Sqlite3Trove:
|
||||||
if object_id is None:
|
if object_id is None:
|
||||||
object_id = uuid.uuid4()
|
object_id = uuid.uuid4()
|
||||||
sid = _sql_id(object_id)
|
sid = _sql_id(object_id)
|
||||||
assert sid is not None
|
|
||||||
|
|
||||||
# Preserve created timestamp on update
|
# Preserve created timestamp on update
|
||||||
row = self._con.execute(
|
row = self._con.execute(
|
||||||
|
|
|
||||||
264
trovedb/fs.py
264
trovedb/fs.py
|
|
@ -2,132 +2,99 @@ import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import tempfile
|
import tempfile
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
from pathlib import Path, PurePosixPath
|
from pathlib import Path
|
||||||
from typing import Optional, Dict, List, Self, Iterable, Iterator
|
from typing import Optional, Dict, List, Self, Iterable
|
||||||
|
|
||||||
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId
|
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound
|
||||||
from . import fs_util as fsu
|
from . import fs_util as fsu
|
||||||
from . import trove as tr
|
|
||||||
|
|
||||||
class FSNote(Note):
|
class FSNote(Note):
|
||||||
def __init__(self, trove: 'FSTrove', path: Path):
|
def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
|
||||||
self._trove: FSTrove = trove
|
self._trove: FSTrove = trove
|
||||||
self._fs_path: Path = path.resolve()
|
self._fs_path: Path | None = path
|
||||||
if not self._fs_path.is_relative_to(trove.root):
|
self._inode: int | None = inode
|
||||||
raise ValueError("Path must be relative to the root directory")
|
|
||||||
self._object_id: str = path.relative_to(trove.root).as_posix()
|
if self._fs_path is not None:
|
||||||
|
inode = self._fs_path.stat().st_ino
|
||||||
|
if self._inode != inode and self._inode is not None:
|
||||||
|
raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}")
|
||||||
|
self._inode = inode
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def object_id(self) -> tr.ObjectId:
|
def object_id(self) -> int:
|
||||||
return self._object_id
|
if self._inode is None:
|
||||||
|
raise ValueError("Note not yet saved to disk")
|
||||||
@property
|
return self._inode
|
||||||
def fs_path(self) -> Path:
|
|
||||||
return self._fs_path
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mtime(self):
|
def mtime(self):
|
||||||
"""Return modification time as datetime."""
|
"""Return modification time as datetime."""
|
||||||
stat = self._fs_path.stat()
|
stat = self._path.stat()
|
||||||
return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)
|
return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def readonly(self) -> bool:
|
def readonly(self) -> bool:
|
||||||
"""Check if the note is readonly based on file permissions."""
|
"""Check if the note is readonly based on file permissions."""
|
||||||
return not os.access(self._fs_path, os.W_OK)
|
if self._inode is None:
|
||||||
|
return False
|
||||||
|
return not os.access(self._path, os.W_OK)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mime(self) -> str:
|
def mime(self) -> str:
|
||||||
"""Return MIME type, defaulting to generic binary stream."""
|
"""Return MIME type, defaulting to generic binary stream."""
|
||||||
if self._fs_path.is_dir():
|
|
||||||
return "inode/directory"
|
|
||||||
return "application/octet-stream"
|
return "application/octet-stream"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _path(self) -> Path:
|
||||||
|
if self._fs_path is not None:
|
||||||
|
if self._fs_path.exists():
|
||||||
|
return self._fs_path
|
||||||
|
self._fs_path = None
|
||||||
|
if self._inode is None:
|
||||||
|
raise ValueError("Note not yet saved to disk")
|
||||||
|
self._fs_path = self._trove.get_path_by_inode(self._inode)
|
||||||
|
assert self._fs_path is not None
|
||||||
|
return self._fs_path
|
||||||
|
|
||||||
def get_raw_metadata(self, key: str) -> Optional[bytes]:
|
def get_raw_metadata(self, key: str) -> Optional[bytes]:
|
||||||
# TODO: FIXME
|
return self._trove._get_metadata(self._inode, key)
|
||||||
return None
|
|
||||||
|
|
||||||
def set_raw_metadata(self, key: str, value: bytes) -> None:
|
def set_raw_metadata(self, key: str, value: bytes) -> None:
|
||||||
# TODO: FIXME
|
self._trove._set_metadata(self._inode, key, value)
|
||||||
pass
|
|
||||||
|
|
||||||
def read_content(self) -> bytes:
|
def read_content(self) -> bytes:
|
||||||
"""Read the raw content of the note."""
|
"""Read the raw content of the note."""
|
||||||
if self._fs_path.is_file():
|
content_file = fsu.get_content_path(self._path)
|
||||||
return self._fs_path.read_bytes()
|
if content_file.exists():
|
||||||
|
return content_file.read_bytes()
|
||||||
return b""
|
return b""
|
||||||
|
|
||||||
def write_content(self, data:bytes) -> None:
|
def write_content(self, data:bytes) -> None:
|
||||||
"""Write the raw content of the note."""
|
"""Write the raw content of the note."""
|
||||||
self._fs_path.write_bytes(data)
|
content_file = fsu.get_content_path(self._path)
|
||||||
|
content_file.write_bytes(data)
|
||||||
def _new_child_subdir(self, name: str, exist_ok: bool = True) -> Path:
|
|
||||||
ex_path = self._fs_path / name
|
|
||||||
try:
|
|
||||||
ex_path.mkdir(exist_ok=exist_ok)
|
|
||||||
return ex_path
|
|
||||||
except FileExistsError:
|
|
||||||
raise tr.ErrorExists(str(ex_path)) from None
|
|
||||||
except OSError as e:
|
|
||||||
raise tr.ErrorWithErrno(e.errno, str(e)) from None
|
|
||||||
|
|
||||||
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
|
|
||||||
"""Create a new child note."""
|
|
||||||
|
|
||||||
if content is None:
|
|
||||||
content = b""
|
|
||||||
|
|
||||||
if mime == 'inode/directory':
|
|
||||||
if content is not None:
|
|
||||||
raise NotImplementedError("FSNote does not support children")
|
|
||||||
return FSTreeNote(self._trove, self._new_child_subdir(name, False))
|
|
||||||
|
|
||||||
ex_path = self._fs_path / name
|
|
||||||
ex_path.write_bytes(content)
|
|
||||||
return FSNote(self._trove, ex_path)
|
|
||||||
|
|
||||||
def child(self, name: str) -> Note:
|
|
||||||
"""Retrieve a child not by name."""
|
|
||||||
target_path = self._fs_path / name
|
|
||||||
return self._trove.get_raw_note_by_path(target_path)
|
|
||||||
|
|
||||||
def rm_child(self, name: str, recurse: bool):
|
|
||||||
target_path = self._fs_path / name
|
|
||||||
if not target_path.exists():
|
|
||||||
raise tr.ErrorNotFound(name)
|
|
||||||
if target_path.is_dir():
|
|
||||||
if recurse:
|
|
||||||
raise NotImplementedError("Recursive deletion not supported")
|
|
||||||
else:
|
|
||||||
target_path.rmdir()
|
|
||||||
else:
|
|
||||||
target_path.unlink()
|
|
||||||
# TODO: remove meta directory!
|
|
||||||
|
|
||||||
|
|
||||||
def children(self) -> Iterator[TreeEntry]:
|
|
||||||
"""Get all children of this note."""
|
|
||||||
if not self._fs_path.is_dir():
|
|
||||||
return
|
|
||||||
for item in self._fs_path.iterdir():
|
|
||||||
if item.name == ".trove":
|
|
||||||
continue
|
|
||||||
yield TreeEntry(name=item.name, object_id=item.stat().st_ino)
|
|
||||||
|
|
||||||
class FSTreeNote(FSNote, TreeNote):
|
class FSTreeNote(FSNote, TreeNote):
|
||||||
|
@property
|
||||||
|
def mime(self) -> str:
|
||||||
|
"""Return MIME type for directory/tree nodes."""
|
||||||
|
return "inode/directory"
|
||||||
|
|
||||||
def link(self, name: str, note: Note):
|
def link(self, name: str, note: Note):
|
||||||
if not isinstance(note, FSNote):
|
if not isinstance(note, FSNote):
|
||||||
raise BadNoteType("Only blob notes can be linked")
|
raise BadNoteType("Only blob notes can be linked")
|
||||||
|
|
||||||
target_path = self._fs_path / name
|
target_path = self._path / name
|
||||||
if target_path.exists():
|
if target_path.exists():
|
||||||
self.unlink(name)
|
self.unlink(name)
|
||||||
|
|
||||||
note_path = note._fs_path
|
note_path = note._path
|
||||||
|
|
||||||
# If the note is in .working, move it to the new location.
|
# If the note is in .working, move it to the new location.
|
||||||
if self._trove.working in note_path.parents:
|
if self._trove.working in note_path.parents:
|
||||||
os.rename(note_path, target_path)
|
os.rename(note_path, target_path)
|
||||||
|
self._trove._update_cache(note.object_id, target_path)
|
||||||
else:
|
else:
|
||||||
# If it's already linked somewhere, create a hard link if it's a file.
|
# If it's already linked somewhere, create a hard link if it's a file.
|
||||||
if note_path.is_file():
|
if note_path.is_file():
|
||||||
|
|
@ -141,8 +108,10 @@ class FSTreeNote(FSNote, TreeNote):
|
||||||
# We move it to the new location.
|
# We move it to the new location.
|
||||||
os.rename(note_path, target_path)
|
os.rename(note_path, target_path)
|
||||||
|
|
||||||
|
self._trove._update_cache(note.object_id, target_path)
|
||||||
|
|
||||||
def unlink(self, name: str):
|
def unlink(self, name: str):
|
||||||
target_path = self._fs_path / name
|
target_path = self._path / name
|
||||||
if not target_path.exists():
|
if not target_path.exists():
|
||||||
return
|
return
|
||||||
if target_path.is_dir():
|
if target_path.is_dir():
|
||||||
|
|
@ -151,20 +120,39 @@ class FSTreeNote(FSNote, TreeNote):
|
||||||
target_path.unlink()
|
target_path.unlink()
|
||||||
|
|
||||||
def mkdir(self, name: str) -> 'FSTreeNote':
|
def mkdir(self, name: str) -> 'FSTreeNote':
|
||||||
target_path = self._fs_path / name
|
target_path = self._path / name
|
||||||
target_path.mkdir(exist_ok=True)
|
target_path.mkdir(exist_ok=True)
|
||||||
return FSTreeNote(self._trove, path=target_path)
|
inode = target_path.stat().st_ino
|
||||||
|
self._trove._update_cache(inode, target_path)
|
||||||
|
return FSTreeNote(self._trove, inode=inode, path=target_path)
|
||||||
|
|
||||||
def entries(self) -> Iterable[TreeEntry]:
|
def entries(self) -> Iterable[TreeEntry]:
|
||||||
try:
|
try:
|
||||||
for item in self._fs_path.iterdir():
|
for item in self._path.iterdir():
|
||||||
if item.name == ".trove":
|
if item.name == ".trove":
|
||||||
continue
|
continue
|
||||||
yield TreeEntry(name=item.name, object_id=str(item))
|
inode = item.stat().st_ino
|
||||||
|
self._trove._update_cache(inode, item)
|
||||||
|
yield TreeEntry(name=item.name, object_id=inode)
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def list(self) -> dict[str, int]:
|
||||||
|
res = {}
|
||||||
|
try:
|
||||||
|
for item in self._path.iterdir():
|
||||||
|
if item.name == ".trove":
|
||||||
|
continue
|
||||||
|
res[item.name] = item.stat().st_ino
|
||||||
|
self._trove._update_cache(res[item.name], item)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return res
|
||||||
|
|
||||||
|
def child(self, name: str) -> Note:
|
||||||
|
"""Retrieve a child not by name."""
|
||||||
|
target_path = self._path / name
|
||||||
|
return self._trove.get_raw_note_by_path(target_path)
|
||||||
|
|
||||||
|
|
||||||
class FSTrove(Trove):
|
class FSTrove(Trove):
|
||||||
|
|
@ -173,9 +161,14 @@ class FSTrove(Trove):
|
||||||
self._root_inode = self.root.stat().st_ino
|
self._root_inode = self.root.stat().st_ino
|
||||||
self.dot_trove = self.root / ".trove"
|
self.dot_trove = self.root / ".trove"
|
||||||
self.working = self.dot_trove / ".working"
|
self.working = self.dot_trove / ".working"
|
||||||
|
|
||||||
self.dot_trove.mkdir(exist_ok=True)
|
self.dot_trove.mkdir(exist_ok=True)
|
||||||
self.working.mkdir(exist_ok=True)
|
self.working.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
db_path = self.dot_trove / "trovecache.db"
|
||||||
|
self.con = sqlite3.connect(str(db_path))
|
||||||
|
self._init_db()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
|
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
|
||||||
p = Path(path)
|
p = Path(path)
|
||||||
|
|
@ -185,30 +178,101 @@ class FSTrove(Trove):
|
||||||
p.mkdir(parents=True)
|
p.mkdir(parents=True)
|
||||||
return cls(p)
|
return cls(p)
|
||||||
|
|
||||||
def get_raw_note_by_path(self, path: Path) -> Note:
|
def _init_db(self):
|
||||||
if not path.exists():
|
self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
|
||||||
raise tr.ErrorNotFound(str(path))
|
self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
|
||||||
if path.is_dir():
|
self.con.commit()
|
||||||
return FSTreeNote(self, path=path)
|
|
||||||
return FSNote(self, path=path)
|
|
||||||
|
|
||||||
def get_raw_note(self, note_id: ObjectId) -> Note:
|
def _update_cache(self, inode: int, path: Path):
|
||||||
p = self.root / str(note_id)
|
try:
|
||||||
if not p.exists():
|
rel_path = path.relative_to(self.root)
|
||||||
|
path_str = str(rel_path)
|
||||||
|
if path_str == ".":
|
||||||
|
path_str = ""
|
||||||
|
except ValueError:
|
||||||
|
# Path not under root, maybe it's the root itself?
|
||||||
|
if path == self.root:
|
||||||
|
path_str = ""
|
||||||
|
else:
|
||||||
|
return # Not under root, don't cache
|
||||||
|
|
||||||
|
self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
|
||||||
|
self.con.commit()
|
||||||
|
|
||||||
|
def get_path_by_inode(self, inode: int) -> Optional[Path]:
|
||||||
|
if inode == self._root_inode:
|
||||||
|
return self.root
|
||||||
|
|
||||||
|
row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
|
||||||
|
if row:
|
||||||
|
p = self.root / row[0]
|
||||||
|
try:
|
||||||
|
if p.exists() and p.stat().st_ino == inode:
|
||||||
|
return p
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# search
|
||||||
|
for root_dir, dirs, files in os.walk(self.root):
|
||||||
|
# Skip .trove
|
||||||
|
if ".trove" in dirs:
|
||||||
|
dirs.remove(".trove")
|
||||||
|
|
||||||
|
for name in dirs + files:
|
||||||
|
p = Path(root_dir) / name
|
||||||
|
try:
|
||||||
|
st = p.stat()
|
||||||
|
if st.st_ino == inode:
|
||||||
|
self._update_cache(inode, p)
|
||||||
|
return p
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_raw_note_by_path(self, target_path: Path) -> Note:
|
||||||
|
if not target_path.exists():
|
||||||
|
raise NoteNotFound(target_path.relative_to(self.root))
|
||||||
|
note_id = target_path.stat().st_ino
|
||||||
|
if target_path.is_dir():
|
||||||
|
return FSTreeNote(self, inode=note_id, path=target_path)
|
||||||
|
else:
|
||||||
|
return FSNote(self, inode=note_id, path=target_path)
|
||||||
|
|
||||||
|
def get_raw_note(self, note_id: int) -> Note:
|
||||||
|
p = self.get_path_by_inode(note_id)
|
||||||
|
if not p:
|
||||||
raise NoteNotFound(note_id)
|
raise NoteNotFound(note_id)
|
||||||
return self.get_raw_note_by_path(p)
|
return self.get_raw_note_by_path(p)
|
||||||
|
|
||||||
|
|
||||||
def create_blob(self, data: bytes | None = None) -> Note:
|
def create_blob(self, data: bytes | None = None) -> Note:
|
||||||
raise NotImplementedError("FSTrove does not support blobs")
|
fd, temp_path = tempfile.mkstemp(dir=self.working)
|
||||||
|
try:
|
||||||
|
if data:
|
||||||
|
os.write(fd, data)
|
||||||
|
finally:
|
||||||
|
os.close(fd)
|
||||||
|
p = Path(temp_path)
|
||||||
|
inode = p.stat().st_ino
|
||||||
|
self._update_cache(inode, p)
|
||||||
|
return FSNote(self, inode=inode, path=p)
|
||||||
|
|
||||||
def get_root(self) -> TreeNote:
|
def get_root(self) -> TreeNote:
|
||||||
return FSTreeNote(self, path=self.root)
|
return FSTreeNote(self, inode=self._root_inode, path=self.root)
|
||||||
|
|
||||||
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
|
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
|
||||||
raise NotImplementedError("FSTrove does not support metadata")
|
row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
|
||||||
|
return row[0] if row else None
|
||||||
|
|
||||||
def _set_metadata(self, inode: int, key: str, value: bytes):
|
def _set_metadata(self, inode: int, key: str, value: bytes):
|
||||||
raise NotImplementedError("FSTrove does not support metadata")
|
self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
|
||||||
|
self.con.commit()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
pass
|
self.con.close()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args):
|
||||||
|
self.close()
|
||||||
|
|
|
||||||
|
|
@ -90,8 +90,8 @@ def get_content_index_name_from_path(path: Path) -> str:
|
||||||
"""Return the name of the index file for a directory."""
|
"""Return the name of the index file for a directory."""
|
||||||
# TODO: improve handling and mimetype logic
|
# TODO: improve handling and mimetype logic
|
||||||
if not path.suffix:
|
if not path.suffix:
|
||||||
return 'index.dat'
|
return '_index.dat'
|
||||||
return f'index{path.suffix}'
|
return f'_index{path.suffix}'
|
||||||
|
|
||||||
def get_content_path(path: str | Path) -> Path:
|
def get_content_path(path: str | Path) -> Path:
|
||||||
"""Return the path to the content file for a directory or file"""
|
"""Return the path to the content file for a directory or file"""
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,7 @@ import pyfuse3
|
||||||
import trio
|
import trio
|
||||||
from pyfuse3 import InodeT, FileHandleT
|
from pyfuse3 import InodeT, FileHandleT
|
||||||
|
|
||||||
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, ObjectId, TreeExists
|
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists
|
||||||
|
|
||||||
import trovedb.trove as tr
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -142,7 +140,7 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
try:
|
try:
|
||||||
note = parent.child(name.decode())
|
note = parent.child(name.decode())
|
||||||
except (KeyError, tr.ErrorNotFound):
|
except KeyError:
|
||||||
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
|
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
|
||||||
raise pyfuse3.FUSEError(errno.ENOENT) from None
|
raise pyfuse3.FUSEError(errno.ENOENT) from None
|
||||||
ent = self._create_get_ent_from_note(note)
|
ent = self._create_get_ent_from_note(note)
|
||||||
|
|
@ -186,8 +184,8 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
# Determine basic information
|
# Determine basic information
|
||||||
is_tree = True
|
is_tree = True
|
||||||
size = 0
|
size = 0
|
||||||
if not hasattr(note, 'mkdir'):
|
if isinstance(note, TroveBlob):
|
||||||
size = len(note.read_content())
|
size = len(note.read())
|
||||||
is_tree = False
|
is_tree = False
|
||||||
|
|
||||||
# Create and fill attr structure
|
# Create and fill attr structure
|
||||||
|
|
@ -241,13 +239,13 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
ent = self._get_ent_from_inode(inode)
|
ent = self._get_ent_from_inode(inode)
|
||||||
note = self._get_ent_note(ent)
|
note = self._get_ent_note(ent)
|
||||||
if fields.update_size:
|
if fields.update_size:
|
||||||
if not hasattr(note, 'mkdir'):
|
if isinstance(note, TroveBlob):
|
||||||
current = note.read_content()
|
current = note.read()
|
||||||
new_size = attr.st_size
|
new_size = attr.st_size
|
||||||
if new_size < len(current):
|
if new_size < len(current):
|
||||||
note.write_content(current[:new_size])
|
note.write(current[:new_size])
|
||||||
elif new_size > len(current):
|
elif new_size > len(current):
|
||||||
note.write_content(current + b"\x00" * (new_size - len(current)))
|
note.write(current + b"\x00" * (new_size - len(current)))
|
||||||
else:
|
else:
|
||||||
raise pyfuse3.FUSEError(errno.EINVAL)
|
raise pyfuse3.FUSEError(errno.EINVAL)
|
||||||
return self._get_attr(ent, note)
|
return self._get_attr(ent, note)
|
||||||
|
|
@ -280,13 +278,13 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
if not isinstance(note, TroveTree):
|
if not isinstance(note, TroveTree):
|
||||||
logger.debug("attempted readdir on %d not a tree", fh)
|
logger.debug("attempted readdir on %d not a tree", fh)
|
||||||
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
entries = list(note.entries()) # [(name, object_id), ...]
|
entries = list(note.list().items()) # [(name, object_id), ...]
|
||||||
|
|
||||||
for idx, entry in enumerate(entries):
|
for idx, (name, child_id) in enumerate(entries):
|
||||||
if idx < start_id:
|
if idx < start_id:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
child = self._trove.get_raw_note(entry.object_id)
|
child = self._trove.get_raw_note(child_id)
|
||||||
if child is None:
|
if child is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
@ -294,7 +292,7 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
attr = self._get_attr(child_ent, child)
|
attr = self._get_attr(child_ent, child)
|
||||||
self._ref_entry(child_ent)
|
self._ref_entry(child_ent)
|
||||||
|
|
||||||
if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1):
|
if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1):
|
||||||
break
|
break
|
||||||
|
|
||||||
async def releasedir(self, fh: FileHandleT) -> None:
|
async def releasedir(self, fh: FileHandleT) -> None:
|
||||||
|
|
@ -304,18 +302,19 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
|
|
||||||
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
|
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
|
||||||
logger.debug("mkdir inode:%d name:%s", parent_inode, name)
|
logger.debug("mkdir inode:%d name:%s", parent_inode, name)
|
||||||
|
# Grab parent note, verify is tree
|
||||||
parent = self._get_inode_note(parent_inode)
|
parent = self._get_inode_note(parent_inode)
|
||||||
# TODO: consider implications here, maybe look at ext on dir for mime?
|
if not isinstance(parent, TreeNote):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
|
# Create new directory in note
|
||||||
try:
|
try:
|
||||||
note = tr.new_child(parent, name.decode(), mime='inode/directory')
|
new_tree: TreeNote = parent.mkdir(name.decode())
|
||||||
except tr.ErrorWithErrno as e:
|
except TreeExists:
|
||||||
raise pyfuse3.FUSEError(e.errno) from None
|
raise pyfuse3.FUSEError(errno.EEXIST) from None
|
||||||
|
|
||||||
# Grab entity for kernel
|
# Grab entity for kernel
|
||||||
ent = self._create_get_ent_from_note(note)
|
ent = self._create_get_ent_from_note(new_tree)
|
||||||
self._ref_entry(ent)
|
self._ref_entry(ent)
|
||||||
return self._get_attr(ent, note)
|
return self._get_attr(ent, new_tree)
|
||||||
|
|
||||||
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
||||||
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
|
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
|
||||||
|
|
@ -342,35 +341,37 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
|
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
|
||||||
logger.debug("create inode:%d name:%s", parent_inode, name)
|
logger.debug("create inode:%d name:%s", parent_inode, name)
|
||||||
parent = self._get_inode_note(parent_inode)
|
parent = self._get_inode_note(parent_inode)
|
||||||
|
if not isinstance(parent, TroveTree):
|
||||||
# TODO: handle mode
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
# TODO: handle flags
|
|
||||||
|
|
||||||
name_str = name.decode()
|
name_str = name.decode()
|
||||||
note = tr.new_child(parent, name_str)
|
if name_str in parent.list():
|
||||||
ent = self._create_get_ent_from_note(note)
|
raise pyfuse3.FUSEError(errno.EEXIST)
|
||||||
|
blob = self._trove.create_blob(b"")
|
||||||
|
parent.link(name_str, blob)
|
||||||
|
|
||||||
|
ent = self._create_get_ent_from_note(blob)
|
||||||
self._ref_entry(ent)
|
self._ref_entry(ent)
|
||||||
|
|
||||||
handle = self._open_handle(ent.sys_inode)
|
handle = self._open_handle(ent.sys_inode)
|
||||||
attr = self._get_attr(ent, note)
|
attr = self._get_attr(ent, blob)
|
||||||
return pyfuse3.FileInfo(fh=handle.handle_id), attr
|
return pyfuse3.FileInfo(fh=handle.handle_id), attr
|
||||||
|
|
||||||
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
|
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
|
||||||
logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
|
logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
|
||||||
handle = self._get_handle(fh)
|
handle = self._get_handle(fh)
|
||||||
note = handle.note
|
note = handle.note
|
||||||
if not hasattr(note, 'mkdir'):
|
if isinstance(note, TroveBlob):
|
||||||
return note.read_content()[offset:offset + length]
|
return note.read()[offset:offset + length]
|
||||||
raise pyfuse3.FUSEError(errno.EBADF)
|
raise pyfuse3.FUSEError(errno.EBADF)
|
||||||
|
|
||||||
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
|
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
|
||||||
handle = self._get_handle(fh)
|
handle = self._get_handle(fh)
|
||||||
note = handle.note
|
note = handle.note
|
||||||
if not hasattr(note, 'mkdir'):
|
if isinstance(note, TroveBlob):
|
||||||
existing = note.read_content()
|
existing = note.read()
|
||||||
if offset > len(existing):
|
if offset > len(existing):
|
||||||
existing = existing + b"\x00" * (offset - len(existing))
|
existing = existing + b"\x00" * (offset - len(existing))
|
||||||
note.write_content(existing[:offset] + data + existing[offset + len(data):])
|
note.write(existing[:offset] + data + existing[offset + len(data):])
|
||||||
return len(data)
|
return len(data)
|
||||||
|
|
||||||
async def release(self, fh: FileHandleT) -> None:
|
async def release(self, fh: FileHandleT) -> None:
|
||||||
|
|
@ -379,8 +380,12 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
|
|
||||||
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
||||||
parent_note = self._get_inode_note(parent_inode)
|
parent_note = self._get_inode_note(parent_inode)
|
||||||
|
if not isinstance(parent_note, TroveTree):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
name_str = name.decode()
|
name_str = name.decode()
|
||||||
parent_note.rm_child(name_str, False)
|
if name_str not in parent_note.list():
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOENT)
|
||||||
|
parent_note.unlink(name.decode())
|
||||||
|
|
||||||
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
|
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
|
||||||
# Decode / validate names
|
# Decode / validate names
|
||||||
|
|
|
||||||
|
|
@ -22,20 +22,20 @@ class TroveMainWindow(QMainWindow):
|
||||||
self.setWindowTitle("Trove")
|
self.setWindowTitle("Trove")
|
||||||
|
|
||||||
# ── Toolbar ──
|
# ── Toolbar ──
|
||||||
# toolbar = QToolBar("Main")
|
toolbar = QToolBar("Main")
|
||||||
# toolbar.setObjectName("maintoolbar")
|
toolbar.setObjectName("maintoolbar")
|
||||||
# toolbar.setMovable(False)
|
toolbar.setMovable(False)
|
||||||
# self.addToolBar(toolbar)
|
self.addToolBar(toolbar)
|
||||||
#
|
|
||||||
# new_action = QAction("New", self)
|
new_action = QAction("New", self)
|
||||||
# new_action.setShortcut(QKeySequence.StandardKey.New)
|
new_action.setShortcut(QKeySequence.StandardKey.New)
|
||||||
# toolbar.addAction(new_action)
|
toolbar.addAction(new_action)
|
||||||
#
|
|
||||||
# save_action = QAction("Save", self)
|
save_action = QAction("Save", self)
|
||||||
# save_action.setShortcut(QKeySequence.StandardKey.Save)
|
save_action.setShortcut(QKeySequence.StandardKey.Save)
|
||||||
# toolbar.addAction(save_action)
|
toolbar.addAction(save_action)
|
||||||
#
|
|
||||||
# toolbar.addSeparator()
|
toolbar.addSeparator()
|
||||||
|
|
||||||
# ── Central layout ──
|
# ── Central layout ──
|
||||||
central = QWidget()
|
central = QWidget()
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
"""Tool Supporting Basic Editor Functions"""
|
"""Tool Supporting Basic Editor Functions"""
|
||||||
from typing import cast, Protocol
|
from typing import cast, Protocol
|
||||||
from PySide6.QtCore import QTimer
|
|
||||||
from PySide6.QtWidgets import QTextEdit, QVBoxLayout
|
from PySide6.QtWidgets import QTextEdit, QVBoxLayout
|
||||||
|
|
||||||
import trovedb.trove as tr
|
import trovedb.trove as tr
|
||||||
|
|
@ -17,33 +16,7 @@ class ToolBasicEditor(Tool):
|
||||||
|
|
||||||
self._text_edit = QTextEdit()
|
self._text_edit = QTextEdit()
|
||||||
layout.addWidget(self._text_edit)
|
layout.addWidget(self._text_edit)
|
||||||
|
|
||||||
self._content_dirty = False
|
|
||||||
self._auto_save_timer = QTimer(self)
|
|
||||||
self._auto_save_timer.setSingleShot(True)
|
|
||||||
self._auto_save_timer.setInterval(2000) # 2 seconds after typing stops
|
|
||||||
self._auto_save_timer.timeout.connect(self._perform_auto_save)
|
|
||||||
|
|
||||||
self._text_edit.textChanged.connect(self._schedule_auto_save)
|
|
||||||
|
|
||||||
self.refresh()
|
self.refresh()
|
||||||
|
|
||||||
def refresh(self):
|
def refresh(self):
|
||||||
self._text_edit.setPlainText(self.note.read_content().decode("utf-8"))
|
self._text_edit.setPlainText(self.note.read_content().decode("utf-8"))
|
||||||
self._content_dirty = False
|
|
||||||
|
|
||||||
def _schedule_auto_save(self):
|
|
||||||
self._content_dirty = True
|
|
||||||
self._auto_save_timer.stop()
|
|
||||||
self._auto_save_timer.start()
|
|
||||||
|
|
||||||
def _perform_auto_save(self):
|
|
||||||
if self._content_dirty:
|
|
||||||
content = self._text_edit.toPlainText().encode("utf-8")
|
|
||||||
self.note.write_content(content)
|
|
||||||
self._content_dirty = False
|
|
||||||
|
|
||||||
def closeEvent(self, event):
|
|
||||||
if self._content_dirty:
|
|
||||||
self._perform_auto_save()
|
|
||||||
super().closeEvent(event)
|
|
||||||
|
|
|
||||||
|
|
@ -1,39 +1,14 @@
|
||||||
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict, Iterator
|
from typing import Protocol, runtime_checkable, Optional, Dict, List, Self, NamedTuple, Iterable, TypedDict
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
from pathlib import PurePosixPath
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import errno
|
|
||||||
|
|
||||||
type ObjectId = int | str | UUID
|
type ObjectId = int | str | UUID
|
||||||
|
|
||||||
class TroveError(Exception):
|
class TroveError(Exception):
|
||||||
"""Base class for all Trove errors."""
|
"""Base class for all Trove errors."""
|
||||||
|
|
||||||
class ErrorWithErrno(TroveError):
|
|
||||||
"""Raised when an error occurs with an errno."""
|
|
||||||
|
|
||||||
def __init__(self, error: int, *args):
|
|
||||||
super().__init__(*args)
|
|
||||||
self.errno = error
|
|
||||||
|
|
||||||
class ErrorExists(ErrorWithErrno):
|
|
||||||
"""Raised when a note already exists."""
|
|
||||||
|
|
||||||
def __init__(self, *args):
|
|
||||||
super().__init__(errno.EEXIST, *args)
|
|
||||||
|
|
||||||
class ErrorNotFound(ErrorWithErrno):
|
|
||||||
"""Raised when a note is not found."""
|
|
||||||
|
|
||||||
def __init__(self, *args):
|
|
||||||
super().__init__(errno.ENOENT, *args)
|
|
||||||
|
|
||||||
class ErrorNotEmpty(ErrorWithErrno):
|
|
||||||
"""Raised when a directory is not empty."""
|
|
||||||
|
|
||||||
def __init__(self, *args):
|
|
||||||
super().__init__(errno.ENOTEMPTY, *args)
|
|
||||||
|
|
||||||
|
|
||||||
class BadNoteType(TypeError):
|
class BadNoteType(TypeError):
|
||||||
"""Raised when an invalid note type is encountered."""
|
"""Raised when an invalid note type is encountered."""
|
||||||
|
|
||||||
|
|
@ -46,12 +21,6 @@ class NoteNotFound(KeyError):
|
||||||
class OpenArguments(TypedDict):
|
class OpenArguments(TypedDict):
|
||||||
create: bool
|
create: bool
|
||||||
|
|
||||||
class TreeEntry(NamedTuple):
|
|
||||||
name: str
|
|
||||||
object_id: ObjectId
|
|
||||||
|
|
||||||
DEFAULT_MIME = "application/octet-stream"
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class Note(Protocol):
|
class Note(Protocol):
|
||||||
"""
|
"""
|
||||||
|
|
@ -95,30 +64,10 @@ class Note(Protocol):
|
||||||
"""Write the raw content of the note."""
|
"""Write the raw content of the note."""
|
||||||
...
|
...
|
||||||
|
|
||||||
def children(self) -> Iterator[TreeEntry]:
|
|
||||||
"""Get all children of this note."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def child(self, name: str) -> 'Note':
|
|
||||||
"""Retrieve a child note by name."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> 'Note':
|
|
||||||
"""Create a new child note."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def rm_child(self, name: str, recurse: bool):
|
|
||||||
"""Remove a child note."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def has_children(self) -> bool:
|
|
||||||
"""Check if note has children."""
|
|
||||||
return next(self.children(), None) is not None
|
|
||||||
|
|
||||||
|
|
||||||
def new_child(note: Note, name: str, mime: str = DEFAULT_MIME, content: bytes | None = None, executable: bool = False, hidden: bool = False) -> Note:
|
|
||||||
return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden)
|
|
||||||
|
|
||||||
|
class TreeEntry(NamedTuple):
|
||||||
|
name: str
|
||||||
|
object_id: ObjectId
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class Tree(Protocol):
|
class Tree(Protocol):
|
||||||
|
|
@ -138,10 +87,17 @@ class Tree(Protocol):
|
||||||
"""Remove a directory from the tree."""
|
"""Remove a directory from the tree."""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
def child(self, name: str) -> Note:
|
||||||
|
"""Retrieve a child note by name."""
|
||||||
|
...
|
||||||
|
|
||||||
def entries(self) -> Iterable[TreeEntry]:
|
def entries(self) -> Iterable[TreeEntry]:
|
||||||
"""Return all entries in the directory"""
|
"""Return all entries in the directory"""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
def list(self) -> dict[str, int]:
|
||||||
|
"""Return all entries as {name: object_id}."""
|
||||||
|
...
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class TreeNote(Note, Tree, Protocol):
|
class TreeNote(Note, Tree, Protocol):
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,9 @@ Implements BlobNote, TreeNote, and Trove protocols defined in trove.py.
|
||||||
Depends on db.py (Sqlite3Trove) for storage.
|
Depends on db.py (Sqlite3Trove) for storage.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Optional, Iterator
|
from typing import Optional
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import uuid
|
|
||||||
|
|
||||||
from .db import Sqlite3Trove, NOTE_ROOT_ID
|
from .db import Sqlite3Trove, NOTE_ROOT_ID
|
||||||
|
|
||||||
|
|
@ -21,19 +20,9 @@ class NoteImpl(Note):
|
||||||
"""Concrete note implementation."""
|
"""Concrete note implementation."""
|
||||||
|
|
||||||
def __init__(self, parent: 'TroveImpl', object_id: ObjectId):
|
def __init__(self, parent: 'TroveImpl', object_id: ObjectId):
|
||||||
if not isinstance(object_id, uuid.UUID):
|
|
||||||
object_id = uuid.UUID(str(object_id))
|
|
||||||
assert isinstance(object_id, uuid.UUID)
|
|
||||||
|
|
||||||
self._parent = parent
|
self._parent = parent
|
||||||
self._db = parent.db
|
self._db = parent.db
|
||||||
self._object_id: uuid.UUID = object_id
|
self._object_id = object_id
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_impl_id(note: Note) -> uuid.UUID:
|
|
||||||
if not isinstance(note.object_id, uuid.UUID):
|
|
||||||
raise TypeError("Note not compatible with NoteImpl")
|
|
||||||
return note.object_id
|
|
||||||
|
|
||||||
# Note protocol
|
# Note protocol
|
||||||
@property
|
@property
|
||||||
|
|
@ -47,8 +36,7 @@ class NoteImpl(Note):
|
||||||
@property
|
@property
|
||||||
def mtime(self) -> dt.datetime:
|
def mtime(self) -> dt.datetime:
|
||||||
"""Return modification time as UTC datetime."""
|
"""Return modification time as UTC datetime."""
|
||||||
mtime = self._db.get_mtime(self._object_id)
|
return self._db.get_mtime(self._object_id)
|
||||||
return mtime if mtime is not None else dt.datetime.now(tz=dt.timezone.utc)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mime(self) -> str:
|
def mime(self) -> str:
|
||||||
|
|
@ -69,39 +57,6 @@ class NoteImpl(Note):
|
||||||
def write_content(self, data: bytes) -> None:
|
def write_content(self, data: bytes) -> None:
|
||||||
self._db.write_content(self._object_id, data)
|
self._db.write_content(self._object_id, data)
|
||||||
|
|
||||||
def children(self) -> Iterator[TreeEntry]:
|
|
||||||
"""Get all children of this note."""
|
|
||||||
for name, object_id in self._db.list_tree(self._object_id).items():
|
|
||||||
yield TreeEntry(name, object_id)
|
|
||||||
|
|
||||||
def new_child(self, name: str, mime: str, content: bytes | None, executable: bool, hidden: bool) -> Note:
|
|
||||||
"""Create a new child note."""
|
|
||||||
content = content if content is not None else b""
|
|
||||||
object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden)
|
|
||||||
# TODO fix this
|
|
||||||
if mime == 'inode/directory':
|
|
||||||
return TreeNoteImpl(self._parent, object_id)
|
|
||||||
return NoteImpl(self._parent, object_id)
|
|
||||||
|
|
||||||
def child(self, name: str) -> Note:
|
|
||||||
"""Retrieve a child note by name."""
|
|
||||||
entries = self._db.list_tree(self._object_id)
|
|
||||||
if name not in entries:
|
|
||||||
raise tr.ErrorNotFound(name)
|
|
||||||
child_id = entries[name]
|
|
||||||
value = self._parent.get_raw_note(child_id)
|
|
||||||
if value is None:
|
|
||||||
raise tr.ErrorNotFound("dangling child link") # FIXME: better errors
|
|
||||||
return value
|
|
||||||
|
|
||||||
def rm_child(self, name: str, recurse: bool) -> None:
|
|
||||||
"""Remove a child note."""
|
|
||||||
note = self.child(name)
|
|
||||||
if note.has_children() and not recurse:
|
|
||||||
raise tr.ErrorNotEmpty(name)
|
|
||||||
self._db.unlink(self._object_id, name)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TreeNoteImpl(NoteImpl, TreeNote):
|
class TreeNoteImpl(NoteImpl, TreeNote):
|
||||||
"""Concrete TreeNote: a tree object backed by the tree_entries table."""
|
"""Concrete TreeNote: a tree object backed by the tree_entries table."""
|
||||||
|
|
@ -109,7 +64,7 @@ class TreeNoteImpl(NoteImpl, TreeNote):
|
||||||
# Tree protocol
|
# Tree protocol
|
||||||
def link(self, name: str, note: Note) -> None:
|
def link(self, name: str, note: Note) -> None:
|
||||||
"""Link name to an existing note."""
|
"""Link name to an existing note."""
|
||||||
self._db.link(self._object_id, name, NoteImpl.get_impl_id(note))
|
self._db.link(self._object_id, name, note.object_id)
|
||||||
|
|
||||||
def unlink(self, name: str) -> None:
|
def unlink(self, name: str) -> None:
|
||||||
"""Remove an entry by name."""
|
"""Remove an entry by name."""
|
||||||
|
|
@ -126,13 +81,25 @@ class TreeNoteImpl(NoteImpl, TreeNote):
|
||||||
"""Remove a directory from the tree."""
|
"""Remove a directory from the tree."""
|
||||||
self.unlink(name)
|
self.unlink(name)
|
||||||
|
|
||||||
|
def child(self, name: str) -> Note:
|
||||||
|
"""Retrieve a child note by name."""
|
||||||
|
entries = self._db.list_tree(self._object_id)
|
||||||
|
if name not in entries:
|
||||||
|
raise KeyError(f"Entry '{name}' not found")
|
||||||
|
child_id = entries[name]
|
||||||
|
value = self._parent.get_raw_note(child_id)
|
||||||
|
if value is None:
|
||||||
|
raise KeyError(f"Entry '{name}' has no value")
|
||||||
|
return value
|
||||||
|
|
||||||
def entries(self):
|
def entries(self):
|
||||||
"""Return all entries as an iterable of TreeEntry."""
|
"""Return all entries as an iterable of TreeEntry."""
|
||||||
for name, object_id in self._db.list_tree(self._object_id).items():
|
for name, object_id in self._db.list_tree(self._object_id).items():
|
||||||
yield TreeEntry(name, object_id)
|
yield TreeEntry(name, object_id)
|
||||||
|
|
||||||
|
def list(self) -> dict[str, ObjectId]:
|
||||||
|
"""Return all entries as {name: object_id}."""
|
||||||
|
return self._db.list_tree(self._object_id)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
@ -170,8 +137,6 @@ class TroveImpl:
|
||||||
# Trove protocol
|
# Trove protocol
|
||||||
def get_raw_note(self, note_id: ObjectId) -> Note:
|
def get_raw_note(self, note_id: ObjectId) -> Note:
|
||||||
"""Return a BlobNote or TreeNote for the given id, or None if not found."""
|
"""Return a BlobNote or TreeNote for the given id, or None if not found."""
|
||||||
if not isinstance(note_id, uuid.UUID):
|
|
||||||
note_id = uuid.UUID(str(note_id))
|
|
||||||
info = self._db.get_info(note_id)
|
info = self._db.get_info(note_id)
|
||||||
if info is None:
|
if info is None:
|
||||||
raise NoteNotFound(note_id)
|
raise NoteNotFound(note_id)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue