trove/trovedb/fuse/server.py

431 lines
16 KiB
Python

"""
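server.py — FUSE filesystem layer for Trove, backed by pyfuse3 + trio.

Notes that have children, or whose MIME type is "inode/directory", are exposed
as directories; every other note is exposed as a regular file.

Inode numbers are allocated sequentially (starting at 2) and are mapped to
Trove object IDs through an in-memory, reference-counted cache. The Trove root
note is registered under pyfuse3.ROOT_INODE.

Entry point: serve(trove, mountpoint)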
"""
import errno
import os
import stat
import time
import logging
from typing import Sequence, Tuple, cast
import pyfuse3
import trio
from pyfuse3 import InodeT, FileHandleT
from trovedb.trove import Trove, Note, ObjectId, TreeExists
import trovedb.trove as tr
logger = logging.getLogger(__name__)
class _TroveEntry:
    """Reference-counted record tying an inode number to a Trove object.

    Slots:
        object_id -- ID of the Trove object behind the inode (may be None)
        sys_inode -- inode number reported to the kernel
        ref_count -- number of outstanding references; starts at zero
    """

    __slots__ = ['object_id', 'sys_inode', 'ref_count']

    def __init__(self, sys_inode: InodeT, object_id: ObjectId | None):
        self.sys_inode: InodeT = sys_inode
        self.object_id: ObjectId | None = object_id
        self.ref_count = 0

    def ref(self) -> None:
        """Take one more reference on this entry."""
        self.ref_count = self.ref_count + 1

    def deref(self, count: int = 1) -> bool:
        """Drop ``count`` references and report whether none remain.

        The entry must hold at least one reference when this is called.
        """
        assert self.ref_count > 0
        remaining = self.ref_count - count
        self.ref_count = remaining
        return remaining <= 0
class _TroveHandle:
    """An open handle: the note it was opened on plus its identifiers.

    Slots:
        inode_id  -- inode the handle was opened on
        handle_id -- handle number given back to the kernel
        note      -- note captured when the handle was opened
        ref_count -- declared as a slot but not initialised by __init__
    """

    __slots__ = ['inode_id', 'ref_count', 'note', 'handle_id']

    def __init__(self, inode_id: InodeT, handle_id: FileHandleT, note: Note):
        self.inode_id, self.handle_id, self.note = inode_id, handle_id, note
class _TroveHandleTree(_TroveHandle):
    """Handle flavour created for notes that are opened as directories."""

    @property
    def tree(self) -> Note:
        """The note this handle was opened on (same object as ``note``)."""
        opened_note = self.note
        return opened_note
def _note_has_folder(note: Note) -> bool:
    """Tell whether ``note`` should be presented as a folder.

    A note qualifies when it reports children, or when its MIME type is
    "inode/directory".
    """
    has_kids = note.has_children()
    if has_kids:
        return has_kids
    return note.mime == "inode/directory"
class TroveFuseOps(pyfuse3.Operations):
    """pyfuse3 request handlers that present a Trove store as a filesystem."""

    def __init__(self, trove: Trove):
        """Set up the inode and handle tables and register the root note."""
        super().__init__()
        self._trove = trove

        # Inode cache: inode -> entry, plus a reverse object-id -> inode index.
        # New inode numbers are handed out starting from 2.
        self._next_inode = 2
        self._inode_cache: dict[InodeT, _TroveEntry] = {}
        self._inode_reverse_cache: dict[ObjectId, InodeT] = {}

        # Register the root note under the pyfuse3 root inode and take two
        # references on it up front.
        root_note = trove.get_root()
        root_entry = _TroveEntry(pyfuse3.ROOT_INODE, root_note.object_id)
        self._inode_cache[pyfuse3.ROOT_INODE] = root_entry
        self._inode_reverse_cache[root_note.object_id] = pyfuse3.ROOT_INODE
        for _ in range(2):
            self._inode_cache[pyfuse3.ROOT_INODE].ref()

        # Open handles keyed by handle id; ids are handed out starting from 2.
        self._next_handle = 2
        self._handles: dict[int, _TroveHandle] = {}
# ------------------------------------------------------------------
# Entry Management [entries relate inode to Note]
# ------------------------------------------------------------------
def _get_ent_from_inode(self, inode: InodeT) -> _TroveEntry:
    """Return the cached entry registered for ``inode``.

    Raises:
        pyfuse3.FUSEError: ENOENT when the inode is not in the cache.
    """
    cached = self._inode_cache.get(inode)
    if cached is None:
        logger.debug("inode not found in cache: %d", inode)
        raise pyfuse3.FUSEError(errno.ENOENT)
    return cached
def _create_get_ent_from_note(self, note: Note) -> _TroveEntry:
    """Return the entry for ``note``, building a fresh one if none is cached.

    A freshly built entry consumes the next inode number but is not stored in
    either cache by this method.
    """
    known_inode = self._inode_reverse_cache.get(note.object_id)
    if known_inode is not None:
        return self._inode_cache[known_inode]

    fresh_inode = InodeT(self._next_inode)
    self._next_inode += 1
    return _TroveEntry(sys_inode=fresh_inode, object_id=note.object_id)
def _ref_entry(self, ent: _TroveEntry) -> None:
    """Take a reference on ``ent``, registering it in both caches if new."""
    already_cached = ent.sys_inode in self._inode_cache
    if not already_cached:
        self._inode_cache[ent.sys_inode] = ent
        self._inode_reverse_cache[ent.object_id] = ent.sys_inode
    ent.ref()
def _deref_entry(self, ent: _TroveEntry, count: int = 1) -> None:
    """Drop ``count`` references from ``ent``.

    When the entry reports that no references remain and it is still in the
    inode cache, it is removed from both caches.
    """
    released = ent.deref(count)
    if not released or ent.sys_inode not in self._inode_cache:
        return
    logger.debug("free inode: %d", ent.sys_inode)
    del self._inode_cache[ent.sys_inode]
    del self._inode_reverse_cache[ent.object_id]
def _get_inode_note(self, inode: InodeT) -> Note:
    """Resolve ``inode`` to its note; the inode must already be cached.

    Any error raised by the entry lookup or the note lookup propagates.
    """
    return self._get_ent_note(self._get_ent_from_inode(inode))
def _get_ent_note(self, ent: _TroveEntry) -> Note:
    """Fetch the note behind ``ent`` from the Trove store.

    Raises:
        pyfuse3.FUSEError: ENOENT when the store returns None for the
            entry's object id.
    """
    found = self._trove.get_raw_note(ent.object_id)
    if found is not None:
        return found
    logger.debug("note lookup failed: %s", ent.object_id)
    raise pyfuse3.FUSEError(errno.ENOENT)
def _lookup_update_object(self, object_id: ObjectId) -> _TroveEntry:
    """Return the entry for ``object_id``, registering a new one if needed.

    An object that already has an inode gets its cached entry back. Otherwise
    the next inode number is allocated and the new entry is stored in both
    caches. No reference is taken in either case.

    Raises:
        pyfuse3.FUSEError: ENOENT if the reverse index names an inode that is
            missing from the inode cache.
    """
    known_inode = self._inode_reverse_cache.get(object_id)
    if known_inode is not None:
        # Resolve through the regular cache accessor; this class has no
        # _lookup_existing helper.
        return self._get_ent_from_inode(known_inode)

    inode_id = InodeT(self._next_inode)
    self._next_inode += 1
    entry = _TroveEntry(sys_inode=inode_id, object_id=object_id)
    self._inode_cache[inode_id] = entry
    self._inode_reverse_cache[object_id] = inode_id
    return entry
def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
    """Find child ``name`` under ``parent_inode``.

    Returns the (entry, note) pair for the child. The entry is obtained
    from _create_get_ent_from_note and is not referenced here.

    Raises:
        pyfuse3.FUSEError: with the errno carried by the Trove error when
            the child lookup fails.
    """
    parent_note = self._get_inode_note(parent_inode)
    child_name = name.decode()
    try:
        child_note = parent_note.child(child_name)
    except tr.ErrorWithErrno as err:
        logger.debug("lookup failed: %d -> %s", parent_inode, child_name)
        raise pyfuse3.FUSEError(err.errno) from None
    return self._create_get_ent_from_note(child_note), child_note
def _get_sys_inode_id(self, object_id: ObjectId) -> InodeT:
    """Return the inode number currently mapped to ``object_id``.

    Raises:
        pyfuse3.FUSEError: ENOENT when the object has no cached inode.
    """
    sys_inode = self._inode_reverse_cache.get(object_id)
    if sys_inode is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    return sys_inode
# ------------------------------------------------------------------
# Handle Management
# ------------------------------------------------------------------
def _open_handle(self, inode: InodeT) -> _TroveHandle:
    """Open a handle on ``inode`` and record it in the handle table.

    The note is resolved first, then the next handle id is consumed.
    Folder-like notes get a _TroveHandleTree; everything else gets a plain
    _TroveHandle.
    """
    logger.debug("open_handle inode:%d", inode)
    note = self._get_inode_note(inode)
    handle_id = FileHandleT(self._next_handle)
    self._next_handle += 1

    if _note_has_folder(note):
        logger.debug("open_handle inode:%d is a folder", inode)
        handle_cls = _TroveHandleTree
    else:
        logger.debug("open_handle inode:%d is a file", inode)
        handle_cls = _TroveHandle

    opened: _TroveHandle = handle_cls(inode_id=inode, handle_id=handle_id, note=note)
    self._handles[handle_id] = opened
    return opened
def _get_handle(self, handle_id: FileHandleT) -> _TroveHandle:
    """Return the open handle registered under ``handle_id``.

    Raises:
        pyfuse3.FUSEError: EBADF when no such handle is open.
    """
    opened = self._handles.get(handle_id)
    if opened is None:
        raise pyfuse3.FUSEError(errno.EBADF)
    return opened
def _close_handle(self, handle: _TroveHandle):
    """Remove ``handle`` from the handle table (KeyError if it is not there)."""
    key = handle.handle_id
    del self._handles[key]
def _get_attr(self, ent: _TroveEntry, note: Note) -> pyfuse3.EntryAttributes:
    """Build the attribute structure for ``note`` as seen through ``ent``.

    Folder-like notes are reported as directories of size 0; every other
    note is reported as a regular file whose size is the length of its
    content. Ownership is the uid/gid of the current process, atime is
    "now", and mtime/ctime both come from the note's mtime. Write bits are
    cleared when the note is read-only.
    """
    # FIXME: Properly support folder / content, right now it's either or
    is_tree = _note_has_folder(note)
    size = 0 if is_tree else len(note.read_content())

    attr = pyfuse3.EntryAttributes()
    attr.st_ino = ent.sys_inode
    attr.st_nlink = 1
    attr.st_uid = os.getuid()
    attr.st_gid = os.getgid()

    # Timestamps are converted from seconds to nanoseconds.
    modified_ns = int(note.mtime.timestamp() * 1e9)
    attr.st_atime_ns = int(time.time() * 1e9)
    attr.st_mtime_ns = modified_ns
    attr.st_ctime_ns = modified_ns

    attr.generation = 0
    attr.entry_timeout = 5.0
    attr.attr_timeout = 5.0

    if is_tree:
        perms = 0o555 if note.readonly else 0o755
        attr.st_mode = stat.S_IFDIR | perms
        attr.st_size = 0
        attr.st_blksize = 512
        attr.st_blocks = 0
        return attr

    perms = 0o444 if note.readonly else 0o644
    attr.st_mode = stat.S_IFREG | perms
    attr.st_size = size
    attr.st_blksize = 512
    # Block count is the size rounded up to whole 512-byte blocks.
    attr.st_blocks = (size + 511) // 512
    return attr
# ------------------------------------------------------------------
# Stat / lookup
# ------------------------------------------------------------------
async def getattr(self, inode: InodeT, ctx=None) -> pyfuse3.EntryAttributes:
    """Return the attributes of ``inode``; ``ctx`` is not used."""
    logger.debug("getattr inode:%d", inode)
    entry = self._get_ent_from_inode(inode)
    return self._get_attr(entry, self._get_ent_note(entry))
async def lookup(self, parent_inode: InodeT, name: bytes, ctx=None) -> pyfuse3.EntryAttributes:
    """Look up ``name`` under ``parent_inode`` and return its attributes.

    A reference is taken on the child's entry before the attributes are
    built. ``ctx`` is not used.
    """
    logger.debug("lookup inode:%d name:%s", parent_inode, name)
    child_ent, child_note = self._lookup_child(parent_inode, name)
    self._ref_entry(child_ent)
    return self._get_attr(child_ent, child_note)
async def setattr(self, inode: InodeT, attr, fields, fh: FileHandleT | None, ctx) -> pyfuse3.EntryAttributes:
    """Apply a size change to ``inode`` and return its attributes.

    Only ``fields.update_size`` is acted on: content is truncated, or
    padded with NUL bytes, to ``attr.st_size``. Other requested changes are
    not applied. ``fh`` and ``ctx`` are not used.

    Raises:
        pyfuse3.FUSEError: EINVAL when a resize targets a note that has a
            ``mkdir`` attribute.
    """
    ent = self._get_ent_from_inode(inode)
    note = self._get_ent_note(ent)
    if not fields.update_size:
        return self._get_attr(ent, note)

    if hasattr(note, 'mkdir'):
        raise pyfuse3.FUSEError(errno.EINVAL)

    content = note.read_content()
    target_size = attr.st_size
    have = len(content)
    if target_size < have:
        note.write_content(content[:target_size])
    elif target_size > have:
        note.write_content(content + b"\x00" * (target_size - have))
    return self._get_attr(ent, note)
def forget(self, inode_list: Sequence[Tuple[InodeT, int]]) -> None:
    """Drop references for each (inode, count) pair in ``inode_list``.

    A FUSEError raised for one inode (for example because it is not
    cached) is logged as a warning; the remaining pairs are still handled.
    """
    for inode, nlookup in inode_list:
        logger.debug("deref inode:%d count:%d", inode, nlookup)
        try:
            entry = self._get_ent_from_inode(inode)
            self._deref_entry(entry, nlookup)
        except pyfuse3.FUSEError as err:
            logger.warning("Failed to deref inode %d: %s", inode, str(err))
# ------------------------------------------------------------------
# Directory ops
# ------------------------------------------------------------------
async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
    """Open ``inode`` as a directory and return the new handle id.

    Raises:
        pyfuse3.FUSEError: ENOTDIR when the opened handle is not a tree
            handle; the handle is closed again before raising.
    """
    logger.debug("opendir inode:%d", inode)
    handle = self._open_handle(inode)
    if isinstance(handle, _TroveHandleTree):
        logger.debug("opened dir inode %d -> handle %d", inode, handle.handle_id)
        return handle.handle_id

    logger.debug("attempted opendir on %d not a tree", inode)
    self._close_handle(handle)
    raise pyfuse3.FUSEError(errno.ENOTDIR)
async def readdir(self, fh: FileHandleT, start_id: int, token) -> None:
    """Report the children of the directory open on ``fh`` to the kernel.

    Entries are numbered by their 1-based position in the child listing, so
    a call with ``start_id`` resumes after that many leading children.
    Children whose note can no longer be fetched are skipped.

    A reference is taken on a child's entry only after pyfuse3 reports that
    the entry was accepted. When pyfuse3.readdir_reply returns False the
    entry was not delivered, so no reference is taken and iteration stops.

    Raises:
        pyfuse3.FUSEError: EBADF when ``fh`` is not an open handle.
    """
    logger.debug("readdir %d start_id %d", fh, start_id)
    handle = self._get_handle(fh)
    entries = list(handle.note.children())
    for idx, entry in enumerate(entries):
        if idx < start_id:
            continue
        child = self._trove.get_raw_note(entry.object_id)
        if child is None:
            continue
        child_ent = self._create_get_ent_from_note(child)
        attr = self._get_attr(child_ent, child)
        if not pyfuse3.readdir_reply(token, entry.name.encode(), attr, idx + 1):
            # Reply buffer is full: this entry was not delivered, so it must
            # not gain a lookup reference.
            break
        self._ref_entry(child_ent)
async def releasedir(self, fh: FileHandleT) -> None:
    """Close the directory handle ``fh`` (EBADF if it is not open)."""
    logger.debug("releasedir %d", fh)
    self._close_handle(self._get_handle(fh))
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
    """Create directory ``name`` under ``parent_inode``.

    The new child note is created with MIME type "inode/directory"; a
    reference is taken on its entry and its attributes are returned.
    ``mode`` and ``ctx`` are not used.

    Raises:
        pyfuse3.FUSEError: with the errno carried by the Trove error when
            the child cannot be created.
    """
    logger.debug("mkdir inode:%d name:%s", parent_inode, name)
    parent_note = self._get_inode_note(parent_inode)
    # TODO: consider implications here, maybe look at ext on dir for mime?
    try:
        created = tr.new_child(parent_note, name.decode(), mime='inode/directory')
    except tr.ErrorWithErrno as err:
        raise pyfuse3.FUSEError(err.errno) from None

    # Hand the kernel a referenced entry for the new directory.
    created_ent = self._create_get_ent_from_note(created)
    self._ref_entry(created_ent)
    return self._get_attr(created_ent, created)
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Remove child ``name`` from ``parent_inode``.

    Delegates to the parent note's ``rm_child(name, False)``. ``ctx`` is
    not used.

    Raises:
        pyfuse3.FUSEError: with the errno carried by the Trove error when
            the removal fails.
    """
    logger.debug("rmdir inode:%d name:%s", parent_inode, name)
    parent_note = self._get_inode_note(parent_inode)
    try:
        parent_note.rm_child(name.decode(), False)
    except tr.ErrorWithErrno as err:
        raise pyfuse3.FUSEError(err.errno) from None
# ------------------------------------------------------------------
# File ops
# ------------------------------------------------------------------
async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
    """Open ``inode`` and return a FileInfo carrying the new handle id.

    ``flags`` and ``ctx`` are not inspected.
    """
    # FIXME: Add support for inode tree and inode content; a folder-like
    # note is currently not rejected with EISDIR here.
    opened = self._open_handle(inode)
    return pyfuse3.FileInfo(fh=opened.handle_id)
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
    """Create child ``name`` under ``parent_inode`` and open it.

    Returns a ``(pyfuse3.FileInfo, attributes)`` pair for the new note. A
    reference is taken on the new entry before the handle is opened.
    ``mode``, ``flags`` and ``ctx`` are not used yet.

    Raises:
        pyfuse3.FUSEError: with the errno carried by the Trove error when
            the child cannot be created (same translation as mkdir).
    """
    logger.debug("create inode:%d name:%s", parent_inode, name)
    parent = self._get_inode_note(parent_inode)
    # TODO: handle mode
    # TODO: handle flags
    name_str = name.decode()
    try:
        note = tr.new_child(parent, name_str)
    except tr.ErrorWithErrno as e:
        # Report store failures to the kernel as an errno instead of letting
        # the exception escape the request handler.
        raise pyfuse3.FUSEError(e.errno) from None

    ent = self._create_get_ent_from_note(note)
    self._ref_entry(ent)
    handle = self._open_handle(ent.sys_inode)
    attr = self._get_attr(ent, note)
    return pyfuse3.FileInfo(fh=handle.handle_id), attr
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
    """Return up to ``length`` bytes of the note open on ``fh`` from ``offset``.

    Raises:
        pyfuse3.FUSEError: EBADF when ``fh`` is not open, or when the
            handle's note has a ``mkdir`` attribute.
    """
    logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
    note = self._get_handle(fh).note
    if hasattr(note, 'mkdir'):
        raise pyfuse3.FUSEError(errno.EBADF)
    end = offset + length
    return note.read_content()[offset:end]
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
    """Write ``data`` at ``offset`` into the note open on ``fh``.

    The whole content is read, patched and written back. A write that
    starts past the current end pads the gap with NUL bytes first.

    Returns:
        The number of bytes written, i.e. ``len(data)``.

    Raises:
        pyfuse3.FUSEError: EBADF when ``fh`` is not open, or when the
            handle's note has a ``mkdir`` attribute. The latter mirrors
            read(); reporting success there would silently drop the data.
    """
    note = self._get_handle(fh).note
    # NOTE(review): read/write/setattr test hasattr(note, 'mkdir') while the
    # attribute code uses _note_has_folder — confirm which is intended.
    if hasattr(note, 'mkdir'):
        raise pyfuse3.FUSEError(errno.EBADF)

    existing = note.read_content()
    gap = offset - len(existing)
    if gap > 0:
        existing = existing + b"\x00" * gap
    tail = existing[offset + len(data):]
    note.write_content(existing[:offset] + data + tail)
    return len(data)
async def release(self, fh: FileHandleT) -> None:
    """Close the file handle ``fh`` (EBADF if it is not open)."""
    self._close_handle(self._get_handle(fh))
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Remove child ``name`` from ``parent_inode``.

    Delegates to the parent note's ``rm_child(name, False)``. ``ctx`` is
    not used.

    Raises:
        pyfuse3.FUSEError: with the errno carried by a Trove error raised
            while resolving the parent or removing the child.
    """
    try:
        self._get_inode_note(parent_inode).rm_child(name.decode(), False)
    except tr.ErrorWithErrno as err:
        raise pyfuse3.FUSEError(err.errno) from None
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
    """Move ``name_old`` in ``parent_inode_old`` to ``name_new`` in ``parent_inode_new``.

    ``flags`` is honoured as follows:
      * 0 -- plain rename; an existing destination is overwritten.
      * RENAME_NOREPLACE -- the move is requested with ``overwrite=False``.
      * RENAME_EXCHANGE -- not supported; rejected with EINVAL.
    ``ctx`` is not used.

    Raises:
        pyfuse3.FUSEError: EINVAL for RENAME_EXCHANGE, or the errno carried
            by the Trove error when the move fails.
    """
    # Decode names
    name_new_str = name_new.decode()
    name_old_str = name_old.decode()

    overwrite = True
    if flags:
        if flags & pyfuse3.RENAME_EXCHANGE:
            # An atomic swap cannot be expressed with Trove.move.
            raise pyfuse3.FUSEError(errno.EINVAL)
        # NOTE(review): assumes Trove.move raises ErrorWithErrno (e.g. EEXIST)
        # when overwrite is False and the destination exists — TODO confirm.
        overwrite = not (flags & pyfuse3.RENAME_NOREPLACE)

    # Grab the parents
    new_parent = self._get_inode_note(parent_inode_new)
    old_parent = self._get_inode_note(parent_inode_old)

    # Move!
    try:
        self._trove.move(old_parent, name_old_str, new_parent, name_new_str, overwrite=overwrite)
    except tr.ErrorWithErrno as e:
        raise pyfuse3.FUSEError(e.errno) from None
# ------------------------------------------------------------------
# Serve
# ------------------------------------------------------------------
async def _run(ops: TroveFuseOps, mountpoint: str) -> None:
    """Mount ``ops`` at ``mountpoint`` and run the pyfuse3 main loop.

    Logging is configured at DEBUG level first. The mount uses pyfuse3's
    default options plus "fsname=trove". pyfuse3.close() is always called
    once the main loop ends, whether it returned or raised.
    """
    logging.basicConfig(level=logging.DEBUG)
    mount_options = {*pyfuse3.default_options, "fsname=trove"}
    pyfuse3.init(ops, mountpoint, mount_options)
    try:
        await pyfuse3.main()
    finally:
        pyfuse3.close()
def serve(trove: Trove, mountpoint: str) -> None:
    """Mount ``trove`` at ``mountpoint`` and serve it until interrupted.

    Builds the operations object, then runs the mount inside a trio event
    loop. A KeyboardInterrupt raised while the loop is running ends the
    call quietly.
    """
    operations = TroveFuseOps(trove)
    try:
        trio.run(_run, operations, mountpoint)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop serving.
        return