trove/trovedb/fs.py

279 lines
9.5 KiB
Python
Raw Normal View History

2026-03-19 22:43:11 -05:00
import os
import sqlite3
import tempfile
import datetime as dt
2026-03-19 22:43:11 -05:00
from pathlib import Path
from typing import Optional, Dict, List, Self, Iterable
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound
from . import fs_util as fsu
2026-03-19 22:43:11 -05:00
class FSNote(Note):
    """A note backed by a filesystem object and identified by its inode number.

    A note may be constructed from an inode, from a path, or from both. When a
    path is supplied its inode is read from disk and must agree with any
    explicitly supplied inode.
    """

    def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
        """Bind the note to *trove* and record its inode and/or path.

        Raises:
            ValueError: if both *inode* and *path* are given and the inode
                found on disk for *path* differs from *inode*.
            OSError: if *path* is given but cannot be stat'ed.
        """
        self._trove: FSTrove = trove
        self._fs_path: Path | None = path
        self._inode: int | None = inode

        if self._fs_path is not None:
            # The on-disk inode is authoritative; an explicit inode is only
            # accepted when it matches.
            inode = self._fs_path.stat().st_ino
            if self._inode != inode and self._inode is not None:
                raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}")
            self._inode = inode

    @property
    def object_id(self) -> int:
        """Return the inode number that identifies this note.

        Raises:
            ValueError: if the note has no inode yet.
        """
        if self._inode is None:
            raise ValueError("Note not yet saved to disk")
        return self._inode

    @property
    def mtime(self) -> dt.datetime:
        """Return modification time as a timezone-aware (UTC) datetime."""
        stat = self._path.stat()
        return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)

    @property
    def readonly(self) -> bool:
        """Check if the note is readonly based on file permissions.

        A note without an inode is reported as writable.
        """
        if self._inode is None:
            return False
        return not os.access(self._path, os.W_OK)

    @property
    def mime(self) -> str:
        """Return MIME type, defaulting to generic binary stream."""
        return "application/octet-stream"

    @property
    def _path(self) -> Path:
        """Return a currently existing path for this note.

        The remembered path is reused while it still exists; otherwise it is
        discarded and the path is looked up again through the trove by inode.

        Raises:
            ValueError: if the note has no inode yet.
            NoteNotFound: if the trove cannot resolve the inode to a path.
        """
        if self._fs_path is not None:
            if self._fs_path.exists():
                return self._fs_path
            # The file was moved or removed since we last saw it.
            self._fs_path = None

        if self._inode is None:
            raise ValueError("Note not yet saved to disk")

        resolved = self._trove.get_path_by_inode(self._inode)
        if resolved is None:
            # Raise explicitly rather than assert: asserts vanish under -O and
            # would let None escape as a Path.
            raise NoteNotFound(self._inode)
        self._fs_path = resolved
        return resolved

    def get_raw_metadata(self, key: str) -> Optional[bytes]:
        """Return the stored metadata value for *key*, or None if absent.

        A note without an inode has no metadata, so None is returned.
        """
        if self._inode is None:
            return None
        return self._trove._get_metadata(self._inode, key)

    def set_raw_metadata(self, key: str, value: bytes) -> None:
        """Store *value* under *key* for this note.

        Raises:
            ValueError: if the note has no inode yet (metadata is keyed by
                inode, so there is nothing to attach it to).
        """
        self._trove._set_metadata(self.object_id, key, value)

    def read_content(self) -> bytes:
        """Read the raw content of the note.

        Returns empty bytes when the content file does not exist.
        """
        content_file = fsu.get_content_path(self._path)
        if content_file.exists():
            return content_file.read_bytes()
        return b""

    def write_content(self, data: bytes) -> None:
        """Write the raw content of the note."""
        content_file = fsu.get_content_path(self._path)
        content_file.write_bytes(data)
2026-03-19 22:43:11 -05:00
class FSTreeNote(FSNote, TreeNote):
    """A tree note backed by a filesystem directory."""

    @property
    def mime(self) -> str:
        """Return MIME type for directory/tree nodes."""
        return "inode/directory"

    def link(self, name: str, note: Note):
        """Make *note* available as child *name* of this directory.

        An existing child called *name* is unlinked first. Notes that live
        under the trove's working directory, and notes that are not regular
        files, are moved into place. Regular files elsewhere are hard-linked,
        falling back to a move if the hard link fails.

        Raises:
            BadNoteType: if *note* is not an FSNote.
        """
        if not isinstance(note, FSNote):
            raise BadNoteType("Only blob notes can be linked")

        target_path = self._path / name
        if target_path.exists():
            self.unlink(name)

        note_path = note._path
        # Directories cannot be hard-linked, and working-area notes have no
        # other home yet, so both are moved rather than linked.
        if self._trove.working in note_path.parents or not note_path.is_file():
            os.rename(note_path, target_path)
        else:
            try:
                os.link(note_path, target_path)
            except OSError:
                # Fallback to rename if link fails (e.g. cross-device).
                os.rename(note_path, target_path)
        self._trove._update_cache(note.object_id, target_path)

    def unlink(self, name: str):
        """Remove child *name*; do nothing if it does not exist.

        Directories are removed with ``rmdir``, other entries with ``unlink``.
        """
        target_path = self._path / name
        if not target_path.exists():
            return
        if target_path.is_dir():
            target_path.rmdir()
        else:
            target_path.unlink()

    def mkdir(self, name: str) -> 'FSTreeNote':
        """Create (or reuse) the child directory *name* and return its note."""
        target_path = self._path / name
        target_path.mkdir(exist_ok=True)
        inode = target_path.stat().st_ino
        self._trove._update_cache(inode, target_path)
        return FSTreeNote(self._trove, inode=inode, path=target_path)

    def _iter_children(self) -> Iterable[tuple[str, int]]:
        """Yield ``(name, inode)`` for each child, refreshing the path cache.

        Entries named ``.trove`` are skipped. If the directory cannot be read
        nothing is yielded. A child that cannot be stat'ed is skipped on its
        own, so one bad entry does not hide the remaining children.
        """
        try:
            children = list(self._path.iterdir())
        except OSError:
            return
        for item in children:
            if item.name == ".trove":
                continue
            try:
                inode = item.stat().st_ino
            except OSError:
                # e.g. dangling symlink, or removed since the directory scan
                continue
            self._trove._update_cache(inode, item)
            yield item.name, inode

    def entries(self) -> Iterable[TreeEntry]:
        """Yield a TreeEntry for each child of this directory."""
        for name, inode in self._iter_children():
            yield TreeEntry(name=name, object_id=inode)

    def list(self) -> dict[str, int]:
        """Return a mapping of child name to inode for this directory."""
        return dict(self._iter_children())

    def child(self, name: str) -> Note:
        """Retrieve a child note by name."""
        target_path = self._path / name
        return self._trove.get_raw_note_by_path(target_path)
2026-03-19 22:43:11 -05:00
class FSTrove(Trove):
    """A trove stored in a directory tree, with notes identified by inode.

    Bookkeeping lives in ``<root>/.trove``: a SQLite database holding an
    inode-to-path cache and per-inode metadata, and a ``.working`` directory
    where new blobs are created.
    """

    def __init__(self, root: str | Path):
        """Open the trove rooted at *root*, which must already exist.

        Creates the ``.trove`` and ``.trove/.working`` directories and the
        database tables if they are missing.
        """
        self.root = Path(root).absolute()
        self._root_inode = self.root.stat().st_ino

        self.dot_trove = self.root / ".trove"
        self.working = self.dot_trove / ".working"
        self.dot_trove.mkdir(exist_ok=True)
        self.working.mkdir(exist_ok=True)

        db_path = self.dot_trove / "trovecache.db"
        self.con = sqlite3.connect(str(db_path))
        self._init_db()

    @classmethod
    def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
        """Open the trove at *path*, optionally creating the root directory.

        Raises:
            FileNotFoundError: if *path* does not exist and *create* is false.
        """
        p = Path(path)
        if not p.exists():
            if not create:
                raise FileNotFoundError(f"Root path not found: {p}")
            p.mkdir(parents=True)
        return cls(p)

    def _init_db(self):
        """Create the cache and metadata tables if they do not exist."""
        self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
        self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
        self.con.commit()

    def _update_cache(self, inode: int, path: Path):
        """Record *path* (stored relative to the root) as the location of *inode*.

        Paths that are not under the root are ignored. The root itself is
        stored as the empty string. The change is committed immediately.
        """
        try:
            rel_path = path.relative_to(self.root)
        except ValueError:
            return  # Not under root, don't cache
        path_str = str(rel_path)
        if path_str == ".":
            # relative_to() yields "." when path is the root itself
            path_str = ""
        self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
        self.con.commit()

    def get_path_by_inode(self, inode: int) -> Optional[Path]:
        """Return a path whose inode is *inode*, or None if none is found.

        Lookup order: the root inode, then the cached path (accepted only if
        it still has that inode), then a walk of the tree under the root. The
        walk skips directories named ``.trove``, so objects inside them can
        only be found through the cache.
        """
        if inode == self._root_inode:
            return self.root

        row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
        if row:
            cached = self.root / row[0]
            try:
                if cached.stat().st_ino == inode:
                    return cached
            except (OSError, ValueError):
                # Stale or unusable cache entry; fall through to the search.
                pass

        for root_dir, dirs, files in os.walk(self.root):
            # Removing the name in place stops os.walk descending into it.
            if ".trove" in dirs:
                dirs.remove(".trove")
            for name in dirs + files:
                candidate = Path(root_dir) / name
                try:
                    found = candidate.stat().st_ino == inode
                except OSError:
                    continue
                if found:
                    self._update_cache(inode, candidate)
                    return candidate
        return None

    def get_raw_note_by_path(self, target_path: Path) -> Note:
        """Return the note at *target_path*: a tree note for directories, else a plain note.

        Raises:
            NoteNotFound: if *target_path* does not exist. The exception
                carries the path relative to the root when it is under the
                root, and the path as given otherwise.
        """
        if not target_path.exists():
            try:
                missing = target_path.relative_to(self.root)
            except ValueError:
                # Outside the root: still report "not found", not ValueError.
                missing = target_path
            raise NoteNotFound(missing)
        note_id = target_path.stat().st_ino
        if target_path.is_dir():
            return FSTreeNote(self, inode=note_id, path=target_path)
        return FSNote(self, inode=note_id, path=target_path)

    def get_raw_note(self, note_id: int) -> Note:
        """Return the note whose inode is *note_id*.

        Raises:
            NoteNotFound: if no path with that inode can be located.
        """
        p = self.get_path_by_inode(note_id)
        if not p:
            raise NoteNotFound(note_id)
        return self.get_raw_note_by_path(p)

    def create_blob(self, data: bytes | None = None) -> Note:
        """Create a new file in the working directory and return its note.

        *data*, if given and non-empty, becomes the file's initial bytes. If
        writing fails the temporary file is removed before the error is
        re-raised.
        """
        fd, temp_path = tempfile.mkstemp(dir=self.working)
        p = Path(temp_path)
        try:
            # A buffered file object writes the whole payload; a bare
            # os.write() may write only part of it.
            with os.fdopen(fd, "wb") as handle:
                if data:
                    handle.write(data)
        except BaseException:
            p.unlink(missing_ok=True)
            raise
        inode = p.stat().st_ino
        self._update_cache(inode, p)
        return FSNote(self, inode=inode, path=p)

    def get_root(self) -> TreeNote:
        """Return the tree note for the trove's root directory."""
        return FSTreeNote(self, inode=self._root_inode, path=self.root)

    def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
        """Return the metadata value stored for (*inode*, *key*), or None."""
        row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
        return row[0] if row else None

    def _set_metadata(self, inode: int, key: str, value: bytes):
        """Insert or replace the metadata value for (*inode*, *key*) and commit."""
        self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
        self.con.commit()

    def close(self):
        """Close the database connection."""
        self.con.close()

    def __enter__(self) -> Self:
        """Return the trove itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args):
        """Close the trove when the ``with`` block ends."""
        self.close()