trove/trovedb/fs.py

278 lines
9.5 KiB
Python

import os
import sqlite3
import tempfile
import datetime as dt
from pathlib import Path
from typing import Optional, Dict, List, Self, Iterable
from .trove import Note, Trove, TreeNote, BadNoteType, TreeEntry, NoteNotFound
from . import fs_util as fsu
class FSNote(Note):
    """Note backed by a filesystem object and identified by its inode number.

    The note remembers the last known path of its file, but the inode is the
    identity: when the remembered path no longer exists, the path is looked
    up again through the owning trove (see ``_path``).
    """

    def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
        """Bind the note to ``trove`` and, optionally, to an inode and/or path.

        Args:
            trove: Owning trove; used for inode-to-path lookups and metadata.
            inode: Expected inode number, if already known.
            path: Filesystem location of the note, if already known.

        Raises:
            ValueError: If both ``inode`` and ``path`` are given and the file
                at ``path`` has a different inode.
            OSError: If ``path`` is given but cannot be stat'ed.
        """
        self._trove: 'FSTrove' = trove
        self._fs_path: Path | None = path
        self._inode: int | None = inode
        if path is not None:
            # The file on disk is authoritative; a caller-supplied inode is
            # only accepted when it agrees with it.
            actual = path.stat().st_ino
            if inode is not None and inode != actual:
                raise ValueError(f"Inconsistent inode: {inode} vs {actual}")
            self._inode = actual

    @property
    def object_id(self) -> int:
        """Inode number of the note.

        Raises:
            ValueError: If the note has no inode yet.
        """
        if self._inode is None:
            raise ValueError("Note not yet saved to disk")
        return self._inode

    @property
    def mtime(self) -> dt.datetime:
        """Return the file's modification time as a timezone-aware UTC datetime."""
        stat = self._path.stat()
        return dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc)

    @property
    def readonly(self) -> bool:
        """Check if the note is readonly based on file permissions.

        A note without an inode is reported as writable.
        """
        if self._inode is None:
            return False
        return not os.access(self._path, os.W_OK)

    @property
    def mime(self) -> str:
        """Return MIME type, defaulting to generic binary stream."""
        return "application/octet-stream"

    @property
    def _path(self) -> Path:
        """Current filesystem path of the note, re-resolved when stale.

        The remembered path is returned while it still exists. Otherwise the
        path is looked up by inode through the trove and remembered again.

        Raises:
            ValueError: If the note has no inode to look up.
            NoteNotFound: If the trove cannot find a path for the inode.
        """
        if self._fs_path is not None:
            if self._fs_path.exists():
                return self._fs_path
            # The file was moved or removed; fall back to the inode lookup.
            self._fs_path = None
        if self._inode is None:
            raise ValueError("Note not yet saved to disk")
        resolved = self._trove.get_path_by_inode(self._inode)
        if resolved is None:
            # Raised explicitly rather than asserted: assertions are removed
            # under ``python -O`` and would let ``None`` escape as a path.
            raise NoteNotFound(self._inode)
        self._fs_path = resolved
        return resolved

    def get_raw_metadata(self, key: str) -> Optional[bytes]:
        """Return the stored metadata value for ``key``, or ``None`` if absent.

        A note without an inode has no metadata, so ``None`` is returned.
        """
        if self._inode is None:
            return None
        return self._trove._get_metadata(self._inode, key)

    def set_raw_metadata(self, key: str, value: bytes) -> None:
        """Store ``value`` under ``key`` for this note.

        Raises:
            ValueError: If the note has no inode yet; metadata is keyed by
                inode, so a row without one could never be read back.
        """
        self._trove._set_metadata(self.object_id, key, value)

    def read_content(self) -> bytes:
        """Read the raw content of the note.

        The content file is located via ``fsu.get_content_path``; if that
        file does not exist, empty bytes are returned.
        """
        content_file = fsu.get_content_path(self._path)
        if not content_file.exists():
            return b""
        return content_file.read_bytes()

    def write_content(self, data: bytes) -> None:
        """Write the raw content of the note to its content file."""
        fsu.get_content_path(self._path).write_bytes(data)
class FSTreeNote(FSNote, TreeNote):
    """Tree note backed by a directory; children are the directory's entries.

    The ``.trove`` entry is hidden from listings.
    """

    @property
    def mime(self) -> str:
        """Return MIME type for directory/tree nodes."""
        return "inode/directory"

    def link(self, name: str, note: Note) -> None:
        """Make ``note`` available as child ``name`` of this directory.

        An existing, different entry called ``name`` is removed first. Notes
        living in the trove's working directory are moved into place. Other
        regular files are hard-linked, falling back to a move if linking
        fails. Directories are always moved.

        Raises:
            BadNoteType: If ``note`` is not an ``FSNote``.
            OSError: If removing the old entry or placing the note fails.
        """
        if not isinstance(note, FSNote):
            raise BadNoteType("Only blob notes can be linked")
        target_path = self._path / name
        if target_path.exists():
            if target_path.stat().st_ino == note.object_id:
                # The entry already is this note. Unlinking it first could
                # delete the note's only link, so just refresh the cache.
                self._trove._update_cache(note.object_id, target_path)
                return
            self.unlink(name)
        note_path = note._path
        in_working = self._trove.working in note_path.parents
        if not in_working and note_path.is_file():
            # Already linked somewhere else: add a second hard link.
            try:
                os.link(note_path, target_path)
            except OSError:
                # Fallback to rename if link fails (e.g. cross-device,
                # though we assume single FS)
                os.rename(note_path, target_path)
        else:
            # Working-directory notes are moved out; directories cannot be
            # hard-linked, so they are moved as well.
            os.rename(note_path, target_path)
        self._trove._update_cache(note.object_id, target_path)

    def unlink(self, name: str) -> None:
        """Remove child ``name``; do nothing if it does not exist.

        Raises:
            OSError: If the entry cannot be removed, e.g. a non-empty
                directory.
        """
        target_path = self._path / name
        if not target_path.exists():
            return
        # A symlink to a directory must be unlinked; rmdir would fail on it.
        if target_path.is_dir() and not target_path.is_symlink():
            target_path.rmdir()
        else:
            target_path.unlink()

    def mkdir(self, name: str) -> 'FSTreeNote':
        """Create (or reuse) subdirectory ``name`` and return it as a tree note."""
        target_path = self._path / name
        target_path.mkdir(exist_ok=True)
        inode = target_path.stat().st_ino
        self._trove._update_cache(inode, target_path)
        return FSTreeNote(self._trove, inode=inode, path=target_path)

    def _scan(self) -> Iterable[tuple[str, int]]:
        """Yield ``(name, inode)`` for each visible child, caching its path.

        Listing is best-effort: a child that cannot be stat'ed (for example
        a broken symlink) is skipped, and an ``OSError`` while reading the
        directory ends the listing.
        """
        try:
            for item in self._path.iterdir():
                if item.name == ".trove":
                    continue
                try:
                    inode = item.stat().st_ino
                except OSError:
                    # Skip only this entry instead of dropping the rest.
                    continue
                self._trove._update_cache(inode, item)
                yield item.name, inode
        except OSError:
            return

    def entries(self) -> Iterable[TreeEntry]:
        """Iterate over the children as ``TreeEntry`` objects."""
        for name, inode in self._scan():
            yield TreeEntry(name=name, object_id=inode)

    def list(self) -> dict[str, int]:
        """Return a mapping of child name to inode number."""
        return dict(self._scan())

    def child(self, name: str) -> Note:
        """Retrieve a child note by name."""
        return self._trove.get_raw_note_by_path(self._path / name)
class FSTrove(Trove):
    """Trove stored as a directory tree, with notes identified by inode.

    Bookkeeping lives in ``<root>/.trove``: a SQLite database holding an
    inode-to-path cache and per-inode metadata, plus a ``.working``
    directory where new blobs are created.
    """

    def __init__(self, root: str | Path):
        """Open the trove rooted at ``root``, which must already exist.

        Creates the ``.trove`` and ``.working`` directories and the database
        tables if they are missing.

        Raises:
            OSError: If ``root`` cannot be stat'ed or the directories cannot
                be created.
        """
        self.root = Path(root).absolute()
        self._root_inode = self.root.stat().st_ino
        self.dot_trove = self.root / ".trove"
        self.working = self.dot_trove / ".working"
        self.dot_trove.mkdir(exist_ok=True)
        self.working.mkdir(exist_ok=True)
        db_path = self.dot_trove / "trovecache.db"
        self.con = sqlite3.connect(str(db_path))
        self._init_db()

    @classmethod
    def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
        """Open the trove at ``path``, creating the root directory on request.

        Raises:
            FileNotFoundError: If ``path`` does not exist and ``create`` is
                false.
        """
        p = Path(path)
        if not p.exists():
            if not create:
                raise FileNotFoundError(f"Root path not found: {p}")
            p.mkdir(parents=True)
        return cls(p)

    def _init_db(self):
        """Create the ``cache`` and ``metadata`` tables if they do not exist."""
        self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
        self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
        self.con.commit()

    def _update_cache(self, inode: int, path: Path):
        """Record ``path`` as the location of ``inode``.

        Paths are stored relative to the trove root, with the root itself
        stored as the empty string. Paths outside the root are ignored.
        """
        try:
            rel_path = path.relative_to(self.root)
        except ValueError:
            return  # Not under root, don't cache
        path_str = str(rel_path)
        if path_str == ".":
            path_str = ""
        self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
        self.con.commit()

    def get_path_by_inode(self, inode: int) -> Optional[Path]:
        """Return a path whose inode is ``inode``, or ``None`` if none is found.

        The cached path is used when it still refers to that inode. Otherwise
        the tree below the root (excluding ``.trove``) is walked, and a match
        is written back to the cache.
        """
        if inode == self._root_inode:
            return self.root
        row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
        if row:
            candidate = self.root / row[0]
            try:
                if candidate.stat().st_ino == inode:
                    return candidate
            except OSError:
                # Cached path vanished or is unreadable; fall back to a scan.
                pass
        for root_dir, dirs, files in os.walk(self.root):
            # Removing the entry in place stops os.walk from descending
            # into the bookkeeping directory.
            if ".trove" in dirs:
                dirs.remove(".trove")
            for name in dirs + files:
                found = Path(root_dir) / name
                try:
                    found_inode = found.stat().st_ino
                except OSError:
                    continue
                if found_inode == inode:
                    self._update_cache(inode, found)
                    return found
        return None

    def get_raw_note_by_path(self, target_path: Path) -> Note:
        """Return the note at ``target_path``: a tree note for a directory, a
        plain note otherwise.

        Raises:
            NoteNotFound: If nothing exists at ``target_path``.
        """
        if not target_path.exists():
            try:
                missing = target_path.relative_to(self.root)
            except ValueError:
                # Outside the root: report the full path instead of letting
                # relative_to's ValueError replace the intended error.
                missing = target_path
            raise NoteNotFound(missing)
        note_id = target_path.stat().st_ino
        if target_path.is_dir():
            return FSTreeNote(self, inode=note_id, path=target_path)
        return FSNote(self, inode=note_id, path=target_path)

    def get_raw_note(self, note_id: int) -> Note:
        """Return the note whose inode is ``note_id``.

        Raises:
            NoteNotFound: If no path can be found for ``note_id``.
        """
        p = self.get_path_by_inode(note_id)
        if p is None:
            raise NoteNotFound(note_id)
        return self.get_raw_note_by_path(p)

    def create_blob(self, data: bytes | None = None) -> Note:
        """Create a new note in the working directory, optionally with content.

        Args:
            data: Initial content; ``None`` or empty bytes gives an empty
                file.

        Raises:
            OSError: If the file cannot be created or written. A partially
                written temporary file is removed before the error
                propagates.
        """
        fd, temp_path = tempfile.mkstemp(dir=self.working)
        p = Path(temp_path)
        try:
            # A single os.write() may write fewer bytes than requested; the
            # buffered file object writes all of them and closes fd on exit.
            with os.fdopen(fd, "wb") as fh:
                if data:
                    fh.write(data)
        except Exception:
            p.unlink(missing_ok=True)
            raise
        inode = p.stat().st_ino
        self._update_cache(inode, p)
        return FSNote(self, inode=inode, path=p)

    def get_root(self) -> TreeNote:
        """Return the tree note for the trove's root directory."""
        return FSTreeNote(self, inode=self._root_inode, path=self.root)

    def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
        """Return the metadata value stored for ``(inode, key)``, or ``None``."""
        row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
        return row[0] if row else None

    def _set_metadata(self, inode: int, key: str, value: bytes):
        """Insert or overwrite the metadata value for ``(inode, key)`` and commit."""
        self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
        self.con.commit()

    def close(self):
        """Close the database connection."""
        self.con.close()

    def __enter__(self) -> 'FSTrove':
        """Return the trove itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args):
        """Close the trove when the ``with`` block ends."""
        self.close()