""" fuse.py — FUSE filesystem layer for Trove, backed by pyfuse3 + trio. Blob objects are exposed as regular files. Tree objects are exposed as directories. Inode numbers map 1:1 to Trove object IDs (NODE_ROOT_ID == pyfuse3 root inode == 1). Entry point: serve(trove, mountpoint) """ import errno import os import stat import time import logging from typing import Sequence, Tuple, cast import pyfuse3 import trio from pyfuse3 import InodeT, FileHandleT from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists logger = logging.getLogger(__name__) class _TroveEntry: __slots__ = [ 'object_id', 'sys_inode', 'ref_count' ] def __init__(self, sys_inode: InodeT, object_id: ObjectId | None): self.object_id: ObjectId | None = object_id self.sys_inode: InodeT = sys_inode self.ref_count = 0 def ref(self) -> None: self.ref_count += 1 def deref(self, count: int = 1) -> bool: assert self.ref_count > 0 self.ref_count -= count return self.ref_count <= 0 class _TroveHandle: __slots__ = [ 'inode_id', 'ref_count', 'note', 'handle_id' ] def __init__(self, inode_id: InodeT, handle_id: FileHandleT, note: Note): self.inode_id = inode_id self.handle_id = handle_id self.note = note class _TroveHandleTree(_TroveHandle): @property def tree(self) -> TreeNote: return cast(TreeNote, self.note) class TroveFuseOps(pyfuse3.Operations): def __init__(self, trove: Trove): super().__init__() self._trove = trove # Inode Cache self._next_inode = 2 self._inode_cache: dict[InodeT, _TroveEntry] = {} self._inode_reverse_cache: dict[ObjectId, InodeT] = {} # Cache and Lock Root Inode node_root = trove.get_root() self._inode_cache[pyfuse3.ROOT_INODE] = _TroveEntry(pyfuse3.ROOT_INODE, node_root.object_id) self._inode_reverse_cache[node_root.object_id] = pyfuse3.ROOT_INODE self._inode_cache[pyfuse3.ROOT_INODE].ref() self._inode_cache[pyfuse3.ROOT_INODE].ref() # Handles self._next_handle = 2 self._handles: dict[int, _TroveHandle] = {} # ------------------------------------------------------------------ # Entry Management [entries relate inode to Note] # ------------------------------------------------------------------ def _get_ent_from_inode(self, inode: InodeT) -> _TroveEntry: """Get entry from predefined inode""" if inode not in self._inode_cache: logger.debug("inode not found in cache: %d", inode) raise pyfuse3.FUSEError(errno.ENOENT) value = self._inode_cache[inode] return value def _create_get_ent_from_note(self, note: Note) -> _TroveEntry: """Create entry from note. Inode is reserved but not saved in cache.""" if note.object_id in self._inode_reverse_cache: sys_inode = self._inode_reverse_cache[note.object_id] return self._inode_cache[sys_inode] sys_inode = InodeT(self._next_inode) self._next_inode += 1 return _TroveEntry(sys_inode=sys_inode, object_id=note.object_id) def _ref_entry(self, ent: _TroveEntry) -> None: """Ref entry. If it is not in cache, it is added to cache.""" if ent.sys_inode not in self._inode_cache: self._inode_cache[ent.sys_inode] = ent self._inode_reverse_cache[ent.object_id] = ent.sys_inode ent.ref() def _deref_entry(self, ent: _TroveEntry, count: int = 1) -> None: """Deref entry. Remove from cache if count hits 0""" if ent.deref(count): if ent.sys_inode in self._inode_cache: logger.debug("free inode: %d", ent.sys_inode) del self._inode_cache[ent.sys_inode] del self._inode_reverse_cache[ent.object_id] def _get_inode_note(self, inode: InodeT) -> Note: """Get note from Inode, inode must be reserved""" ent = self._get_ent_from_inode(inode) return self._get_ent_note(ent) def _get_ent_note(self, ent: _TroveEntry) -> Note: """Get note from entry.""" note = self._trove.get_raw_note(ent.object_id) if note is None: logger.debug("note lookup failed: %s", ent.object_id) raise pyfuse3.FUSEError(errno.ENOENT) return note def _lookup_update_object(self, object_id: ObjectId) -> _TroveEntry: if object_id in self._inode_reverse_cache: inode = self._inode_reverse_cache[object_id] return self._lookup_existing(inode) else: inode_id = InodeT(self._next_inode) self._next_inode += 1 inode = _TroveEntry(sys_inode=inode_id, object_id=object_id) self._inode_cache[inode_id] = inode self._inode_reverse_cache[object_id] = inode_id return inode def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]: parent = self._get_inode_note(parent_inode) if not isinstance(parent, TreeNote): raise pyfuse3.FUSEError(errno.ENOTDIR) try: note = parent.child(name.decode()) except KeyError: logger.debug("lookup failed: %d -> %s", parent_inode, name.decode()) raise pyfuse3.FUSEError(errno.ENOENT) from None ent = self._create_get_ent_from_note(note) return ent, note def _get_sys_inode_id(self, object_id: ObjectId) -> InodeT: if object_id in self._inode_reverse_cache: return self._inode_reverse_cache[object_id] else: raise pyfuse3.FUSEError(errno.ENOENT) # ------------------------------------------------------------------ # Handle Management # ------------------------------------------------------------------ def _open_handle(self, inode: InodeT) -> _TroveHandle: note = self._get_inode_note(inode) handle_id = FileHandleT(self._next_handle) self._next_handle += 1 handle: _TroveHandle if isinstance(note, TreeNote): handle = _TroveHandleTree(inode_id=inode, handle_id=handle_id, note=note) else: handle = _TroveHandle(inode_id=inode, handle_id=handle_id, note=note) self._handles[handle_id] = handle return handle def _get_handle(self, handle_id: FileHandleT) -> _TroveHandle: if not handle_id in self._handles: raise pyfuse3.FUSEError(errno.EBADF) return self._handles[handle_id] def _close_handle(self, handle: _TroveHandle): del self._handles[handle.handle_id] def _get_attr(self, ent: _TroveEntry, note: Note) -> pyfuse3.EntryAttributes: # Determine basic information is_tree = True size = 0 if isinstance(note, TroveBlob): size = len(note.read()) is_tree = False # Create and fill attr structure attr = pyfuse3.EntryAttributes() attr.st_ino = ent.sys_inode attr.st_nlink = 1 attr.st_uid = os.getuid() attr.st_gid = os.getgid() mtime_ns = int(note.mtime.timestamp() * 1e9) now_ns = int(time.time() * 1e9) attr.st_atime_ns = now_ns attr.st_mtime_ns = mtime_ns attr.st_ctime_ns = mtime_ns attr.generation = 0 attr.entry_timeout = 5.0 attr.attr_timeout = 5.0 # Determine permissions based on readonly property if is_tree: mode = 0o755 if not note.readonly else 0o555 attr.st_mode = stat.S_IFDIR | mode attr.st_size = 0 attr.st_blksize = 512 attr.st_blocks = 0 else: mode = 0o644 if not note.readonly else 0o444 attr.st_mode = stat.S_IFREG | mode attr.st_size = size attr.st_blksize = 512 attr.st_blocks = (size + 511) // 512 return attr # ------------------------------------------------------------------ # Stat / lookup # ------------------------------------------------------------------ async def getattr(self, inode: InodeT, ctx=None) -> pyfuse3.EntryAttributes: logger.debug("getattr inode:%d", inode) ent = self._get_ent_from_inode(inode) note = self._get_ent_note(ent) return self._get_attr(ent, note) async def lookup(self, parent_inode: InodeT, name: bytes, ctx=None) -> pyfuse3.EntryAttributes: logger.debug("lookup inode:%d name:%s", parent_inode, name) ent, child = self._lookup_child(parent_inode, name) self._ref_entry(ent) return self._get_attr(ent, child) async def setattr(self, inode: InodeT, attr, fields, fh: FileHandleT | None, ctx) -> pyfuse3.EntryAttributes: ent = self._get_ent_from_inode(inode) note = self._get_ent_note(ent) if fields.update_size: if isinstance(note, TroveBlob): current = note.read() new_size = attr.st_size if new_size < len(current): note.write(current[:new_size]) elif new_size > len(current): note.write(current + b"\x00" * (new_size - len(current))) else: raise pyfuse3.FUSEError(errno.EINVAL) return self._get_attr(ent, note) def forget(self, inode_list: Sequence[Tuple[InodeT, int]]) -> None: for inode, nlookup in inode_list: try: logger.debug("deref inode:%d count:%d", inode, nlookup) self._deref_entry(self._get_ent_from_inode(inode), nlookup) except pyfuse3.FUSEError as e: logger.warning("Failed to deref inode %d: %s", inode, str(e)) # ------------------------------------------------------------------ # Directory ops # ------------------------------------------------------------------ async def opendir(self, inode: InodeT, ctx) -> FileHandleT: handle = self._open_handle(inode) if not isinstance(handle, _TroveHandleTree): logger.debug("attempted opendir on %d not a tree", inode) self._close_handle(handle) raise pyfuse3.FUSEError(errno.ENOTDIR) logger.debug("opened dir inode %d -> handle %d", inode, handle.handle_id) return handle.handle_id async def readdir(self, fh: FileHandleT, start_id: int, token) -> None: logger.debug("readdir %d start_id %d", fh, start_id) handle = self._get_handle(fh) note = handle.note if not isinstance(note, TroveTree): logger.debug("attempted readdir on %d not a tree", fh) raise pyfuse3.FUSEError(errno.ENOTDIR) entries = list(note.list().items()) # [(name, object_id), ...] for idx, (name, child_id) in enumerate(entries): if idx < start_id: continue child = self._trove.get_raw_note(child_id) if child is None: continue child_ent = self._create_get_ent_from_note(child) attr = self._get_attr(child_ent, child) self._ref_entry(child_ent) if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1): break async def releasedir(self, fh: FileHandleT) -> None: logger.debug("releasedir %d", fh) handle = self._get_handle(fh) self._close_handle(handle) async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes: logger.debug("mkdir inode:%d name:%s", parent_inode, name) # Grab parent note, verify is tree parent = self._get_inode_note(parent_inode) if not isinstance(parent, TreeNote): raise pyfuse3.FUSEError(errno.ENOTDIR) # Create new directory in note try: new_tree: TreeNote = parent.mkdir(name.decode()) except TreeExists: raise pyfuse3.FUSEError(errno.EEXIST) from None # Grab entity for kernel ent = self._create_get_ent_from_note(new_tree) self._ref_entry(ent) return self._get_attr(ent, new_tree) async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None: logger.debug("rmdir inode:%d name:%s", parent_inode, name) parent = self._get_inode_note(parent_inode) if not isinstance(parent, TreeNote): raise pyfuse3.FUSEError(errno.ENOTDIR) try: parent.unlink(name.decode()) except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) from None # ------------------------------------------------------------------ # File ops # ------------------------------------------------------------------ async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo: handle = self._open_handle(inode) if isinstance(handle.note, TroveTree): self._close_handle(handle) raise pyfuse3.FUSEError(errno.EISDIR) return pyfuse3.FileInfo(fh=handle.handle_id) async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple: logger.debug("create inode:%d name:%s", parent_inode, name) parent = self._get_inode_note(parent_inode) if not isinstance(parent, TroveTree): raise pyfuse3.FUSEError(errno.ENOTDIR) name_str = name.decode() if name_str in parent.list(): raise pyfuse3.FUSEError(errno.EEXIST) blob = self._trove.create_blob(b"") parent.link(name_str, blob) ent = self._create_get_ent_from_note(blob) self._ref_entry(ent) handle = self._open_handle(ent.sys_inode) attr = self._get_attr(ent, blob) return pyfuse3.FileInfo(fh=handle.handle_id), attr async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes: logger.debug("read fh:%d offset:%d length:%d", fh, offset, length) handle = self._get_handle(fh) note = handle.note if isinstance(note, TroveBlob): return note.read()[offset:offset + length] raise pyfuse3.FUSEError(errno.EBADF) async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int: handle = self._get_handle(fh) note = handle.note if isinstance(note, TroveBlob): existing = note.read() if offset > len(existing): existing = existing + b"\x00" * (offset - len(existing)) note.write(existing[:offset] + data + existing[offset + len(data):]) return len(data) async def release(self, fh: FileHandleT) -> None: handle = self._get_handle(fh) self._close_handle(handle) async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None: parent_note = self._get_inode_note(parent_inode) if not isinstance(parent_note, TroveTree): raise pyfuse3.FUSEError(errno.ENOTDIR) name_str = name.decode() if name_str not in parent_note.list(): raise pyfuse3.FUSEError(errno.ENOENT) parent_note.unlink(name.decode()) async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx): # Decode / validate names name_new_str = name_new.decode() name_old_str = name_old.decode() # Grab the parents new_parent = self._get_inode_note(parent_inode_new) if not isinstance(new_parent, TroveTree): raise pyfuse3.FUSEError(errno.ENOTDIR) old_parent = self._get_inode_note(parent_inode_old) if not isinstance(old_parent, TroveTree): raise pyfuse3.FUSEError(errno.ENOTDIR) # We want to maintain the inode - find the note via the internal entity ent, note = self._lookup_child(parent_inode_old, name_old) # Remove existing target new_parent.unlink(name_new_str) # Link to new parent, unlink from old new_parent.link(name_new_str, note) old_parent.unlink(name_old_str) # ------------------------------------------------------------------ # Serve # ------------------------------------------------------------------ async def _run(ops: TroveFuseOps, mountpoint: str) -> None: logging.basicConfig(level=logging.DEBUG) options = set(pyfuse3.default_options) options.add("fsname=trove") pyfuse3.init(ops, mountpoint, options) try: await pyfuse3.main() finally: pyfuse3.close() def serve(trove: Trove, mountpoint: str) -> None: """ Mount a Trove store at mountpoint and serve until KeyboardInterrupt. Runs a trio event loop internally. """ ops = TroveFuseOps(trove) try: trio.run(_run, ops, mountpoint) except KeyboardInterrupt: pass