439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""
|
|
fuse.py — FUSE filesystem layer for Trove, backed by pyfuse3 + trio.

Blob objects are exposed as regular files.
Tree objects are exposed as directories.

Kernel inode numbers are allocated sequentially and mapped to Trove object IDs
through in-memory caches; the Trove root object is bound to pyfuse3.ROOT_INODE.

Entry point: serve(trove, mountpoint)
|
|
"""
|
|
|
|
import errno
|
|
import os
|
|
import stat
|
|
import time
|
|
import logging
|
|
from typing import Sequence, Tuple, cast
|
|
|
|
import pyfuse3
|
|
import trio
|
|
from pyfuse3 import InodeT, FileHandleT
|
|
|
|
from trovedb.trove import Trove, Note, Tree as TroveTree, TreeNote, Blob as TroveBlob, ObjectId, TreeExists
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class _TroveEntry:
    """Reference-counted pairing of a kernel inode number and a Trove object id.

    Attributes are restricted through ``__slots__``:
      * ``sys_inode``  - inode number handed to the kernel
      * ``object_id``  - id of the Trove object behind it (may be None)
      * ``ref_count``  - number of outstanding references, starts at 0
    """

    __slots__ = ['object_id', 'sys_inode', 'ref_count']

    def __init__(self, sys_inode: InodeT, object_id: ObjectId | None):
        self.sys_inode: InodeT = sys_inode
        self.object_id: ObjectId | None = object_id
        self.ref_count = 0

    def ref(self) -> None:
        """Take one additional reference."""
        self.ref_count = self.ref_count + 1

    def deref(self, count: int = 1) -> bool:
        """Drop ``count`` references.

        Returns True when no references remain afterwards (the counter is
        zero or negative), False otherwise. Asserts that at least one
        reference was held before the call.
        """
        assert self.ref_count > 0
        remaining = self.ref_count - count
        self.ref_count = remaining
        return remaining <= 0
|
|
|
|
class _TroveHandle:
    """Record of one open file/directory handle.

    Stores the inode the handle was opened on, the numeric handle id and
    the note that was resolved at open time. A ``ref_count`` slot is
    declared but is not assigned by the constructor.
    """

    __slots__ = ['inode_id', 'ref_count', 'note', 'handle_id']

    def __init__(self, inode_id: InodeT, handle_id: FileHandleT, note: Note):
        self.note = note
        self.handle_id = handle_id
        self.inode_id = inode_id
|
|
|
|
class _TroveHandleTree(_TroveHandle):
    """Handle variant used when the opened note is a tree (directory)."""

    @property
    def tree(self) -> TreeNote:
        """The handle's note, typed as a TreeNote.

        This is a static-typing cast only; no runtime check is performed.
        """
        as_tree = cast(TreeNote, self.note)
        return as_tree
|
|
|
|
|
|
class TroveFuseOps(pyfuse3.Operations):
|
|
def __init__(self, trove: Trove):
    """Set up inode and handle bookkeeping for the given Trove store.

    The Trove root object is registered under ``pyfuse3.ROOT_INODE`` in both
    the forward (inode -> entry) and reverse (object id -> inode) caches.
    """
    super().__init__()
    self._trove = trove

    # Inode bookkeeping: numbers for non-root objects start at 2.
    self._next_inode = 2
    self._inode_cache: dict[InodeT, _TroveEntry] = {}
    self._inode_reverse_cache: dict[ObjectId, InodeT] = {}

    # Register the root and take two references on it up front
    # (presumably so it is never evicted by forget() - TODO confirm).
    root_object_id = trove.get_root().object_id
    root_entry = _TroveEntry(pyfuse3.ROOT_INODE, root_object_id)
    self._inode_cache[pyfuse3.ROOT_INODE] = root_entry
    self._inode_reverse_cache[root_object_id] = pyfuse3.ROOT_INODE
    for _ in range(2):
        root_entry.ref()

    # Handle bookkeeping: handle ids start at 2.
    self._next_handle = 2
    self._handles: dict[int, _TroveHandle] = {}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Entry Management [entries relate inode to Note]
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_ent_from_inode(self, inode: InodeT) -> _TroveEntry:
    """Return the cached entry for ``inode``.

    Raises:
        pyfuse3.FUSEError(ENOENT): the inode is not present in the cache.
    """
    entry = self._inode_cache.get(inode)
    if entry is None:
        logger.debug("inode not found in cache: %d", inode)
        raise pyfuse3.FUSEError(errno.ENOENT)
    return entry
|
|
|
|
def _create_get_ent_from_note(self, note: Note) -> _TroveEntry:
    """Return the entry for ``note``, building a new one when none is cached.

    When the note's object id is already known, the cached entry is returned.
    Otherwise a fresh inode number is consumed and a new entry is returned
    WITHOUT being stored in either cache (``_ref_entry`` does that).
    """
    known_inode = self._inode_reverse_cache.get(note.object_id)
    if known_inode is not None:
        return self._inode_cache[known_inode]

    fresh_inode = InodeT(self._next_inode)
    self._next_inode += 1
    return _TroveEntry(sys_inode=fresh_inode, object_id=note.object_id)
|
|
|
|
def _ref_entry(self, ent: _TroveEntry) -> None:
    """Take a reference on ``ent``, registering it in both caches if needed."""
    already_cached = ent.sys_inode in self._inode_cache
    if not already_cached:
        self._inode_cache[ent.sys_inode] = ent
        self._inode_reverse_cache[ent.object_id] = ent.sys_inode
    ent.ref()
|
|
|
|
def _deref_entry(self, ent: _TroveEntry, count: int = 1) -> None:
    """Drop ``count`` references from ``ent``; evict it once none remain.

    Eviction removes the entry from both the forward and the reverse cache,
    and only happens if the entry's inode is currently in the forward cache.
    """
    exhausted = ent.deref(count)
    if not exhausted or ent.sys_inode not in self._inode_cache:
        return
    logger.debug("free inode: %d", ent.sys_inode)
    del self._inode_cache[ent.sys_inode]
    del self._inode_reverse_cache[ent.object_id]
|
|
|
|
def _get_inode_note(self, inode: InodeT) -> Note:
    """Resolve ``inode`` to its note; the inode must already be cached.

    Propagates the FUSEError(ENOENT) raised by the entry or note lookup.
    """
    return self._get_ent_note(self._get_ent_from_inode(inode))
|
|
|
|
def _get_ent_note(self, ent: _TroveEntry) -> Note:
    """Fetch the note behind ``ent`` from the Trove store.

    Raises:
        pyfuse3.FUSEError(ENOENT): the store returned None for the object id.
    """
    found = self._trove.get_raw_note(ent.object_id)
    if found is not None:
        return found
    logger.debug("note lookup failed: %s", ent.object_id)
    raise pyfuse3.FUSEError(errno.ENOENT)
|
|
|
|
def _lookup_update_object(self, object_id: ObjectId) -> _TroveEntry:
    """Return the entry for ``object_id``, creating and caching one if absent.

    For a known object id the cached entry is returned. Otherwise a new inode
    number is consumed and a fresh entry is stored in both caches. No
    reference is taken in either case.

    NOTE(review): the cached branch previously called ``self._lookup_existing``,
    which is not defined on this class and would raise AttributeError. It now
    returns the cached entry directly, mirroring the new-entry branch (which
    also does not take a reference) - confirm this matches the intended
    semantics.
    """
    cached_inode = self._inode_reverse_cache.get(object_id)
    if cached_inode is not None:
        return self._inode_cache[cached_inode]

    inode_id = InodeT(self._next_inode)
    self._next_inode += 1
    ent = _TroveEntry(sys_inode=inode_id, object_id=object_id)
    self._inode_cache[inode_id] = ent
    self._inode_reverse_cache[object_id] = inode_id
    return ent
|
|
|
|
def _lookup_child(self, parent_inode: InodeT, name: bytes) -> Tuple[_TroveEntry, Note]:
    """Resolve ``name`` inside the directory identified by ``parent_inode``.

    Returns:
        ``(entry, note)`` for the child. The entry is not referenced here and
        may not be in the cache yet.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the parent note is not a TreeNote.
        pyfuse3.FUSEError(ENOENT): the parent's child lookup raised KeyError.
    """
    parent_note = self._get_inode_note(parent_inode)
    if isinstance(parent_note, TreeNote):
        try:
            child_note = parent_note.child(name.decode())
        except KeyError:
            logger.debug("lookup failed: %d -> %s", parent_inode, name.decode())
            raise pyfuse3.FUSEError(errno.ENOENT) from None
        return self._create_get_ent_from_note(child_note), child_note
    raise pyfuse3.FUSEError(errno.ENOTDIR)
|
|
|
|
|
|
def _get_sys_inode_id(self, object_id: ObjectId) -> InodeT:
    """Return the inode number cached for ``object_id``.

    Raises:
        pyfuse3.FUSEError(ENOENT): the object id is not in the reverse cache.
    """
    sys_inode = self._inode_reverse_cache.get(object_id)
    if sys_inode is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    return sys_inode
|
|
|
|
# ------------------------------------------------------------------
|
|
# Handle Management
|
|
# ------------------------------------------------------------------
|
|
|
|
def _open_handle(self, inode: InodeT) -> _TroveHandle:
    """Create and register a new handle for ``inode``.

    The note is resolved once, at open time, and stored on the handle.
    TreeNote notes get a ``_TroveHandleTree``; everything else gets a plain
    ``_TroveHandle``.
    """
    note = self._get_inode_note(inode)

    handle_id = FileHandleT(self._next_handle)
    self._next_handle += 1

    handle_cls = _TroveHandleTree if isinstance(note, TreeNote) else _TroveHandle
    handle: _TroveHandle = handle_cls(inode_id=inode, handle_id=handle_id, note=note)

    self._handles[handle_id] = handle
    return handle
|
|
|
|
def _get_handle(self, handle_id: FileHandleT) -> _TroveHandle:
    """Return the registered handle for ``handle_id``.

    Raises:
        pyfuse3.FUSEError(EBADF): no handle is registered under that id.
    """
    handle = self._handles.get(handle_id)
    if handle is None:
        raise pyfuse3.FUSEError(errno.EBADF)
    return handle
|
|
|
|
def _close_handle(self, handle: _TroveHandle):
    """Unregister ``handle``; raises KeyError if it is not registered."""
    self._handles.pop(handle.handle_id)
|
|
|
|
def _get_attr(self, ent: _TroveEntry, note: Note) -> pyfuse3.EntryAttributes:
    """Build the stat-like attribute record for ``note`` under ``ent``'s inode.

    A TroveBlob is reported as a regular file whose size is the length of its
    full content; any other note is reported as a directory of size 0.
    Ownership is the uid/gid of the current process, atime is "now", and
    mtime/ctime both come from ``note.mtime``. Write permission bits are
    dropped when ``note.readonly`` is true.
    """
    is_blob = isinstance(note, TroveBlob)
    # The size is derived by reading the whole blob.
    size = len(note.read()) if is_blob else 0

    attr = pyfuse3.EntryAttributes()

    # Identity and ownership
    attr.st_ino = ent.sys_inode
    attr.st_nlink = 1
    attr.st_uid = os.getuid()
    attr.st_gid = os.getgid()

    # Timestamps, in nanoseconds
    mtime_ns = int(note.mtime.timestamp() * 1e9)
    now_ns = int(time.time() * 1e9)
    attr.st_atime_ns = now_ns
    attr.st_mtime_ns = mtime_ns
    attr.st_ctime_ns = mtime_ns

    # Kernel caching hints
    attr.generation = 0
    attr.entry_timeout = 5.0
    attr.attr_timeout = 5.0

    # Type, permissions and size
    attr.st_blksize = 512
    if is_blob:
        perm = 0o444 if note.readonly else 0o644
        attr.st_mode = stat.S_IFREG | perm
        attr.st_size = size
        attr.st_blocks = (size + 511) // 512
    else:
        perm = 0o555 if note.readonly else 0o755
        attr.st_mode = stat.S_IFDIR | perm
        attr.st_size = 0
        attr.st_blocks = 0
    return attr
|
|
|
|
# ------------------------------------------------------------------
|
|
# Stat / lookup
|
|
# ------------------------------------------------------------------
|
|
|
|
async def getattr(self, inode: InodeT, ctx=None) -> pyfuse3.EntryAttributes:
    """Return the attributes of the object behind ``inode``.

    Raises FUSEError(ENOENT) if the inode is not cached or its note is gone.
    """
    logger.debug("getattr inode:%d", inode)
    entry = self._get_ent_from_inode(inode)
    return self._get_attr(entry, self._get_ent_note(entry))
|
|
|
|
async def lookup(self, parent_inode: InodeT, name: bytes, ctx=None) -> pyfuse3.EntryAttributes:
    """Look up ``name`` in the directory ``parent_inode``.

    On success the child's entry gains one reference (and is cached if it
    was not already) before its attributes are returned.
    """
    logger.debug("lookup inode:%d name:%s", parent_inode, name)
    entry, child_note = self._lookup_child(parent_inode, name)
    self._ref_entry(entry)
    attrs = self._get_attr(entry, child_note)
    return attrs
|
|
|
|
async def setattr(self, inode: InodeT, attr, fields, fh: FileHandleT | None, ctx) -> pyfuse3.EntryAttributes:
    """Apply attribute changes to ``inode`` and return its current attributes.

    Only ``fields.update_size`` is acted on: a blob is truncated, or padded
    with NUL bytes, to ``attr.st_size``. All other requested fields are
    ignored.

    Raises:
        pyfuse3.FUSEError(EINVAL): a size change was requested on a note that
            is not a TroveBlob.
    """
    ent = self._get_ent_from_inode(inode)
    note = self._get_ent_note(ent)

    if fields.update_size:
        if not isinstance(note, TroveBlob):
            raise pyfuse3.FUSEError(errno.EINVAL)
        current = note.read()
        old_size = len(current)
        new_size = attr.st_size
        if new_size < old_size:
            note.write(current[:new_size])
        elif new_size > old_size:
            note.write(current + b"\x00" * (new_size - old_size))
        # Equal sizes: nothing to write.

    return self._get_attr(ent, note)
|
|
|
|
async def forget(self, inode_list: Sequence[Tuple[InodeT, int]]) -> None:
    """Drop kernel lookup references for each ``(inode, nlookup)`` pair.

    pyfuse3 declares ``Operations.forget`` as a coroutine, so this override
    must be ``async`` as well. The handler is not tied to a client request
    and must not raise, so every failure is logged and the remaining inodes
    are still processed.
    """
    for inode, nlookup in inode_list:
        logger.debug("deref inode:%d count:%d", inode, nlookup)
        try:
            self._deref_entry(self._get_ent_from_inode(inode), nlookup)
        except pyfuse3.FUSEError as e:
            # Unknown inode: nothing to release.
            logger.warning("Failed to deref inode %d: %s", inode, str(e))
        except Exception:
            # E.g. a ref-count assertion or a cache KeyError; never propagate.
            logger.exception("Unexpected error while forgetting inode %d", inode)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Directory ops
|
|
# ------------------------------------------------------------------
|
|
|
|
async def opendir(self, inode: InodeT, ctx) -> FileHandleT:
    """Open the directory ``inode`` and return its handle id.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the inode's note is not a tree; the
            handle that was opened for it is closed again first.
    """
    handle = self._open_handle(inode)
    if isinstance(handle, _TroveHandleTree):
        logger.debug("opened dir inode %d -> handle %d", inode, handle.handle_id)
        return handle.handle_id

    logger.debug("attempted opendir on %d not a tree", inode)
    self._close_handle(handle)
    raise pyfuse3.FUSEError(errno.ENOTDIR)
|
|
|
|
async def readdir(self, fh: FileHandleT, start_id: int, token) -> None:
    """Stream directory entries for handle ``fh`` starting at ``start_id``.

    Entries are taken from the handle's note in listing order; the id
    reported for each entry is its position + 1, so the kernel resumes at
    the following entry. Children whose note can no longer be fetched are
    skipped.

    A lookup reference is taken on a child only after ``readdir_reply``
    accepted the entry. Taking it earlier would leak a reference (and a
    cache slot) for the entry that is rejected when the reply buffer fills.

    Raises:
        pyfuse3.FUSEError(EBADF): unknown handle.
        pyfuse3.FUSEError(ENOTDIR): the handle's note is not a tree.
    """
    logger.debug("readdir %d start_id %d", fh, start_id)
    handle = self._get_handle(fh)
    note = handle.note
    if not isinstance(note, TroveTree):
        logger.debug("attempted readdir on %d not a tree", fh)
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    entries = list(note.list().items())  # [(name, object_id), ...]

    for idx in range(max(start_id, 0), len(entries)):
        name, child_id = entries[idx]

        child = self._trove.get_raw_note(child_id)
        if child is None:
            continue

        child_ent = self._create_get_ent_from_note(child)
        attr = self._get_attr(child_ent, child)

        if not pyfuse3.readdir_reply(token, name.encode(), attr, idx + 1):
            # Buffer full: the kernel never saw this entry, so no reference.
            break
        self._ref_entry(child_ent)
|
|
|
|
async def releasedir(self, fh: FileHandleT) -> None:
    """Close the directory handle ``fh``; raises FUSEError(EBADF) if unknown."""
    logger.debug("releasedir %d", fh)
    self._close_handle(self._get_handle(fh))
|
|
|
|
async def mkdir(self, parent_inode: InodeT, name: bytes, mode: int, ctx) -> pyfuse3.EntryAttributes:
    """Create directory ``name`` under ``parent_inode`` and return its attributes.

    The new directory's entry gains one reference for the kernel. ``mode``
    is not used.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the parent note is not a TreeNote.
        pyfuse3.FUSEError(EEXIST): the parent's mkdir raised TreeExists.
    """
    logger.debug("mkdir inode:%d name:%s", parent_inode, name)

    parent_note = self._get_inode_note(parent_inode)
    if not isinstance(parent_note, TreeNote):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    try:
        created: TreeNote = parent_note.mkdir(name.decode())
    except TreeExists:
        raise pyfuse3.FUSEError(errno.EEXIST) from None

    # Hand the new directory to the kernel with one lookup reference.
    entry = self._create_get_ent_from_note(created)
    self._ref_entry(entry)
    return self._get_attr(entry, created)
|
|
|
|
async def rmdir(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Remove the empty directory ``name`` from ``parent_inode``.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the parent note is not a TreeNote.
        pyfuse3.FUSEError(ENOENT): no child with that name exists.
        pyfuse3.FUSEError(ENOTEMPTY): the target tree still has entries.
    """
    logger.debug("rmdir inode:%d name:%s", parent_inode, name)
    parent = self._get_inode_note(parent_inode)
    if not isinstance(parent, TreeNote):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    name_str = name.decode()
    try:
        child = parent.child(name_str)
    except KeyError:
        raise pyfuse3.FUSEError(errno.ENOENT) from None

    # rmdir must refuse a non-empty directory; without this check the whole
    # subtree would silently be unlinked from its parent.
    if isinstance(child, TroveTree) and child.list():
        raise pyfuse3.FUSEError(errno.ENOTEMPTY)

    try:
        parent.unlink(name_str)
    except KeyError:
        raise pyfuse3.FUSEError(errno.ENOENT) from None
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# File ops
|
|
# ------------------------------------------------------------------
|
|
|
|
async def open(self, inode: InodeT, flags, ctx) -> pyfuse3.FileInfo:
    """Open the file ``inode`` and return a FileInfo carrying the new handle id.

    ``flags`` is not inspected.

    Raises:
        pyfuse3.FUSEError(EISDIR): the inode's note is a tree; the handle
            that was opened for it is closed again first.
    """
    handle = self._open_handle(inode)
    if not isinstance(handle.note, TroveTree):
        return pyfuse3.FileInfo(fh=handle.handle_id)

    self._close_handle(handle)
    raise pyfuse3.FUSEError(errno.EISDIR)
|
|
|
|
async def create(self, parent_inode: InodeT, name: bytes, mode: int, flags, ctx) -> tuple:
    """Create an empty file ``name`` under ``parent_inode`` and open it.

    Returns:
        ``(FileInfo, EntryAttributes)`` for the new blob. The blob's entry
        gains one reference and a handle is opened on it. ``mode`` and
        ``flags`` are not used.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the parent note is not a tree.
        pyfuse3.FUSEError(EEXIST): the name is already listed in the parent.
    """
    logger.debug("create inode:%d name:%s", parent_inode, name)

    directory = self._get_inode_note(parent_inode)
    if not isinstance(directory, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    filename = name.decode()
    if filename in directory.list():
        raise pyfuse3.FUSEError(errno.EEXIST)

    # Create the empty blob and attach it to the directory.
    new_blob = self._trove.create_blob(b"")
    directory.link(filename, new_blob)

    entry = self._create_get_ent_from_note(new_blob)
    self._ref_entry(entry)

    opened = self._open_handle(entry.sys_inode)
    attrs = self._get_attr(entry, new_blob)
    file_info = pyfuse3.FileInfo(fh=opened.handle_id)
    return file_info, attrs
|
|
|
|
async def read(self, fh: FileHandleT, offset: int, length: int) -> bytes:
    """Return up to ``length`` bytes of the blob behind ``fh`` from ``offset``.

    The whole blob is read and then sliced.

    Raises:
        pyfuse3.FUSEError(EBADF): unknown handle, or its note is not a blob.
    """
    logger.debug("read fh:%d offset:%d length:%d", fh, offset, length)
    note = self._get_handle(fh).note
    if not isinstance(note, TroveBlob):
        raise pyfuse3.FUSEError(errno.EBADF)
    end = offset + length
    return note.read()[offset:end]
|
|
|
|
async def write(self, fh: FileHandleT, offset: int, data: bytes) -> int:
    """Write ``data`` into the blob behind ``fh`` at ``offset``.

    The blob is read in full, patched and written back. Writing past the
    current end pads the gap with NUL bytes.

    Returns:
        The number of bytes written (``len(data)``).

    Raises:
        pyfuse3.FUSEError(EBADF): unknown handle, or its note is not a blob.
            Previously a non-blob handle fell through and returned None
            instead of an int; it now fails the same way ``read`` does.
    """
    note = self._get_handle(fh).note
    if not isinstance(note, TroveBlob):
        raise pyfuse3.FUSEError(errno.EBADF)

    existing = note.read()
    if offset > len(existing):
        # Sparse write: fill the hole between old EOF and the write offset.
        existing = existing + b"\x00" * (offset - len(existing))
    note.write(existing[:offset] + data + existing[offset + len(data):])
    return len(data)
|
|
|
|
async def release(self, fh: FileHandleT) -> None:
    """Close the file handle ``fh``; raises FUSEError(EBADF) if unknown."""
    self._close_handle(self._get_handle(fh))
|
|
|
|
async def unlink(self, parent_inode: InodeT, name: bytes, ctx) -> None:
    """Remove the entry ``name`` from the directory ``parent_inode``.

    Raises:
        pyfuse3.FUSEError(ENOTDIR): the parent note is not a tree.
        pyfuse3.FUSEError(ENOENT): the name is not listed in the parent.
    """
    directory = self._get_inode_note(parent_inode)
    if not isinstance(directory, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    target = name.decode()
    if target in directory.list():
        directory.unlink(target)
        return
    raise pyfuse3.FUSEError(errno.ENOENT)
|
|
|
|
async def rename(self, parent_inode_old: InodeT, name_old: bytes, parent_inode_new: InodeT, name_new: bytes, flags, ctx):
    """Move ``name_old`` in ``parent_inode_old`` to ``name_new`` in ``parent_inode_new``.

    An existing destination entry is replaced. The moved note is re-linked
    rather than recreated.

    Raises:
        pyfuse3.FUSEError(EINVAL): ``flags`` is non-zero. The no-replace and
            exchange modes are not implemented, and ignoring them would
            overwrite or drop the destination.
        pyfuse3.FUSEError(ENOTDIR): either parent note is not a tree.
        pyfuse3.FUSEError(ENOENT): the source entry does not exist.
    """
    if flags != 0:
        raise pyfuse3.FUSEError(errno.EINVAL)

    # Decode names
    name_new_str = name_new.decode()
    name_old_str = name_old.decode()

    # Grab the parents
    new_parent = self._get_inode_note(parent_inode_new)
    if not isinstance(new_parent, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)
    old_parent = self._get_inode_note(parent_inode_old)
    if not isinstance(old_parent, TroveTree):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    # Resolve the source note (raises ENOENT when it does not exist).
    _, note = self._lookup_child(parent_inode_old, name_old)

    # Renaming an entry onto itself is a no-op; running the steps below
    # would end by unlinking the entry that was just linked.
    if parent_inode_old == parent_inode_new and name_old_str == name_new_str:
        return

    # Remove an existing target only if there is one: unlinking a missing
    # name raises KeyError, which used to break every rename to a new name.
    if name_new_str in new_parent.list():
        new_parent.unlink(name_new_str)

    # Link to new parent, unlink from old
    new_parent.link(name_new_str, note)
    old_parent.unlink(name_old_str)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Serve
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _run(ops: TroveFuseOps, mountpoint: str) -> None:
    """Mount ``ops`` at ``mountpoint`` and run the pyfuse3 main loop.

    Configures root logging at DEBUG level, mounts with pyfuse3's default
    options plus ``fsname=trove``, and always calls ``pyfuse3.close()`` when
    the main loop ends, whether it returned or raised.
    """
    logging.basicConfig(level=logging.DEBUG)

    mount_options = set(pyfuse3.default_options) | {"fsname=trove"}
    pyfuse3.init(ops, mountpoint, mount_options)
    try:
        await pyfuse3.main()
    finally:
        pyfuse3.close()
|
|
|
|
|
|
def serve(trove: Trove, mountpoint: str) -> None:
    """Mount the Trove store ``trove`` at ``mountpoint`` and serve it.

    Blocks inside a trio event loop until the loop ends. A KeyboardInterrupt
    is swallowed so that Ctrl-C returns normally.
    """
    operations = TroveFuseOps(trove)
    try:
        trio.run(_run, operations, mountpoint)
    except KeyboardInterrupt:
        # Interrupt is the expected way to stop serving.
        return
|