"""
fuse.py — FUSE filesystem layer for Trove, backed by pyfuse3 + trio.

Notes that have children, or whose mime type is ``inode/directory``, are
exposed as directories; every other note is exposed as a regular file.
Inode numbers are allocated on demand and mapped to Trove object IDs through
an in-memory cache; the Trove root note is pinned to the pyfuse3 root inode.

Entry point: serve(trove, mountpoint)
"""

import errno
import os
import stat
import time
import logging
from typing import Sequence, Tuple, cast

import pyfuse3
import trio
from pyfuse3 import InodeT, FileHandleT

from trovedb.trove import Trove, Note, ObjectId, TreeExists

import trovedb.trove as tr

logger = logging.getLogger(__name__)


class _TroveEntry:
    """Pairs a kernel-visible inode number with a Trove object id.

    ``ref_count`` tracks how many lookups have been handed out for the inode.
    """

    __slots__ = ['object_id', 'sys_inode', 'ref_count']

    def __init__(self, sys_inode: InodeT, object_id: ObjectId | None):
        self.object_id: ObjectId | None = object_id
        self.sys_inode: InodeT = sys_inode
        self.ref_count = 0

    def ref(self) -> None:
        """Increase the reference count by one."""
        self.ref_count += 1

    def deref(self, count: int = 1) -> bool:
        """Decrease the reference count by *count*.

        Returns True when the count has dropped to zero (or below), i.e. the
        entry is no longer referenced.
        """
        assert self.ref_count > 0
        self.ref_count -= count
        return self.ref_count <= 0


class _TroveHandle:
    """An open file/directory handle: the inode it refers to and its note."""

    __slots__ = ['inode_id', 'ref_count', 'note', 'handle_id']

    def __init__(self, inode_id: InodeT, handle_id: FileHandleT, note: Note):
        self.inode_id = inode_id
        self.handle_id = handle_id
        self.note = note


class _TroveHandleTree(_TroveHandle):
    """Handle for a note that is presented as a directory."""

    @property
    def tree(self) -> Note:
        """Alias for the handle's note."""
        return self.note


def _note_has_folder(note: Note) -> bool:
    """Return True if a note should have an associated folder.

    That is the case when the note has children or carries the
    ``inode/directory`` mime type.
    """
    return note.has_children() or note.mime == "inode/directory"


class TroveFuseOps(pyfuse3.Operations):
    """pyfuse3 request handlers that expose a Trove store as a filesystem."""

    def __init__(self, trove: Trove):
        super().__init__()
        self._trove = trove

        # Inode Cache
        self._next_inode = 2
        self._inode_cache: dict[InodeT, _TroveEntry] = {}
        self._inode_reverse_cache: dict[ObjectId, InodeT] = {}

        # Cache and Lock Root Inode
        node_root = trove.get_root()
        root_ent = _TroveEntry(pyfuse3.ROOT_INODE, node_root.object_id)
        self._inode_cache[pyfuse3.ROOT_INODE] = root_ent
        self._inode_reverse_cache[node_root.object_id] = pyfuse3.ROOT_INODE
        # The root entry is referenced twice, exactly as before.
        root_ent.ref()
        root_ent.ref()

        # Handles
        self._next_handle = 2
        self._handles: dict[int, _TroveHandle] = {}

    # ------------------------------------------------------------------
    # Entry Management [entries relate inode to Note]
    # ------------------------------------------------------------------

    def _get_ent_from_inode(self, inode: InodeT) -> _TroveEntry:
        """Return the cached entry for *inode*.

        Raises:
            pyfuse3.FUSEError: ENOENT when the inode is not in the cache.
        """
        try:
            return self._inode_cache[inode]
        except KeyError:
            logger.debug("inode not found in cache: %d", inode)
            raise pyfuse3.FUSEError(errno.ENOENT) from None

    def _create_get_ent_from_note(self, note: Note) -> _TroveEntry:
        """Return the entry for *note*, creating one if necessary.

        A newly created entry reserves an inode number but is NOT stored in
        the cache; ``_ref_entry`` does that.
        """
        cached_inode = self._inode_reverse_cache.get(note.object_id)
        if cached_inode is not None:
            return self._inode_cache[cached_inode]

        sys_inode = InodeT(self._next_inode)
        self._next_inode += 1
        return _TroveEntry(sys_inode=sys_inode, object_id=note.object_id)

    def _ref_entry(self, ent: _TroveEntry) -> None:
        """Reference *ent*, adding it to both caches if it is not cached yet."""
        if ent.sys_inode not in self._inode_cache:
            self._inode_cache[ent.sys_inode] = ent
            self._inode_reverse_cache[ent.object_id] = ent.sys_inode
        ent.ref()

    def _deref_entry(self, ent: _TroveEntry, count: int = 1) -> None:
        """Dereference *ent* *count* times; drop it from the caches at zero."""
        if not ent.deref(count):
            return
        if ent.sys_inode in self._inode_cache:
            logger.debug("free inode: %d", ent.sys_inode)
            del self._inode_cache[ent.sys_inode]
            # pop() rather than del: a missing reverse mapping must not turn
            # cache cleanup into an exception.
            self._inode_reverse_cache.pop(ent.object_id, None)

    def _get_inode_note(self, inode: InodeT) -> Note:
        """Return the note behind *inode*; the inode must already be cached."""
        return self._get_ent_note(self._get_ent_from_inode(inode))

    def _get_ent_note(self, ent: _TroveEntry) -> Note:
        """Return the note for *ent*.

        Raises:
            pyfuse3.FUSEError: ENOENT when the store returns no note.
        """
        note = self._trove.get_raw_note(ent.object_id)
        if note is None:
            logger.debug("note lookup failed: %s", ent.object_id)
            raise pyfuse3.FUSEError(errno.ENOENT)
        return note

    def _lookup_update_object(self, object_id: ObjectId) -> _TroveEntry:
        """Return the cached entry for *object_id*, creating and caching one
        (with a freshly allocated inode number) when none exists.
        """
        cached_inode = self._inode_reverse_cache.get(object_id)
        if cached_inode is not None:
            # Previously this called an undefined ``_lookup_existing`` helper.
            return self._inode_cache[cached_inode]

        inode_id = InodeT(self._next_inode)
        self._next_inode += 1
        ent = _TroveEntry(sys_inode=inode_id, object_id=object_id)
        self._inode_cache[inode_id] = ent
        self._inode_reverse_cache[object_id] = inode_id
        return ent

    def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
        """Resolve *name* under *parent_inode* to an ``(entry, note)`` pair.

        The returned entry is not referenced; the caller decides whether to
        ref it.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        parent = self._get_inode_note(parent_inode)
        try:
            note = parent.child(name.decode())
        except tr.ErrorWithErrno as e:
            logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
            raise pyfuse3.FUSEError(e.errno) from None
        ent = self._create_get_ent_from_note(note)
        return ent, note

    def _get_sys_inode_id(self, object_id: ObjectId) -> InodeT:
        """Return the cached inode number for *object_id*.

        Raises:
            pyfuse3.FUSEError: ENOENT when the object has no cached inode.
        """
        try:
            return self._inode_reverse_cache[object_id]
        except KeyError:
            raise pyfuse3.FUSEError(errno.ENOENT) from None

    # ------------------------------------------------------------------
    # Handle Management
    # ------------------------------------------------------------------

    def _open_handle(self, inode: InodeT) -> _TroveHandle:
        """Allocate and register a handle for *inode*.

        Returns a ``_TroveHandleTree`` when the note is presented as a
        folder, otherwise a plain ``_TroveHandle``.
        """
        logger.debug("open_handle inode:%d", inode)
        note = self._get_inode_note(inode)

        handle_id = FileHandleT(self._next_handle)
        self._next_handle += 1

        handle: _TroveHandle
        if _note_has_folder(note):
            logger.debug("open_handle inode:%d is a folder", inode)
            handle = _TroveHandleTree(inode_id=inode, handle_id=handle_id, note=note)
        else:
            logger.debug("open_handle inode:%d is a file", inode)
            handle = _TroveHandle(inode_id=inode, handle_id=handle_id, note=note)

        self._handles[handle_id] = handle
        return handle

    def _get_handle(self, handle_id: FileHandleT) -> _TroveHandle:
        """Return the open handle for *handle_id*.

        Raises:
            pyfuse3.FUSEError: EBADF when the handle is not open.
        """
        try:
            return self._handles[handle_id]
        except KeyError:
            raise pyfuse3.FUSEError(errno.EBADF) from None

    def _close_handle(self, handle: _TroveHandle):
        """Unregister *handle*."""
        del self._handles[handle.handle_id]

    def _get_attr(self, ent: _TroveEntry, note: Note) -> pyfuse3.EntryAttributes:
        """Build the ``EntryAttributes`` reported to the kernel for *note*."""
        # Determine basic information
        # FIXME: Properly support folder / content, right now it's either or
        is_tree = _note_has_folder(note)
        size = 0 if is_tree else len(note.read_content())

        # Create and fill attr structure
        attr = pyfuse3.EntryAttributes()
        attr.st_ino = ent.sys_inode
        attr.st_nlink = 1
        attr.st_uid = os.getuid()
        attr.st_gid = os.getgid()

        mtime_ns = int(note.mtime.timestamp() * 1e9)
        attr.st_atime_ns = time.time_ns()
        attr.st_mtime_ns = mtime_ns
        attr.st_ctime_ns = mtime_ns
        attr.generation = 0
        attr.entry_timeout = 5.0
        attr.attr_timeout = 5.0
        attr.st_blksize = 512

        # Determine permissions based on readonly property
        if is_tree:
            mode = 0o555 if note.readonly else 0o755
            attr.st_mode = stat.S_IFDIR | mode
            attr.st_size = 0
            attr.st_blocks = 0
        else:
            mode = 0o444 if note.readonly else 0o644
            attr.st_mode = stat.S_IFREG | mode
            attr.st_size = size
            attr.st_blocks = (size + 511) // 512

        return attr

    # ------------------------------------------------------------------
    # Stat / lookup
    # ------------------------------------------------------------------

    async def getattr(self, inode: InodeT, ctx=None) -> pyfuse3.EntryAttributes:
        """Return the attributes of *inode*."""
        logger.debug("getattr inode:%d", inode)
        ent = self._get_ent_from_inode(inode)
        note = self._get_ent_note(ent)
        return self._get_attr(ent, note)

    async def lookup(self, parent_inode: InodeT, name: bytes, ctx=None) -> pyfuse3.EntryAttributes:
        """Look up *name* in *parent_inode* and reference the resulting entry."""
        logger.debug("lookup inode:%d name:%s", parent_inode, name)
        ent, child = self._lookup_child(parent_inode, name)
        self._ref_entry(ent)
        return self._get_attr(ent, child)

    async def setattr(self, inode: InodeT, attr, fields, fh: FileHandleT | None, ctx) -> pyfuse3.EntryAttributes:
        """Apply attribute changes; only size changes (truncate/extend) are acted on.

        Raises:
            pyfuse3.FUSEError: EINVAL when a size change targets a note that
                exposes a ``mkdir`` attribute.
        """
        ent = self._get_ent_from_inode(inode)
        note = self._get_ent_note(ent)

        if fields.update_size:
            # NOTE(review): directory detection here uses hasattr(note, 'mkdir')
            # while _get_attr uses _note_has_folder — confirm which is intended.
            if hasattr(note, 'mkdir'):
                raise pyfuse3.FUSEError(errno.EINVAL)
            current = note.read_content()
            new_size = attr.st_size
            if new_size < len(current):
                note.write_content(current[:new_size])
            elif new_size > len(current):
                # Extend with zero bytes up to the requested size.
                note.write_content(current + b"\x00" * (new_size - len(current)))

        return self._get_attr(ent, note)

    async def forget(self, inode_list: Sequence[Tuple[InodeT, int]]) -> None:
        """Drop ``nlookup`` references for each ``(inode, nlookup)`` pair.

        Declared ``async`` because pyfuse3 request handlers are coroutines.
        Unknown inodes are logged and skipped rather than raised.
        """
        for inode, nlookup in inode_list:
            try:
                logger.debug("deref inode:%d count:%d", inode, nlookup)
                self._deref_entry(self._get_ent_from_inode(inode), nlookup)
            except pyfuse3.FUSEError as e:
                logger.warning("Failed to deref inode %d: %s", inode, str(e))

    # ------------------------------------------------------------------
    # Directory ops
    # ------------------------------------------------------------------

    async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
        """Open *inode* as a directory and return its handle id.

        Raises:
            pyfuse3.FUSEError: ENOTDIR when the note is not presented as a folder.
        """
        logger.debug("opendir inode:%d", inode)
        handle = self._open_handle(inode)
        if not isinstance(handle, _TroveHandleTree):
            logger.debug("attempted opendir on %d not a tree", inode)
            self._close_handle(handle)
            raise pyfuse3.FUSEError(errno.ENOTDIR)
        logger.debug("opened dir inode %d -> handle %d", inode, handle.handle_id)
        return handle.handle_id

    async def readdir(self, fh: FileHandleT, start_id: int, token) -> None:
        """Report directory entries of handle *fh*, starting at index *start_id*.

        Each entry's "next id" is its list index + 1. Children whose note can
        no longer be fetched are skipped.
        """
        logger.debug("readdir %d start_id %d", fh, start_id)
        handle = self._get_handle(fh)
        entries = list(handle.note.children())  # items expose .name and .object_id

        for idx, entry in enumerate(entries[start_id:], start=start_id):
            child = self._trove.get_raw_note(entry.object_id)
            if child is None:
                continue

            child_ent = self._create_get_ent_from_note(child)
            attr = self._get_attr(child_ent, child)
            if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1):
                # Entry was not accepted (reply buffer full): it must not be
                # referenced, otherwise the kernel never forgets it.
                break
            # Reference only entries the kernel actually received.
            self._ref_entry(child_ent)

    async def releasedir(self, fh: FileHandleT) -> None:
        """Close directory handle *fh*."""
        logger.debug("releasedir %d", fh)
        handle = self._get_handle(fh)
        self._close_handle(handle)

    async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
        """Create a child note with mime ``inode/directory`` under *parent_inode*.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        logger.debug("mkdir inode:%d name:%s", parent_inode, name)
        parent = self._get_inode_note(parent_inode)
        # TODO: consider implications here, maybe look at ext on dir for mime?
        try:
            note = tr.new_child(parent, name.decode(), mime='inode/directory')
        except tr.ErrorWithErrno as e:
            raise pyfuse3.FUSEError(e.errno) from None

        # Grab entity for kernel
        ent = self._create_get_ent_from_note(note)
        self._ref_entry(ent)
        return self._get_attr(ent, note)

    async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
        """Remove child *name* from *parent_inode*.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        logger.debug("rmdir inode:%d name:%s", parent_inode, name)
        parent = self._get_inode_note(parent_inode)
        try:
            parent.rm_child(name.decode(), False)
        except tr.ErrorWithErrno as e:
            raise pyfuse3.FUSEError(e.errno) from None

    # ------------------------------------------------------------------
    # File ops
    # ------------------------------------------------------------------

    async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
        """Open *inode* and return a ``FileInfo`` carrying the new handle id."""
        handle = self._open_handle(inode)
        # FIXME: Add support for inode tree and inode content
        # if isinstance(handle.note, TroveTree):
        #     self._close_handle(handle)
        #     raise pyfuse3.FUSEError(errno.EISDIR)
        return pyfuse3.FileInfo(fh=handle.handle_id)

    async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
        """Create child note *name* under *parent_inode* and open it.

        Returns:
            ``(FileInfo, EntryAttributes)`` for the new note.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        logger.debug("create inode:%d name:%s", parent_inode, name)
        parent = self._get_inode_note(parent_inode)

        # TODO: handle mode
        # TODO: handle flags

        name_str = name.decode()
        try:
            note = tr.new_child(parent, name_str)
        except tr.ErrorWithErrno as e:
            # Translate store errors the same way mkdir does, so the kernel
            # receives an errno instead of the handler raising a foreign error.
            raise pyfuse3.FUSEError(e.errno) from None

        ent = self._create_get_ent_from_note(note)
        # Must be cached before _open_handle, which resolves the inode via the cache.
        self._ref_entry(ent)

        handle = self._open_handle(ent.sys_inode)
        attr = self._get_attr(ent, note)
        return pyfuse3.FileInfo(fh=handle.handle_id), attr

    async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
        """Return up to *length* bytes of the note's content from *offset*.

        Raises:
            pyfuse3.FUSEError: EBADF when the note exposes a ``mkdir`` attribute.
        """
        logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
        handle = self._get_handle(fh)
        note = handle.note
        if hasattr(note, 'mkdir'):
            raise pyfuse3.FUSEError(errno.EBADF)
        return note.read_content()[offset:offset + length]

    async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
        """Overwrite the note's content at *offset* with *data*.

        A write past the current end pads the gap with zero bytes.

        Returns:
            The number of bytes written (``len(data)``).

        Raises:
            pyfuse3.FUSEError: EBADF when the note exposes a ``mkdir``
                attribute (previously this case reported success without
                writing anything).
        """
        handle = self._get_handle(fh)
        note = handle.note
        if hasattr(note, 'mkdir'):
            raise pyfuse3.FUSEError(errno.EBADF)

        existing = note.read_content()
        if offset > len(existing):
            existing = existing + b"\x00" * (offset - len(existing))
        note.write_content(existing[:offset] + data + existing[offset + len(data):])
        return len(data)

    async def release(self, fh: FileHandleT) -> None:
        """Close file handle *fh*."""
        handle = self._get_handle(fh)
        self._close_handle(handle)

    async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
        """Remove child *name* from *parent_inode*.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        try:
            parent_note = self._get_inode_note(parent_inode)
            name_str = name.decode()
            parent_note.rm_child(name_str, False)
        except tr.ErrorWithErrno as e:
            raise pyfuse3.FUSEError(e.errno) from None

    async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
        """Move ``name_old`` in the old parent to ``name_new`` in the new parent.

        The move always overwrites an existing target.
        NOTE(review): *flags* is ignored — confirm whether no-replace/exchange
        requests need to be honoured.

        Raises:
            pyfuse3.FUSEError: with the errno carried by the store's error.
        """
        # Decode / validate names
        name_new_str = name_new.decode()
        name_old_str = name_old.decode()

        # Grab the parents
        new_parent = self._get_inode_note(parent_inode_new)
        old_parent = self._get_inode_note(parent_inode_old)

        # Move!
        try:
            self._trove.move(old_parent, name_old_str, new_parent, name_new_str, overwrite=True)
        except tr.ErrorWithErrno as e:
            raise pyfuse3.FUSEError(e.errno) from None


# ------------------------------------------------------------------
# Serve
# ------------------------------------------------------------------

async def _run(ops: TroveFuseOps, mountpoint: str) -> None:
    """Initialise pyfuse3 at *mountpoint*, run its main loop, then close it."""
    logging.basicConfig(level=logging.DEBUG)
    options = set(pyfuse3.default_options)
    options.add("fsname=trove")
    pyfuse3.init(ops, mountpoint, options)
    try:
        await pyfuse3.main()
    finally:
        pyfuse3.close()


def serve(trove: Trove, mountpoint: str) -> None:
    """
    Mount a Trove store at mountpoint and serve until KeyboardInterrupt.
    Runs a trio event loop internally.
    """
    ops = TroveFuseOps(trove)
    try:
        trio.run(_run, ops, mountpoint)
    except KeyboardInterrupt:
        pass