Compare commits
No commits in common. "b2516146673f5d8bb7b21a25c8eb0dc0a84e0704" and "41480a39c909ab26c3fb09d363d6aeaabb0d7b1e" have entirely different histories.
b251614667
...
41480a39c9
5 changed files with 164 additions and 121 deletions
|
|
@ -14,8 +14,6 @@ from typing import NamedTuple
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from . import trove as tr
|
|
||||||
|
|
||||||
NOTE_ROOT_ID = uuid.UUID(int=0)
|
NOTE_ROOT_ID = uuid.UUID(int=0)
|
||||||
|
|
||||||
class ObjectInfo(NamedTuple):
|
class ObjectInfo(NamedTuple):
|
||||||
|
|
@ -290,21 +288,13 @@ class Sqlite3Trove:
|
||||||
# Tree entry operations
|
# Tree entry operations
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
def link(self, parent_id: SqlObjectId, name: str, child_id: SqlObjectId, overwrite: bool = True) -> None:
|
def link(self, parent_id: SqlObjectId, name: str, child_id: SqlObjectId) -> None:
|
||||||
"""
|
"""
|
||||||
Link a child object into a tree under the given name.
|
Link a child object into a tree under the given name.
|
||||||
Replaces any existing entry with the same name in this tree if overwrite=True.
|
Replaces any existing entry with the same name in this tree.
|
||||||
Both parent_id and child_id must exist in the objects table
|
Both parent_id and child_id must exist in the objects table
|
||||||
(enforced by FK constraints).
|
(enforced by FK constraints).
|
||||||
"""
|
"""
|
||||||
if not overwrite:
|
|
||||||
existing = self._con.execute(
|
|
||||||
"SELECT 1 FROM tree_entries WHERE parent_id = ? AND name = ?",
|
|
||||||
(_sql_id(parent_id), name),
|
|
||||||
).fetchone()
|
|
||||||
if existing:
|
|
||||||
raise tr.ErrorExists(f"Entry '{name}' already exists in tree {parent_id}")
|
|
||||||
|
|
||||||
self._con.execute(
|
self._con.execute(
|
||||||
"INSERT OR REPLACE INTO tree_entries (parent_id, name, child_id) "
|
"INSERT OR REPLACE INTO tree_entries (parent_id, name, child_id) "
|
||||||
"VALUES (?, ?, ?)",
|
"VALUES (?, ?, ?)",
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,12 @@
|
||||||
import os
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import tempfile
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
from pathlib import Path
|
from pathlib import Path, PurePosixPath
|
||||||
from typing import Optional, Iterable, Iterator, override
|
from typing import Optional, Dict, List, Self, Iterable, Iterator
|
||||||
|
|
||||||
from .trove import Note, Trove, BadNoteType, TreeEntry, NoteNotFound, ObjectId
|
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound, ObjectId
|
||||||
|
from . import fs_util as fsu
|
||||||
from . import trove as tr
|
from . import trove as tr
|
||||||
|
|
||||||
class FSNote(Note):
|
class FSNote(Note):
|
||||||
|
|
@ -75,9 +78,9 @@ class FSNote(Note):
|
||||||
content = b""
|
content = b""
|
||||||
|
|
||||||
if mime == 'inode/directory':
|
if mime == 'inode/directory':
|
||||||
if content:
|
if content is not None:
|
||||||
raise NotImplementedError("FSNote does not support children")
|
raise NotImplementedError("FSNote does not support children")
|
||||||
return FSNote(self._trove, self._new_child_subdir(name, False))
|
return FSTreeNote(self._trove, self._new_child_subdir(name, False))
|
||||||
|
|
||||||
ex_path = self._fs_path / name
|
ex_path = self._fs_path / name
|
||||||
ex_path.write_bytes(content)
|
ex_path.write_bytes(content)
|
||||||
|
|
@ -101,25 +104,27 @@ class FSNote(Note):
|
||||||
target_path.unlink()
|
target_path.unlink()
|
||||||
# TODO: remove meta directory!
|
# TODO: remove meta directory!
|
||||||
|
|
||||||
def unlink_(self, name: str):
|
|
||||||
target_path = self._fs_path / name
|
|
||||||
if not target_path.exists():
|
|
||||||
return
|
|
||||||
if target_path.is_dir():
|
|
||||||
target_path.rmdir()
|
|
||||||
else:
|
|
||||||
target_path.unlink()
|
|
||||||
|
|
||||||
def link_(self, name: str, note: Note):
|
def children(self) -> Iterator[TreeEntry]:
|
||||||
|
"""Get all children of this note."""
|
||||||
|
if not self._fs_path.is_dir():
|
||||||
|
return
|
||||||
|
for item in self._fs_path.iterdir():
|
||||||
|
if item.name == ".trove":
|
||||||
|
continue
|
||||||
|
yield TreeEntry(name=item.name, object_id=item.stat().st_ino)
|
||||||
|
|
||||||
|
class FSTreeNote(FSNote, TreeNote):
|
||||||
|
def link(self, name: str, note: Note):
|
||||||
if not isinstance(note, FSNote):
|
if not isinstance(note, FSNote):
|
||||||
raise BadNoteType("Only blob notes can be linked")
|
raise BadNoteType("Only blob notes can be linked")
|
||||||
|
|
||||||
target_path = self._fs_path / name
|
target_path = self._fs_path / name
|
||||||
if target_path.exists():
|
if target_path.exists():
|
||||||
self.unlink_(name)
|
self.unlink(name)
|
||||||
|
|
||||||
note_path = note._fs_path
|
note_path = note._fs_path
|
||||||
|
|
||||||
# If the note is in .working, move it to the new location.
|
# If the note is in .working, move it to the new location.
|
||||||
if self._trove.working in note_path.parents:
|
if self._trove.working in note_path.parents:
|
||||||
os.rename(note_path, target_path)
|
os.rename(note_path, target_path)
|
||||||
|
|
@ -132,18 +137,34 @@ class FSNote(Note):
|
||||||
# Fallback to rename if link fails (e.g. cross-device, though we assume single FS)
|
# Fallback to rename if link fails (e.g. cross-device, though we assume single FS)
|
||||||
os.rename(note_path, target_path)
|
os.rename(note_path, target_path)
|
||||||
else:
|
else:
|
||||||
# Directories cannot be hardlinked.
|
# Directories cannot be hardlinked.
|
||||||
# We move it to the new location.
|
# We move it to the new location.
|
||||||
os.rename(note_path, target_path)
|
os.rename(note_path, target_path)
|
||||||
|
|
||||||
def children(self) -> Iterator[TreeEntry]:
|
def unlink(self, name: str):
|
||||||
"""Get all children of this note."""
|
target_path = self._fs_path / name
|
||||||
if not self._fs_path.is_dir():
|
if not target_path.exists():
|
||||||
return
|
return
|
||||||
for item in self._fs_path.iterdir():
|
if target_path.is_dir():
|
||||||
if item.name == ".trove":
|
target_path.rmdir()
|
||||||
continue
|
else:
|
||||||
yield TreeEntry(name=item.name, object_id=str(self._fs_path / item.name))
|
target_path.unlink()
|
||||||
|
|
||||||
|
def mkdir(self, name: str) -> 'FSTreeNote':
|
||||||
|
target_path = self._fs_path / name
|
||||||
|
target_path.mkdir(exist_ok=True)
|
||||||
|
return FSTreeNote(self._trove, path=target_path)
|
||||||
|
|
||||||
|
def entries(self) -> Iterable[TreeEntry]:
|
||||||
|
try:
|
||||||
|
for item in self._fs_path.iterdir():
|
||||||
|
if item.name == ".trove":
|
||||||
|
continue
|
||||||
|
yield TreeEntry(name=item.name, object_id=str(item))
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class FSTrove(Trove):
|
class FSTrove(Trove):
|
||||||
|
|
@ -167,6 +188,8 @@ class FSTrove(Trove):
|
||||||
def get_raw_note_by_path(self, path: Path) -> Note:
|
def get_raw_note_by_path(self, path: Path) -> Note:
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
raise tr.ErrorNotFound(str(path))
|
raise tr.ErrorNotFound(str(path))
|
||||||
|
if path.is_dir():
|
||||||
|
return FSTreeNote(self, path=path)
|
||||||
return FSNote(self, path=path)
|
return FSNote(self, path=path)
|
||||||
|
|
||||||
def get_raw_note(self, note_id: ObjectId) -> Note:
|
def get_raw_note(self, note_id: ObjectId) -> Note:
|
||||||
|
|
@ -175,30 +198,11 @@ class FSTrove(Trove):
|
||||||
raise NoteNotFound(note_id)
|
raise NoteNotFound(note_id)
|
||||||
return self.get_raw_note_by_path(p)
|
return self.get_raw_note_by_path(p)
|
||||||
|
|
||||||
@override
|
|
||||||
def move(self, src_parent: Note, src_name: str, dst_parent: Note, dst_name: str, overwrite: bool):
|
|
||||||
"""Move a child note to a new location."""
|
|
||||||
src_note = src_parent.child(src_name)
|
|
||||||
if not isinstance(src_note, FSNote):
|
|
||||||
raise tr.ErrorBadType("not a valid DB note")
|
|
||||||
if not isinstance(dst_parent, FSNote):
|
|
||||||
raise tr.ErrorBadType("not a valid DB note")
|
|
||||||
if not dst_parent._fs_path.is_dir():
|
|
||||||
raise NotImplementedError("FSTrove promoting during move")
|
|
||||||
|
|
||||||
# Remove existing target
|
|
||||||
if overwrite:
|
|
||||||
dst_parent.unlink_(dst_name)
|
|
||||||
|
|
||||||
# Link to new parent, unlink from old
|
|
||||||
dst_parent.link_(dst_name, src_note)
|
|
||||||
src_parent.rm_child(src_name, True)
|
|
||||||
|
|
||||||
def create_blob(self, data: bytes | None = None) -> Note:
|
def create_blob(self, data: bytes | None = None) -> Note:
|
||||||
raise NotImplementedError("FSTrove does not support blobs")
|
raise NotImplementedError("FSTrove does not support blobs")
|
||||||
|
|
||||||
def get_root(self) -> Note:
|
def get_root(self) -> TreeNote:
|
||||||
return FSNote(self, path=self.root)
|
return FSTreeNote(self, path=self.root)
|
||||||
|
|
||||||
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
|
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
|
||||||
raise NotImplementedError("FSTrove does not support metadata")
|
raise NotImplementedError("FSTrove does not support metadata")
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import pyfuse3
|
||||||
import trio
|
import trio
|
||||||
from pyfuse3 import InodeT, FileHandleT
|
from pyfuse3 import InodeT, FileHandleT
|
||||||
|
|
||||||
from trovedb.trove import Trove, Note, ObjectId, TreeExists
|
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, ObjectId, TreeExists
|
||||||
|
|
||||||
import trovedb.trove as tr
|
import trovedb.trove as tr
|
||||||
|
|
||||||
|
|
@ -50,12 +50,8 @@ class _TroveHandle:
|
||||||
|
|
||||||
class _TroveHandleTree(_TroveHandle):
|
class _TroveHandleTree(_TroveHandle):
|
||||||
@property
|
@property
|
||||||
def tree(self) -> Note:
|
def tree(self) -> TreeNote:
|
||||||
return self.note
|
return cast(TreeNote, self.note)
|
||||||
|
|
||||||
def _note_has_folder(note: Note) -> bool:
|
|
||||||
"""Return TRUE if a note name should have an associated folder"""
|
|
||||||
return note.has_children() or note.mime == "inode/directory"
|
|
||||||
|
|
||||||
|
|
||||||
class TroveFuseOps(pyfuse3.Operations):
|
class TroveFuseOps(pyfuse3.Operations):
|
||||||
|
|
@ -142,11 +138,13 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
|
|
||||||
def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
|
def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
|
||||||
parent = self._get_inode_note(parent_inode)
|
parent = self._get_inode_note(parent_inode)
|
||||||
|
if not isinstance(parent, TreeNote):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
try:
|
try:
|
||||||
note = parent.child(name.decode())
|
note = parent.child(name.decode())
|
||||||
except tr.ErrorWithErrno as e:
|
except (KeyError, tr.ErrorNotFound):
|
||||||
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
|
logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
|
||||||
raise pyfuse3.FUSEError(e.errno) from None
|
raise pyfuse3.FUSEError(errno.ENOENT) from None
|
||||||
ent = self._create_get_ent_from_note(note)
|
ent = self._create_get_ent_from_note(note)
|
||||||
return ent, note
|
return ent, note
|
||||||
|
|
||||||
|
|
@ -162,18 +160,15 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
def _open_handle(self, inode: InodeT) -> _TroveHandle:
|
def _open_handle(self, inode: InodeT) -> _TroveHandle:
|
||||||
logger.debug("open_handle inode:%d", inode)
|
|
||||||
note = self._get_inode_note(inode)
|
note = self._get_inode_note(inode)
|
||||||
|
|
||||||
handle_id = FileHandleT(self._next_handle)
|
handle_id = FileHandleT(self._next_handle)
|
||||||
self._next_handle += 1
|
self._next_handle += 1
|
||||||
|
|
||||||
handle: _TroveHandle
|
handle: _TroveHandle
|
||||||
if _note_has_folder(note):
|
if isinstance(note, TreeNote):
|
||||||
logger.debug("open_handle inode:%d is a folder", inode)
|
|
||||||
handle = _TroveHandleTree(inode_id=inode, handle_id=handle_id, note=note)
|
handle = _TroveHandleTree(inode_id=inode, handle_id=handle_id, note=note)
|
||||||
else:
|
else:
|
||||||
logger.debug("open_handle inode:%d is a file", inode)
|
|
||||||
handle = _TroveHandle(inode_id=inode, handle_id=handle_id, note=note)
|
handle = _TroveHandle(inode_id=inode, handle_id=handle_id, note=note)
|
||||||
|
|
||||||
self._handles[handle_id] = handle
|
self._handles[handle_id] = handle
|
||||||
|
|
@ -191,9 +186,7 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
# Determine basic information
|
# Determine basic information
|
||||||
is_tree = True
|
is_tree = True
|
||||||
size = 0
|
size = 0
|
||||||
|
if not hasattr(note, 'mkdir'):
|
||||||
# FIXME: Properly support folder / content, right now it's either or
|
|
||||||
if not _note_has_folder(note):
|
|
||||||
size = len(note.read_content())
|
size = len(note.read_content())
|
||||||
is_tree = False
|
is_tree = False
|
||||||
|
|
||||||
|
|
@ -272,7 +265,6 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
|
async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
|
||||||
logger.debug("opendir inode:%d", inode)
|
|
||||||
handle = self._open_handle(inode)
|
handle = self._open_handle(inode)
|
||||||
if not isinstance(handle, _TroveHandleTree):
|
if not isinstance(handle, _TroveHandleTree):
|
||||||
logger.debug("attempted opendir on %d not a tree", inode)
|
logger.debug("attempted opendir on %d not a tree", inode)
|
||||||
|
|
@ -285,7 +277,10 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
logger.debug("readdir %d start_id %d", fh, start_id)
|
logger.debug("readdir %d start_id %d", fh, start_id)
|
||||||
handle = self._get_handle(fh)
|
handle = self._get_handle(fh)
|
||||||
note = handle.note
|
note = handle.note
|
||||||
entries = list(note.children()) # [(name, object_id), ...]
|
if not isinstance(note, TroveTree):
|
||||||
|
logger.debug("attempted readdir on %d not a tree", fh)
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
|
entries = list(note.entries()) # [(name, object_id), ...]
|
||||||
|
|
||||||
for idx, entry in enumerate(entries):
|
for idx, entry in enumerate(entries):
|
||||||
if idx < start_id:
|
if idx < start_id:
|
||||||
|
|
@ -325,10 +320,13 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
||||||
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
|
logger.debug("rmdir inode:%d name:%s", parent_inode, name)
|
||||||
parent = self._get_inode_note(parent_inode)
|
parent = self._get_inode_note(parent_inode)
|
||||||
|
if not isinstance(parent, TreeNote):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
try:
|
try:
|
||||||
parent.rm_child(name.decode(), False)
|
parent.unlink(name.decode())
|
||||||
except tr.ErrorWithErrno as e:
|
except KeyError:
|
||||||
raise pyfuse3.FUSEError(e.errno) from None
|
raise pyfuse3.FUSEError(errno.ENOENT) from None
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# File ops
|
# File ops
|
||||||
|
|
@ -336,10 +334,9 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
|
|
||||||
async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
|
async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
|
||||||
handle = self._open_handle(inode)
|
handle = self._open_handle(inode)
|
||||||
# FIXME: Add support for inode tree and inode content
|
if isinstance(handle.note, TroveTree):
|
||||||
# if isinstance(handle.note, TroveTree):
|
self._close_handle(handle)
|
||||||
# self._close_handle(handle)
|
raise pyfuse3.FUSEError(errno.EISDIR)
|
||||||
# raise pyfuse3.FUSEError(errno.EISDIR)
|
|
||||||
return pyfuse3.FileInfo(fh=handle.handle_id)
|
return pyfuse3.FileInfo(fh=handle.handle_id)
|
||||||
|
|
||||||
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
|
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
|
||||||
|
|
@ -381,27 +378,33 @@ class TroveFuseOps(pyfuse3.Operations):
|
||||||
self._close_handle(handle)
|
self._close_handle(handle)
|
||||||
|
|
||||||
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
|
||||||
try:
|
parent_note = self._get_inode_note(parent_inode)
|
||||||
parent_note = self._get_inode_note(parent_inode)
|
name_str = name.decode()
|
||||||
name_str = name.decode()
|
parent_note.rm_child(name_str, False)
|
||||||
parent_note.rm_child(name_str, False)
|
|
||||||
except tr.ErrorWithErrno as e:
|
|
||||||
raise pyfuse3.FUSEError(e.errno) from None
|
|
||||||
|
|
||||||
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
|
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
|
||||||
# # Decode / validate names
|
# Decode / validate names
|
||||||
name_new_str = name_new.decode()
|
name_new_str = name_new.decode()
|
||||||
name_old_str = name_old.decode()
|
name_old_str = name_old.decode()
|
||||||
|
|
||||||
# Grab the parents
|
# Grab the parents
|
||||||
new_parent = self._get_inode_note(parent_inode_new)
|
new_parent = self._get_inode_note(parent_inode_new)
|
||||||
|
if not isinstance(new_parent, TroveTree):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
old_parent = self._get_inode_note(parent_inode_old)
|
old_parent = self._get_inode_note(parent_inode_old)
|
||||||
|
if not isinstance(old_parent, TroveTree):
|
||||||
|
raise pyfuse3.FUSEError(errno.ENOTDIR)
|
||||||
|
|
||||||
|
# We want to maintain the inode - find the note via the internal entity
|
||||||
|
ent, note = self._lookup_child(parent_inode_old, name_old)
|
||||||
|
|
||||||
|
# Remove existing target
|
||||||
|
new_parent.unlink(name_new_str)
|
||||||
|
|
||||||
|
# Link to new parent, unlink from old
|
||||||
|
new_parent.link(name_new_str, note)
|
||||||
|
old_parent.unlink(name_old_str)
|
||||||
|
|
||||||
# Move!
|
|
||||||
try:
|
|
||||||
self._trove.move(old_parent, name_old_str, new_parent, name_new_str, overwrite=True)
|
|
||||||
except tr.ErrorWithErrno as e:
|
|
||||||
raise pyfuse3.FUSEError(e.errno) from None
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Serve
|
# Serve
|
||||||
|
|
|
||||||
|
|
@ -33,10 +33,6 @@ class ErrorNotEmpty(ErrorWithErrno):
|
||||||
def __init__(self, *args):
|
def __init__(self, *args):
|
||||||
super().__init__(errno.ENOTEMPTY, *args)
|
super().__init__(errno.ENOTEMPTY, *args)
|
||||||
|
|
||||||
class ErrorBadType(TypeError):
|
|
||||||
"""Raised when an invalid type is encountered."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
class BadNoteType(TypeError):
|
class BadNoteType(TypeError):
|
||||||
"""Raised when an invalid note type is encountered."""
|
"""Raised when an invalid note type is encountered."""
|
||||||
|
|
@ -124,6 +120,34 @@ def new_child(note: Note, name: str, mime: str = DEFAULT_MIME, content: bytes |
|
||||||
return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden)
|
return note.new_child(name=name, mime=mime, content=content, executable=executable, hidden=hidden)
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class Tree(Protocol):
|
||||||
|
def link(self, name: str, note: Note):
|
||||||
|
"""Link name to a given note."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def unlink(self, name: str):
|
||||||
|
"""Remove name from the tree."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def mkdir(self, name: str) -> Self:
|
||||||
|
"""Create a new Tree with the given name."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def rmdir(self, name: str) -> None:
|
||||||
|
"""Remove a directory from the tree."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def entries(self) -> Iterable[TreeEntry]:
|
||||||
|
"""Return all entries in the directory"""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class TreeNote(Note, Tree, Protocol):
|
||||||
|
"""Tree Note"""
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class Trove(Protocol):
|
class Trove(Protocol):
|
||||||
"""
|
"""
|
||||||
|
|
@ -135,14 +159,10 @@ class Trove(Protocol):
|
||||||
"""Retrieve a note by a object id"""
|
"""Retrieve a note by a object id"""
|
||||||
...
|
...
|
||||||
|
|
||||||
def move(self, src_parent: Note, src_name: str, dst_parent: Note, dst_name: str, overwrite: bool):
|
|
||||||
"""Move a child note to a new location."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def create_blob(self, data: bytes | None = None) -> Note:
|
def create_blob(self, data: bytes | None = None) -> Note:
|
||||||
"""Create a new blob node at the given path with content"""
|
"""Create a new blob node at the given path with content"""
|
||||||
...
|
...
|
||||||
|
|
||||||
def get_root(self) -> Note:
|
def get_root(self) -> TreeNote:
|
||||||
"""Get Tree Node at the given path"""
|
"""Get Tree Node at the given path"""
|
||||||
...
|
...
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ Implements BlobNote, TreeNote, and Trove protocols defined in trove.py.
|
||||||
Depends on db.py (Sqlite3Trove) for storage.
|
Depends on db.py (Sqlite3Trove) for storage.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Optional, Iterator, override
|
from typing import Optional, Iterator
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import uuid
|
import uuid
|
||||||
|
|
@ -14,7 +14,7 @@ from .db import Sqlite3Trove, NOTE_ROOT_ID
|
||||||
|
|
||||||
from . import trove as tr
|
from . import trove as tr
|
||||||
|
|
||||||
from .trove import Note, Trove, TreeEntry, NoteNotFound, ObjectId
|
from .trove import Note, Trove, TreeNote, TreeEntry, NoteNotFound, ObjectId
|
||||||
|
|
||||||
|
|
||||||
class NoteImpl(Note):
|
class NoteImpl(Note):
|
||||||
|
|
@ -78,7 +78,9 @@ class NoteImpl(Note):
|
||||||
"""Create a new child note."""
|
"""Create a new child note."""
|
||||||
content = content if content is not None else b""
|
content = content if content is not None else b""
|
||||||
object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden)
|
object_id = self._db.write_blob(data=content, object_id=None, dtype=mime, executable=executable, hidden=hidden)
|
||||||
self._db.link(self._object_id, name, object_id)
|
# TODO fix this
|
||||||
|
if mime == 'inode/directory':
|
||||||
|
return TreeNoteImpl(self._parent, object_id)
|
||||||
return NoteImpl(self._parent, object_id)
|
return NoteImpl(self._parent, object_id)
|
||||||
|
|
||||||
def child(self, name: str) -> Note:
|
def child(self, name: str) -> Note:
|
||||||
|
|
@ -100,11 +102,44 @@ class NoteImpl(Note):
|
||||||
self._db.unlink(self._object_id, name)
|
self._db.unlink(self._object_id, name)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class TreeNoteImpl(NoteImpl, TreeNote):
|
||||||
|
"""Concrete TreeNote: a tree object backed by the tree_entries table."""
|
||||||
|
|
||||||
|
# Tree protocol
|
||||||
|
def link(self, name: str, note: Note) -> None:
|
||||||
|
"""Link name to an existing note."""
|
||||||
|
self._db.link(self._object_id, name, NoteImpl.get_impl_id(note))
|
||||||
|
|
||||||
|
def unlink(self, name: str) -> None:
|
||||||
|
"""Remove an entry by name."""
|
||||||
|
self._db.unlink(self._object_id, name)
|
||||||
|
|
||||||
|
def mkdir(self, name: str) -> 'TreeNoteImpl':
|
||||||
|
"""Create a new empty tree, link it under name, and return it."""
|
||||||
|
new_id = self._db.write_tree(b"")
|
||||||
|
tree = TreeNoteImpl(self._parent, new_id)
|
||||||
|
self.link(name, tree)
|
||||||
|
return tree
|
||||||
|
|
||||||
|
def rmdir(self, name: str) -> None:
|
||||||
|
"""Remove a directory from the tree."""
|
||||||
|
self.unlink(name)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def entries(self):
|
||||||
|
"""Return all entries as an iterable of TreeEntry."""
|
||||||
|
for name, object_id in self._db.list_tree(self._object_id).items():
|
||||||
|
yield TreeEntry(name, object_id)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Trove
|
# Trove
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
class TroveImpl(Trove):
|
class TroveImpl:
|
||||||
"""
|
"""
|
||||||
Concrete Trove: top-level API backed by a Sqlite3Trove database.
|
Concrete Trove: top-level API backed by a Sqlite3Trove database.
|
||||||
|
|
||||||
|
|
@ -140,28 +175,19 @@ class TroveImpl(Trove):
|
||||||
info = self._db.get_info(note_id)
|
info = self._db.get_info(note_id)
|
||||||
if info is None:
|
if info is None:
|
||||||
raise NoteNotFound(note_id)
|
raise NoteNotFound(note_id)
|
||||||
|
if self._db.is_tree(note_id) or info.type == "inode/directory":
|
||||||
|
return TreeNoteImpl(self, note_id)
|
||||||
return NoteImpl(self, note_id)
|
return NoteImpl(self, note_id)
|
||||||
|
|
||||||
@override
|
|
||||||
def move(self, src_parent: Note, src_name: str, dst_parent: Note, dst_name: str, overwrite: bool):
|
|
||||||
"""Move a child note to a new location."""
|
|
||||||
src_note = src_parent.child(src_name)
|
|
||||||
if not isinstance(src_note, NoteImpl):
|
|
||||||
raise tr.ErrorBadType("not a valid DB note")
|
|
||||||
if not isinstance(dst_parent, NoteImpl):
|
|
||||||
raise tr.ErrorBadType("not a valid DB note")
|
|
||||||
self._db.link(dst_parent.object_id, dst_name, src_note.object_id)
|
|
||||||
self._db.unlink(src_parent.object_id, src_name)
|
|
||||||
|
|
||||||
def create_blob(self, data: bytes | None = None,
|
def create_blob(self, data: bytes | None = None,
|
||||||
dtype: str = "application/octet-stream") -> Note:
|
dtype: str = "application/octet-stream") -> Note:
|
||||||
"""Create a new blob object and return a BlobNote for it."""
|
"""Create a new blob object and return a BlobNote for it."""
|
||||||
obj_id = self._db.write_blob(data or b"", dtype=dtype)
|
obj_id = self._db.write_blob(data or b"", dtype=dtype)
|
||||||
return NoteImpl(self, obj_id)
|
return NoteImpl(self, obj_id)
|
||||||
|
|
||||||
def get_root(self) -> Note:
|
def get_root(self) -> TreeNote:
|
||||||
"""Return the root TreeNote (always id=NOTE_ROOT_ID)."""
|
"""Return the root TreeNote (always id=NOTE_ROOT_ID)."""
|
||||||
return NoteImpl(self, NOTE_ROOT_ID)
|
return TreeNoteImpl(self, NOTE_ROOT_ID)
|
||||||
|
|
||||||
|
|
||||||
def open_db_trove(path: str | Path, create: bool = False, **kwargs: tr.OpenArguments) -> Trove:
|
def open_db_trove(path: str | Path, create: bool = False, **kwargs: tr.OpenArguments) -> Trove:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue