trove/trovedb/fs.py

257 lines
8.9 KiB
Python

import os
import sqlite3
import tempfile
from pathlib import Path
from typing import Optional, Dict, List, Self, Iterable
from .trove import Note, Trove, TreeNote, BlobNote, Blob, Tree, BadNoteType, TreeEntry, NoteNotFound
class FSNote(Note):
def __init__(self, trove: 'FSTrove', *, inode: int | None = None, path: Path | None = None):
self._trove: FSTrove = trove
self._fs_path: Path | None = path
self._inode: int | None = inode
if self._fs_path is not None:
inode = self._fs_path.stat().st_ino
if self._inode != inode and self._inode is not None:
raise ValueError(f"Inconsistent inode: {self._inode} vs {inode}")
self._inode = inode
@property
def object_id(self) -> int:
if self._inode is None:
raise ValueError("Note not yet saved to disk")
return self._inode
@property
def _path(self) -> Path:
if self._fs_path is not None:
if self._fs_path.exists():
return self._fs_path
self._fs_path = None
if self._inode is None:
raise ValueError("Note not yet saved to disk")
self._fs_path = self._trove.get_path_by_inode(self._inode)
assert self._fs_path is not None
return self._fs_path
def get_raw_metadata(self, key: str) -> Optional[bytes]:
return self._trove._get_metadata(self._inode, key)
def set_raw_metadata(self, key: str, value: bytes) -> None:
self._trove._set_metadata(self._inode, key, value)
class FSBlobNote(FSNote, BlobNote):
def read(self) -> bytes:
if self._inode is None:
return b""
return self._path.read_bytes()
def write(self, data: bytes) -> None:
self._path.write_bytes(data)
# Update cache just in case inode changed (some editors do this)
try:
new_inode = self._path.stat().st_ino
if new_inode != self._inode:
self._trove._update_cache(new_inode, self._path)
self._inode = new_inode
except OSError:
pass
class FSTreeNote(FSNote, TreeNote):
def link(self, name: str, note: Note):
if not isinstance(note, FSBlobNote):
raise BadNoteType("Only blob notes can be linked")
target_path = self._path / name
if target_path.exists():
self.unlink(name)
note_path = note._path
# If the note is in .working, move it to the new location.
if self._trove.working in note_path.parents:
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
else:
# If it's already linked somewhere, create a hard link if it's a file.
if note_path.is_file():
try:
os.link(note_path, target_path)
except OSError:
# Fallback to rename if link fails (e.g. cross-device, though we assume single FS)
os.rename(note_path, target_path)
else:
# Directories cannot be hardlinked.
# We move it to the new location.
os.rename(note_path, target_path)
self._trove._update_cache(note.object_id, target_path)
def unlink(self, name: str):
target_path = self._path / name
if not target_path.exists():
return
if target_path.is_dir():
target_path.rmdir()
else:
target_path.unlink()
def mkdir(self, name: str) -> 'FSTreeNote':
target_path = self._path / name
target_path.mkdir(exist_ok=True)
inode = target_path.stat().st_ino
self._trove._update_cache(inode, target_path)
return FSTreeNote(self._trove, inode=inode, path=target_path)
def entries(self) -> Iterable[TreeEntry]:
try:
for item in self._path.iterdir():
if item.name == ".trove":
continue
inode = item.stat().st_ino
self._trove._update_cache(inode, item)
yield TreeEntry(name=item.name, object_id=inode)
except OSError:
pass
def list(self) -> dict[str, int]:
res = {}
try:
for item in self._path.iterdir():
if item.name == ".trove":
continue
res[item.name] = item.stat().st_ino
self._trove._update_cache(res[item.name], item)
except OSError:
pass
return res
def child(self, name: str) -> Note:
"""Retrieve a child not by name."""
target_path = self._path / name
return self._trove.get_raw_note_by_path(target_path)
class FSTrove(Trove):
def __init__(self, root: str | Path):
self.root = Path(root).absolute()
self._root_inode = self.root.stat().st_ino
self.dot_trove = self.root / ".trove"
self.working = self.dot_trove / ".working"
self.dot_trove.mkdir(exist_ok=True)
self.working.mkdir(exist_ok=True)
db_path = self.dot_trove / "trovecache.db"
self.con = sqlite3.connect(str(db_path))
self._init_db()
@classmethod
def open(cls, path: str | Path, create: bool = False) -> 'FSTrove':
p = Path(path)
if not p.exists():
if not create:
raise FileNotFoundError(f"Root path not found: {p}")
p.mkdir(parents=True)
return cls(p)
def _init_db(self):
self.con.execute("CREATE TABLE IF NOT EXISTS cache (inode INTEGER PRIMARY KEY, path TEXT)")
self.con.execute("CREATE TABLE IF NOT EXISTS metadata (inode INTEGER, key TEXT, value BLOB, PRIMARY KEY(inode, key))")
self.con.commit()
def _update_cache(self, inode: int, path: Path):
try:
rel_path = path.relative_to(self.root)
path_str = str(rel_path)
if path_str == ".":
path_str = ""
except ValueError:
# Path not under root, maybe it's the root itself?
if path == self.root:
path_str = ""
else:
return # Not under root, don't cache
self.con.execute("INSERT OR REPLACE INTO cache (inode, path) VALUES (?, ?)", (inode, path_str))
self.con.commit()
def get_path_by_inode(self, inode: int) -> Optional[Path]:
if inode == self._root_inode:
return self.root
row = self.con.execute("SELECT path FROM cache WHERE inode = ?", (inode,)).fetchone()
if row:
p = self.root / row[0]
try:
if p.exists() and p.stat().st_ino == inode:
return p
except OSError:
pass
# search
for root_dir, dirs, files in os.walk(self.root):
# Skip .trove
if ".trove" in dirs:
dirs.remove(".trove")
for name in dirs + files:
p = Path(root_dir) / name
try:
st = p.stat()
if st.st_ino == inode:
self._update_cache(inode, p)
return p
except OSError:
continue
return None
def get_raw_note_by_path(self, target_path: Path) -> Note:
if not target_path.exists():
raise NoteNotFound(target_path.relative_to(self.root))
note_id = target_path.stat().st_ino
if target_path.is_dir():
return FSTreeNote(self, inode=note_id, path=target_path)
else:
return FSBlobNote(self, inode=note_id, path=target_path)
def get_raw_note(self, note_id: int) -> Note:
p = self.get_path_by_inode(note_id)
if not p:
raise NoteNotFound(note_id)
return self.get_raw_note_by_path(p)
def create_blob(self, data: bytes | None = None) -> BlobNote:
fd, temp_path = tempfile.mkstemp(dir=self.working)
try:
if data:
os.write(fd, data)
finally:
os.close(fd)
p = Path(temp_path)
inode = p.stat().st_ino
self._update_cache(inode, p)
return FSBlobNote(self, inode=inode, path=p)
def get_root(self) -> TreeNote:
return FSTreeNote(self, inode=self._root_inode, path=self.root)
def _get_metadata(self, inode: int, key: str) -> Optional[bytes]:
row = self.con.execute("SELECT value FROM metadata WHERE inode = ? AND key = ?", (inode, key)).fetchone()
return row[0] if row else None
def _set_metadata(self, inode: int, key: str, value: bytes):
self.con.execute("INSERT OR REPLACE INTO metadata (inode, key, value) VALUES (?, ?, ?)", (inode, key, value))
self.con.commit()
def close(self):
self.con.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()