trove/trovedb/fuse/server.py

433 lines
16 KiB
Python

"""
fuse.py — FUSE filesystem layer for Trove, backed by pyfuse3 + trio.
Blob objects are exposed as regular files.
Tree objects are exposed as directories.
Inode numbers map 1:1 to Trove object IDs (NODE_ROOT_ID == pyfuse3 root inode == 1).
Entry point: serve(trove, mountpoint)
"""
import errno
import os
import stat
import time
import logging
from typing import Sequence, Tuple, cast
import pyfuse3
import trio
from pyfuse3 import InodeT, FileHandleT
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists
logger = logging.getLogger(__name__)
class _TroveEntry:
__slots__ = [ 'object_id', 'sys_inode', 'ref_count' ]
def __init__(self, sys_inode: InodeT, object_id: ObjectId | None):
self.object_id: ObjectId | None = object_id
self.sys_inode: InodeT = sys_inode
self.ref_count = 0
def ref(self) -> None:
self.ref_count += 1
def deref(self, count: int = 1) -> bool:
assert self.ref_count > 0
self.ref_count -= count
return self.ref_count <= 0
class _TroveHandle:
__slots__ = [ 'inode_id', 'ref_count', 'note', 'handle_id' ]
def __init__(self, inode_id: InodeT, handle_id: FileHandleT, note: Note):
self.inode_id = inode_id
self.handle_id = handle_id
self.note = note
class _TroveHandleTree(_TroveHandle):
    """Handle flavour created when the opened note is a tree (directory)."""

    @property
    def tree(self) -> TreeNote:
        """The handle's note, typed as a ``TreeNote`` (a static cast, no runtime check)."""
        tree_note = cast(TreeNote, self.note)
        return tree_note
class TroveFuseOps(pyfuse3.Operations):
def __init__(self, trove: Trove):
    """Set up the inode and handle tables and register the store's root tree."""
    super().__init__()
    self._trove = trove

    # Inode bookkeeping: inode -> entry, plus the reverse object-id -> inode
    # map. New inode numbers are handed out starting at 2.
    self._next_inode = 2
    self._inode_cache: dict[InodeT, _TroveEntry] = {}
    self._inode_reverse_cache: dict[ObjectId, InodeT] = {}

    # Register the root object under pyfuse3.ROOT_INODE and take two
    # references on it (presumably so a forget can never evict the root).
    root_id = trove.get_root().object_id
    root_entry = _TroveEntry(pyfuse3.ROOT_INODE, root_id)
    self._inode_cache[pyfuse3.ROOT_INODE] = root_entry
    self._inode_reverse_cache[root_id] = pyfuse3.ROOT_INODE
    for _ in range(2):
        root_entry.ref()

    # Open-handle table; handle ids are handed out starting at 2.
    self._next_handle = 2
    self._handles: dict[int, _TroveHandle] = {}
# ------------------------------------------------------------------
# Entry Management [entries relate inode to Note]
# ------------------------------------------------------------------
def _get_ent_from_inode(self, inode: InodeT) -> _TroveEntry:
    """Return the cached entry for *inode*.

    Raises:
        pyfuse3.FUSEError: ENOENT when the inode is not in the cache.
    """
    entry = self._inode_cache.get(inode)
    if entry is None:
        logger.debug("inode not found in cache: %d", inode)
        raise pyfuse3.FUSEError(errno.ENOENT)
    return entry
def _create_get_ent_from_note(self, note: Note) -> _TroveEntry:
    """Return the cached entry for *note*, or a fresh, uncached one.

    A fresh entry consumes the next inode number but is not stored in
    either cache by this method.
    """
    object_id = note.object_id
    cached_inode = self._inode_reverse_cache.get(object_id)
    if cached_inode is not None:
        return self._inode_cache[cached_inode]

    fresh_inode = InodeT(self._next_inode)
    self._next_inode += 1
    return _TroveEntry(sys_inode=fresh_inode, object_id=object_id)
def _ref_entry(self, ent: _TroveEntry) -> None:
    """Take one reference on *ent*, registering it in both caches if it is new."""
    inode = ent.sys_inode
    already_cached = inode in self._inode_cache
    if not already_cached:
        self._inode_cache[inode] = ent
        self._inode_reverse_cache[ent.object_id] = inode
    ent.ref()
def _deref_entry(self, ent: _TroveEntry, count: int = 1) -> None:
    """Drop *count* references from *ent*; evict it from both caches once none remain."""
    exhausted = ent.deref(count)
    if not exhausted:
        return
    if ent.sys_inode not in self._inode_cache:
        # Never registered (or already evicted): nothing to remove.
        return
    logger.debug("free inode: %d", ent.sys_inode)
    del self._inode_cache[ent.sys_inode]
    del self._inode_reverse_cache[ent.object_id]
def _get_inode_note(self, inode: InodeT) -> Note:
    """Resolve a cached inode to its note.

    Raises:
        pyfuse3.FUSEError: ENOENT when the inode is not cached or the note
            cannot be fetched from the store.
    """
    return self._get_ent_note(self._get_ent_from_inode(inode))
def _get_ent_note(self, ent: _TroveEntry) -> Note:
    """Fetch the note behind *ent* from the store.

    Raises:
        pyfuse3.FUSEError: ENOENT when the store returns no note for the
            entry's object id.
    """
    note = self._trove.get_raw_note(ent.object_id)
    if note is not None:
        return note
    logger.debug("note lookup failed: %s", ent.object_id)
    raise pyfuse3.FUSEError(errno.ENOENT)
def _lookup_update_object(self, object_id: ObjectId) -> _TroveEntry:
    """Return the entry for *object_id*, creating and caching one if needed.

    A newly created entry is stored in both caches immediately; its
    reference count is left at zero.
    """
    inode_id = self._inode_reverse_cache.get(object_id)
    if inode_id is not None:
        # Read the entry straight from the cache: the class defines no
        # ``_lookup_existing`` helper, so calling one raised AttributeError.
        return self._inode_cache[inode_id]

    inode_id = InodeT(self._next_inode)
    self._next_inode += 1
    ent = _TroveEntry(sys_inode=inode_id, object_id=object_id)
    self._inode_cache[inode_id] = ent
    self._inode_reverse_cache[object_id] = inode_id
    return ent
def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
    """Find the child called *name* under the tree at *parent_inode*.

    Returns:
        ``(entry, note)`` for the child. The entry may be freshly reserved
        and not yet cached.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the parent is not a ``TreeNote``;
            ENOENT when the parent has no such child.
    """
    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TreeNote):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    child_name = name.decode()
    try:
        child_note = parent_note.child(child_name)
    except KeyError:
        logger.debug("lookup failed: %d -> %s", parent_inode, child_name)
        raise pyfuse3.FUSEError(errno.ENOENT) from None

    return self._create_get_ent_from_note(child_note), child_note
def _get_sys_inode_id(self, object_id: ObjectId) -> InodeT:
    """Return the inode currently cached for *object_id*.

    Raises:
        pyfuse3.FUSEError: ENOENT when the object has no cached inode.
    """
    inode = self._inode_reverse_cache.get(object_id)
    if inode is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    return inode
# ------------------------------------------------------------------
# Handle Management
# ------------------------------------------------------------------
def _open_handle(self, inode: InodeT) -> _TroveHandle:
    """Allocate and register a handle for the note behind *inode*.

    Tree notes get a ``_TroveHandleTree``; every other note gets a plain
    ``_TroveHandle``. The note is resolved first, so a failed lookup does
    not consume a handle id.
    """
    note = self._get_inode_note(inode)

    handle_id = FileHandleT(self._next_handle)
    self._next_handle += 1

    handle_cls = _TroveHandleTree if isinstance(note, TreeNote) else _TroveHandle
    handle: _TroveHandle = handle_cls(inode_id=inode, handle_id=handle_id, note=note)
    self._handles[handle_id] = handle
    return handle
def _get_handle(self, handle_id: FileHandleT) -> _TroveHandle:
    """Return the open handle registered under *handle_id*.

    Raises:
        pyfuse3.FUSEError: EBADF when no such handle is open.
    """
    handle = self._handles.get(handle_id)
    if handle is None:
        raise pyfuse3.FUSEError(errno.EBADF)
    return handle
def _close_handle(self, handle: _TroveHandle):
    """Remove *handle* from the handle table (KeyError if it is not registered)."""
    self._handles.pop(handle.handle_id)
def _get_attr(self, ent: _TroveEntry, note: Note) -> pyfuse3.EntryAttributes:
    """Build the attribute structure reported to the kernel for *note*.

    Blobs are reported as regular files (mode 0o644) sized by their
    content; every other note is reported as a directory (mode 0o755) of
    size 0. Ownership is the serving process's uid/gid, all three
    timestamps are set to the current time, and both cache timeouts are
    5 seconds.
    """
    is_blob = isinstance(note, TroveBlob)
    # The size is obtained by reading the blob's full content.
    size = len(note.read()) if is_blob else 0

    attr = pyfuse3.EntryAttributes()
    attr.st_ino = ent.sys_inode
    attr.st_nlink = 1
    attr.st_uid = os.getuid()
    attr.st_gid = os.getgid()

    # time.time_ns() yields an exact integer; int(time.time() * 1e9) goes
    # through a float and loses sub-microsecond precision.
    now_ns = time.time_ns()
    attr.st_atime_ns = now_ns
    attr.st_mtime_ns = now_ns
    attr.st_ctime_ns = now_ns

    attr.generation = 0
    attr.entry_timeout = 5.0
    attr.attr_timeout = 5.0
    attr.st_blksize = 512

    if is_blob:
        attr.st_mode = stat.S_IFREG | 0o644
        attr.st_size = size
        # Number of 512-byte blocks, rounded up.
        attr.st_blocks = (size + 511) // 512
    else:
        attr.st_mode = stat.S_IFDIR | 0o755
        attr.st_size = 0
        attr.st_blocks = 0
    return attr
# ------------------------------------------------------------------
# Stat / lookup
# ------------------------------------------------------------------
async def getattr(self, inode: InodeT, ctx=None) -> pyfuse3.EntryAttributes:
    """Return the attributes of the note behind a cached *inode*."""
    logger.debug("getattr inode:%d", inode)
    entry = self._get_ent_from_inode(inode)
    return self._get_attr(entry, self._get_ent_note(entry))
async def lookup(self, parent_inode: InodeT, name: bytes, ctx=None) -> pyfuse3.EntryAttributes:
    """Resolve *name* inside the directory *parent_inode*.

    Takes one reference on the child's entry (caching it if new) and
    returns the child's attributes.
    """
    logger.debug("lookup inode:%d name:%s", parent_inode, name)
    entry, child_note = self._lookup_child(parent_inode, name)
    # The reference is taken before the attributes are built.
    self._ref_entry(entry)
    attrs = self._get_attr(entry, child_note)
    return attrs
async def setattr(self, inode: InodeT, attr, fields, fh: FileHandleT | None, ctx) -> pyfuse3.EntryAttributes:
    """Apply a size change to the note behind *inode* and return its attributes.

    Only ``fields.update_size`` is acted on; other requested fields are
    ignored. Shrinking truncates the blob, growing pads it with NUL bytes.

    Raises:
        pyfuse3.FUSEError: EINVAL when a size change targets a non-blob note.
    """
    entry = self._get_ent_from_inode(inode)
    note = self._get_ent_note(entry)

    if not fields.update_size:
        return self._get_attr(entry, note)
    if not isinstance(note, TroveBlob):
        raise pyfuse3.FUSEError(errno.EINVAL)

    content = note.read()
    old_size = len(content)
    target_size = attr.st_size
    if target_size < old_size:
        note.write(content[:target_size])
    elif target_size > old_size:
        note.write(content + b"\x00" * (target_size - old_size))
    # Equal sizes: nothing is written.
    return self._get_attr(entry, note)
def forget(self, inode_list: Sequence[Tuple[InodeT, int]]) -> None:
    """Drop the given lookup counts from the listed inodes.

    Each item is ``(inode, nlookup)``. Inodes that are not in the cache are
    logged with a warning and skipped.
    """
    # NOTE(review): pyfuse3 documents its request handlers as coroutines;
    # confirm that a plain ``def`` is acceptable for forget.
    for inode, nlookup in inode_list:
        logger.debug("deref inode:%d count:%d", inode, nlookup)
        try:
            entry = self._get_ent_from_inode(inode)
            self._deref_entry(entry, nlookup)
        except pyfuse3.FUSEError as e:
            logger.warning("Failed to deref inode %d: %s", inode, str(e))
# ------------------------------------------------------------------
# Directory ops
# ------------------------------------------------------------------
async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
    """Open the directory at *inode* and return its handle id.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the inode's note is not a tree; the
            handle that was allocated for it is closed again first.
    """
    handle = self._open_handle(inode)
    if isinstance(handle, _TroveHandleTree):
        logger.debug("opened dir inode %d -> handle %d", inode, handle.handle_id)
        return handle.handle_id

    logger.debug("attempted opendir on %d not a tree", inode)
    self._close_handle(handle)
    raise pyfuse3.FUSEError(errno.ENOTDIR)
async def readdir(self, fh: FileHandleT, start_id: int, token) -> None:
    """Report the entries of the open directory *fh*, starting at *start_id*.

    Entries are numbered by their position in the tree's listing; the id
    passed to ``readdir_reply`` for an entry is its position plus one, so
    a later call resumes directly after it. Children whose note cannot be
    fetched from the store are skipped.

    Raises:
        pyfuse3.FUSEError: EBADF for an unknown handle; ENOTDIR when the
            handle's note is not a tree.
    """
    logger.debug("readdir %d start_id %d", fh, start_id)
    handle = self._get_handle(fh)
    note = handle.note
    if not isinstance(note, TroveTree):
        logger.debug("attempted readdir on %d not a tree", fh)
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    entries = list(note.list().items())  # [(name, object_id), ...]
    for idx, (name, child_id) in enumerate(entries):
        if idx < start_id:
            continue
        child = self._trove.get_raw_note(child_id)
        if child is None:
            continue
        child_ent = self._create_get_ent_from_note(child)
        attr = self._get_attr(child_ent, child)
        if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1):
            # The reply buffer is full and this entry was not reported, so
            # no reference may be taken for it.
            break
        # Take the reference only for entries that were actually reported;
        # referencing a rejected entry would keep its inode cached with
        # nothing to ever forget it.
        self._ref_entry(child_ent)
async def releasedir(self, fh: FileHandleT) -> None:
    """Close the directory handle *fh* (EBADF if it is not open)."""
    logger.debug("releasedir %d", fh)
    self._close_handle(self._get_handle(fh))
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
    """Create the directory *name* under *parent_inode* and return its attributes.

    *mode* is not used. One reference is taken on the new directory's entry.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the parent is not a ``TreeNote``;
            EEXIST when the store reports ``TreeExists``.
    """
    logger.debug("mkdir inode:%d name:%s", parent_inode, name)

    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TreeNote):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    dir_name = name.decode()
    try:
        created: TreeNote = parent_note.mkdir(dir_name)
    except TreeExists:
        raise pyfuse3.FUSEError(errno.EEXIST) from None

    # Hand the new directory to the kernel with one reference held.
    entry = self._create_get_ent_from_note(created)
    self._ref_entry(entry)
    return self._get_attr(entry, created)
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Unlink the entry *name* from the directory *parent_inode*.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the parent is not a ``TreeNote``;
            ENOENT when the unlink raises ``KeyError``.
    """
    logger.debug("rmdir inode:%d name:%s", parent_inode, name)
    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TreeNote):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    entry_name = name.decode()
    try:
        parent_note.unlink(entry_name)
    except KeyError:
        raise pyfuse3.FUSEError(errno.ENOENT) from None
# ------------------------------------------------------------------
# File ops
# ------------------------------------------------------------------
async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
    """Open the file at *inode* and return a ``FileInfo`` carrying its handle id.

    *flags* is not inspected.

    Raises:
        pyfuse3.FUSEError: EISDIR when the inode's note is a tree; the
            handle that was allocated for it is closed again first.
    """
    handle = self._open_handle(inode)
    if not isinstance(handle.note, TroveTree):
        return pyfuse3.FileInfo(fh=handle.handle_id)

    self._close_handle(handle)
    raise pyfuse3.FUSEError(errno.EISDIR)
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
    """Create an empty file *name* under *parent_inode* and open it.

    *mode* and *flags* are not used.

    Returns:
        ``(FileInfo, EntryAttributes)`` for the new, already opened file.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the parent is not a tree; EEXIST
            when the name is already present in the parent's listing.
    """
    logger.debug("create inode:%d name:%s", parent_inode, name)

    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    filename = name.decode()
    if filename in parent_note.list():
        raise pyfuse3.FUSEError(errno.EEXIST)

    new_blob = self._trove.create_blob(b"")
    parent_note.link(filename, new_blob)

    entry = self._create_get_ent_from_note(new_blob)
    # Referencing caches the entry, which _open_handle needs in order to
    # resolve the inode.
    self._ref_entry(entry)
    handle = self._open_handle(entry.sys_inode)
    attrs = self._get_attr(entry, new_blob)
    return pyfuse3.FileInfo(fh=handle.handle_id), attrs
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
    """Return up to *length* bytes of the open blob *fh*, starting at *offset*.

    The blob's whole content is read and then sliced.

    Raises:
        pyfuse3.FUSEError: EBADF for an unknown handle or a handle whose
            note is not a blob.
    """
    logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
    note = self._get_handle(fh).note
    if not isinstance(note, TroveBlob):
        raise pyfuse3.FUSEError(errno.EBADF)
    end = offset + length
    return note.read()[offset:end]
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
    """Write *data* into the open blob *fh* at *offset*.

    The blob's whole content is read, patched in memory and written back.
    A write starting past the current end pads the gap with NUL bytes.

    Returns:
        The number of bytes written, i.e. ``len(data)``.

    Raises:
        pyfuse3.FUSEError: EBADF for an unknown handle or a handle whose
            note is not a blob.
    """
    handle = self._get_handle(fh)
    note = handle.note
    if not isinstance(note, TroveBlob):
        # Same error as read(): a non-blob handle must not fall through as
        # though the write had been handled.
        raise pyfuse3.FUSEError(errno.EBADF)

    existing = note.read()
    if offset > len(existing):
        existing = existing + b"\x00" * (offset - len(existing))
    # Keep the bytes before the offset and whatever lies beyond the new data.
    note.write(existing[:offset] + data + existing[offset + len(data):])
    return len(data)
async def release(self, fh: FileHandleT) -> None:
    """Close the file handle *fh* (EBADF if it is not open)."""
    self._close_handle(self._get_handle(fh))
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Remove the entry *name* from the directory *parent_inode*.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the parent is not a tree; ENOENT
            when the name is not in the parent's listing.
    """
    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    entry_name = name.decode()
    if entry_name in parent_note.list():
        parent_note.unlink(entry_name)
        return
    raise pyfuse3.FUSEError(errno.ENOENT)
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
    """Move the entry *name_old* in *parent_inode_old* to *name_new* in *parent_inode_new*.

    The same note is linked under the new name, so its object id does not
    change. An existing entry at the destination is replaced.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when either parent is not a tree (or the
            old parent is not a ``TreeNote``); ENOENT when the source entry
            does not exist.
    """
    # NOTE(review): ``flags`` is not inspected, so rename flags such as
    # no-replace or exchange are not honoured. Confirm whether they should be.
    name_new_str = name_new.decode()
    name_old_str = name_old.decode()

    # Resolve and validate both parents before changing anything.
    new_parent = self._get_inode_note(parent_inode_new)
    if not isinstance(new_parent, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)
    old_parent = self._get_inode_note(parent_inode_old)
    if not isinstance(old_parent, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    # Resolve the source first so a missing source fails with ENOENT
    # before the destination is touched.
    _, note = self._lookup_child(parent_inode_old, name_old)

    # Renaming an entry onto itself is a no-op. Without this guard the
    # final unlink below would remove the entry that was just re-linked.
    if parent_inode_old == parent_inode_new and name_old_str == name_new_str:
        return

    # Only remove the destination when it exists; renaming to a fresh name
    # must not attempt to unlink a missing entry.
    if name_new_str in new_parent.list():
        new_parent.unlink(name_new_str)

    # Link under the new name, then drop the old one.
    new_parent.link(name_new_str, note)
    old_parent.unlink(name_old_str)
# ------------------------------------------------------------------
# Serve
# ------------------------------------------------------------------
async def _run(ops: TroveFuseOps, mountpoint: str) -> None:
    """Initialise pyfuse3 with *ops* on *mountpoint* and run its main loop.

    Configures root logging at DEBUG level and adds ``fsname=trove`` to
    pyfuse3's default mount options. ``pyfuse3.close()`` is called once the
    main loop ends, whether it returned or raised.
    """
    logging.basicConfig(level=logging.DEBUG)
    mount_options = {*pyfuse3.default_options, "fsname=trove"}
    pyfuse3.init(ops, mountpoint, mount_options)
    try:
        await pyfuse3.main()
    finally:
        pyfuse3.close()
def serve(trove: Trove, mountpoint: str) -> None:
    """
    Serve the Trove store *trove* as a filesystem mounted at *mountpoint*.
    Blocks inside a trio event loop until the loop finishes; a
    KeyboardInterrupt ends serving quietly.
    """
    fuse_ops = TroveFuseOps(trove)
    try:
        trio.run(_run, fuse_ops, mountpoint)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; swallow it.
        return