""" Python API for accessing underlying SQlite3 File Database A Sqlite3 database provides the basic underlying interface to the underlying notes interface. The low level database is an extremely simple model using similar ideas to git storage. """ import argparse import sqlite3 import sys import uuid from typing import NamedTuple from datetime import datetime, timezone from pathlib import Path from . import trove as tr NOTE_ROOT_ID = uuid.UUID(int=0) class ObjectInfo(NamedTuple): id: uuid.UUID type: str created: datetime modified: datetime executable: bool hidden: bool _SCHEMA = """ CREATE TABLE IF NOT EXISTS objects ( id TEXT PRIMARY KEY, type TEXT NOT NULL, data BLOB, created REAL NOT NULL, modified REAL NOT NULL, executable INTEGER NOT NULL DEFAULT 0, hidden INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS metadata ( id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE, key TEXT NOT NULL, value BLOB, PRIMARY KEY (id, key) ); CREATE TABLE IF NOT EXISTS labels ( label TEXT PRIMARY KEY, id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS tree_entries ( parent_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE, name TEXT NOT NULL, child_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE, PRIMARY KEY (parent_id, name) ); """ type SqlObjectId = str | uuid.UUID def _now() -> float: """Current UTC time as a Unix epoch float.""" return datetime.now(timezone.utc).timestamp() def _to_datetime(ts: float) -> datetime: """Convert a Unix epoch float to a UTC datetime.""" return datetime.fromtimestamp(ts, tz=timezone.utc) def _sql_id(id: SqlObjectId | None) -> str | None: if id is None: return None return ( id if isinstance(id, str) else str(id) ) class Sqlite3Trove: def __init__(self, con: sqlite3.Connection): self._con = con self._con.execute("PRAGMA foreign_keys = ON") self._con.row_factory = sqlite3.Row # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ @classmethod def open(cls, path: str | Path, create: bool = False) -> "Sqlite3Trove": """Open an existing Trove database.""" p = Path(path) initialize = False if not p.exists(): if not create: raise FileNotFoundError(f"Database not found: {p}") initialize = True con = sqlite3.connect(str(p)) if initialize: con.executescript(_SCHEMA) con.commit() obj = cls(con) if initialize: obj._write_object(b"", "inode/directory", NOTE_ROOT_ID) return obj def close(self): self._con.close() def __enter__(self): return self def __exit__(self, *_): self.close() # ------------------------------------------------------------------ # Object info # ------------------------------------------------------------------ def get_info(self, object_id: SqlObjectId) -> ObjectInfo | None: """Return an ObjectInfo namedtuple for the object, or None if not found.""" row = self._con.execute( "SELECT id, type, created, modified, executable, hidden " "FROM objects WHERE id = ?", (_sql_id(object_id),), ).fetchone() if row is None: return None return ObjectInfo( id=uuid.UUID(row["id"]), type=row["type"], created=_to_datetime(row["created"]), modified=_to_datetime(row["modified"]), executable=bool(row["executable"]), hidden=bool(row["hidden"]), ) def is_tree(self, object_id: SqlObjectId) -> bool: """Return True if the object has any children in tree_entries.""" row = self._con.execute( "SELECT 1 FROM tree_entries WHERE parent_id = ? LIMIT 1", (_sql_id(object_id),), ).fetchone() return row is not None # ------------------------------------------------------------------ # CRUD operations # ------------------------------------------------------------------ def get_object_type(self, object_id: SqlObjectId) -> str | None: """Return the type column for an object, or None if not found.""" row = self._con.execute( "SELECT type FROM objects WHERE id = ?", (_sql_id(object_id),) ).fetchone() return row["type"] if row else None def read_object(self, object_id: SqlObjectId) -> bytes | None: """Return raw data for an object, or None if not found.""" row = self._con.execute( "SELECT data FROM objects WHERE id = ?", (_sql_id(object_id),) ).fetchone() if row is None: return None return bytes(row["data"]) if row["data"] is not None else b"" def get_mtime(self, object_id: SqlObjectId) -> datetime | None: """Return the modified timestamp for an object, or None if not found.""" row = self._con.execute( "SELECT modified FROM objects WHERE id = ?", (_sql_id(object_id),) ).fetchone() if row is None: return None return _to_datetime(row["modified"]) def read_metadata(self, object_id: SqlObjectId, key: str) -> bytes | None: """Return raw metadata value for (uuid, key), or None if not found.""" row = self._con.execute( "SELECT value FROM metadata WHERE id = ? AND key = ?", (_sql_id(object_id), key) ).fetchone() if row is None: return None return bytes(row["value"]) if row["value"] is not None else b"" def write_metadata(self, object_id: SqlObjectId, key: str, value: bytes) -> None: """Upsert a metadata row.""" self._con.execute( "INSERT OR REPLACE INTO metadata (id, key, value) VALUES (?, ?, ?)", (_sql_id(object_id), key, value), ) self._con.commit() def write_content(self, object_id: SqlObjectId, data: bytes) -> None: """Update only the data and modified timestamp. Preserves type and flags.""" self._con.execute( "UPDATE objects SET data = ?, modified = ? WHERE id = ?", (data, _now(), _sql_id(object_id)), ) self._con.commit() def _write_object( self, data: bytes, dtype: str, object_id: str | uuid.UUID | None = None, executable: bool = False, hidden: bool = False, ) -> str: """ Insert or replace an object. Returns the id. On INSERT, both created and modified are set to now. On REPLACE (existing id), created is preserved and modified is updated. """ now = _now() if object_id is None: object_id = uuid.uuid4() sid = _sql_id(object_id) assert sid is not None # Preserve created timestamp on update row = self._con.execute( "SELECT created FROM objects WHERE id = ?", (sid,) ).fetchone() created = row["created"] if row else now self._con.execute( """INSERT INTO objects (id, type, data, created, modified, executable, hidden) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET type=excluded.type, data=excluded.data, modified=excluded.modified, executable=excluded.executable, hidden=excluded.hidden""", (sid, dtype, data, created, now, int(executable), int(hidden)), ) self._con.commit() return sid def write_blob(self, data: bytes, object_id: SqlObjectId | None = None, dtype: str = "application/octet-stream", executable: bool = False, hidden: bool = False) -> str: """ Insert or replace a blob. Returns the id. Pass object_id to update an existing object. """ return self._write_object(data, dtype, _sql_id(object_id), executable=executable, hidden=hidden) def write_tree(self, data: bytes, object_id: SqlObjectId | None = None, hidden: bool = False) -> str: """Write a tree-typed object. Returns the assigned id.""" return self._write_object(data, "inode/directory", _sql_id(object_id), hidden=hidden) def set_executable(self, object_id: SqlObjectId, executable: bool) -> None: """Set or clear the executable flag on an object.""" self._con.execute( "UPDATE objects SET executable = ?, modified = ? WHERE id = ?", (int(executable), _now(), _sql_id(object_id)), ) self._con.commit() def set_hidden(self, object_id: SqlObjectId, hidden: bool) -> None: """Set or clear the hidden flag on an object.""" self._con.execute( "UPDATE objects SET hidden = ?, modified = ? WHERE id = ?", (int(hidden), _now(), _sql_id(object_id)), ) self._con.commit() def set_type(self, object_id: SqlObjectId, dtype: str) -> None: """Update the MIME type of an object.""" self._con.execute( "UPDATE objects SET type = ?, modified = ? WHERE id = ?", (dtype, _now(), _sql_id(object_id)), ) self._con.commit() def delete_object(self, object_id: SqlObjectId) -> bool: """ Delete an object and all its metadata rows. Returns True if an object was deleted, False if id not found. Foreign key cascade handles the metadata and tree_entries rows. """ cur = self._con.execute( "DELETE FROM objects WHERE id = ?", (_sql_id(object_id),) ) self._con.commit() return cur.rowcount > 0 # ------------------------------------------------------------------ # Tree entry operations # ------------------------------------------------------------------ def link(self, parent_id: SqlObjectId, name: str, child_id: SqlObjectId, overwrite: bool = True) -> None: """ Link a child object into a tree under the given name. Replaces any existing entry with the same name in this tree if overwrite=True. Both parent_id and child_id must exist in the objects table (enforced by FK constraints). """ if not overwrite: existing = self._con.execute( "SELECT 1 FROM tree_entries WHERE parent_id = ? AND name = ?", (_sql_id(parent_id), name), ).fetchone() if existing: raise tr.ErrorExists(f"Entry '{name}' already exists in tree {parent_id}") self._con.execute( "INSERT OR REPLACE INTO tree_entries (parent_id, name, child_id) " "VALUES (?, ?, ?)", (_sql_id(parent_id), name, _sql_id(child_id)), ) self._con.execute( "UPDATE objects SET modified = ? WHERE id = ?", (_now(), _sql_id(parent_id)), ) self._con.commit() def unlink(self, parent_id: SqlObjectId, name: str) -> bool: """ Remove a named entry from a tree. Returns True if an entry was removed, False if not found. Does not delete the child object itself. """ cur = self._con.execute( "DELETE FROM tree_entries WHERE parent_id = ? AND name = ?", (_sql_id(parent_id), name), ) if cur.rowcount > 0: self._con.execute( "UPDATE objects SET modified = ? WHERE id = ?", (_now(), _sql_id(parent_id)), ) self._con.commit() return cur.rowcount > 0 def list_tree(self, parent_id: SqlObjectId) -> dict[str, str]: """ Return all entries in a tree as {name: child_id}. Returns an empty dict if the tree has no entries. """ rows = self._con.execute( "SELECT name, child_id FROM tree_entries " "WHERE parent_id = ? ORDER BY name", (_sql_id(parent_id),), ).fetchall() return {row["name"]: row["child_id"] for row in rows} # ------------------------------------------------------------------ # Label operations # ------------------------------------------------------------------ def get_label(self, label: str) -> SqlObjectId | None: """ Return the ID associated with a label, or None if not found. """ row = self._con.execute( "SELECT id FROM labels WHERE label = ?", (label,) ).fetchone() if row is None: return None return uuid.UUID(row["id"]) def set_label(self, label: str, object_id: SqlObjectId) -> None: """ Set a label to point to an ID. Creates or updates the label. The ID must exist in the objects table. """ self._con.execute( "INSERT OR REPLACE INTO labels (label, id) VALUES (?, ?)", (label, _sql_id(object_id)), ) self._con.commit() def delete_label(self, label: str) -> bool: """ Delete a label. Returns True if a label was deleted, False if not found. """ cur = self._con.execute( "DELETE FROM labels WHERE label = ?", (label,) ) self._con.commit() return cur.rowcount > 0 def list_labels(self) -> list[tuple[str, str]]: """ Return all labels as a list of (label, id) tuples. """ rows = self._con.execute( "SELECT label, id FROM labels ORDER BY label" ).fetchall() return [(row["label"], row["id"]) for row in rows] def main(): """Command-line interface for TroveDB.""" parser = argparse.ArgumentParser(description="TroveDB command-line interface") parser.add_argument("database", help="Path to the database file") subparsers = parser.add_subparsers(dest="operation", required=True, help="Operation to perform") # Create database subparsers.add_parser("create", help="Create a new database") # Get object data get_parser = subparsers.add_parser("get", help="Get object data by ID") get_parser.add_argument("id", help="ID of the object to retrieve") # Info info_parser = subparsers.add_parser("info", help="Show object metadata") info_parser.add_argument("id", help="Object ID") # Write blob write_parser = subparsers.add_parser("write", help="Write data to a blob") write_parser.add_argument("data", help="Data to write (as string, will be encoded as UTF-8)") write_parser.add_argument("--id", help="ID of existing blob to update (optional)") write_parser.add_argument("--type", default="text/plain", help="MIME type (default: text/plain)") write_parser.add_argument("--executable", action="store_true", help="Mark as executable") write_parser.add_argument("--hidden", action="store_true", help="Mark as hidden") # Delete object delete_parser = subparsers.add_parser("delete", help="Delete an object by ID") delete_parser.add_argument("id", help="ID of the object to delete") # Set label setlabel_parser = subparsers.add_parser("setlabel", help="Create or update a label to point to an ID") setlabel_parser.add_argument("label", help="Label name") setlabel_parser.add_argument("id", help="ID to associate with the label") # Remove label rmlabel_parser = subparsers.add_parser("rmlabel", help="Delete a label") rmlabel_parser.add_argument("label", help="Label name to delete") # List labels subparsers.add_parser("labels", help="List all labels") # Tree operations link_parser = subparsers.add_parser("link", help="Link a child into a tree") link_parser.add_argument("parent_id", help="Parent tree object ID") link_parser.add_argument("name", help="Entry name") link_parser.add_argument("child_id", help="Child object ID") unlink_parser = subparsers.add_parser("unlink", help="Unlink a child from a tree") unlink_parser.add_argument("parent_id", help="Parent tree object ID") unlink_parser.add_argument("name", help="Entry name to remove") ls_parser = subparsers.add_parser("ls", help="List tree entries") ls_parser.add_argument("parent_id", help="Tree object ID to list") args = parser.parse_args() try: match args.operation: case "create": if Path(args.database).exists(): print(f"Database already exists: {args.database}", file=sys.stderr) sys.exit(1) db = Sqlite3Trove.open(args.database, create=True) db.close() print(f"Database created: {args.database}") case "get": db = Sqlite3Trove.open(args.database) data = db.read_object(args.id) db.close() if data is None: print(f"Object not found: {args.id}") sys.exit(1) sys.stdout.buffer.write(data) case "info": db = Sqlite3Trove.open(args.database) obj = db.get_info(args.id) has_children = db.is_tree(args.id) db.close() if obj is None: print(f"Object not found: {args.id}") sys.exit(1) print(f"id: {obj.id}") print(f"type: {obj.type}") print(f"created: {obj.created.isoformat()}") print(f"modified: {obj.modified.isoformat()}") print(f"executable: {obj.executable}") print(f"hidden: {obj.hidden}") print(f"children: {has_children}") case "write": data_bytes = args.data.encode("utf-8") db = Sqlite3Trove.open(args.database) object_id = db.write_blob(data_bytes, args.id, dtype=args.type, executable=args.executable, hidden=args.hidden) db.close() print(object_id) case "delete": db = Sqlite3Trove.open(args.database) deleted = db.delete_object(args.id) db.close() if deleted: print(f"Deleted: {args.id}") else: print(f"Object not found: {args.id}") sys.exit(1) case "setlabel": db = Sqlite3Trove.open(args.database) db.set_label(args.label, args.id) db.close() print(f"Label '{args.label}' set to ID: {args.id}") case "rmlabel": db = Sqlite3Trove.open(args.database) deleted = db.delete_label(args.label) db.close() if deleted: print(f"Deleted label: {args.label}") else: print(f"Label not found: {args.label}") sys.exit(1) case "labels": db = Sqlite3Trove.open(args.database) labels = db.list_labels() db.close() if labels: for label, id in labels: print(f"{label}: {id}") else: print("No labels found.") case "link": db = Sqlite3Trove.open(args.database) db.link(args.parent_id, args.name, args.child_id) db.close() print(f"Linked '{args.name}' -> {args.child_id} in {args.parent_id}") case "unlink": db = Sqlite3Trove.open(args.database) removed = db.unlink(args.parent_id, args.name) db.close() if removed: print(f"Unlinked '{args.name}' from {args.parent_id}") else: print(f"Entry '{args.name}' not found in {args.parent_id}") sys.exit(1) case "ls": db = Sqlite3Trove.open(args.database) entries = db.list_tree(args.parent_id) db.close() if entries: for name, child_id in entries.items(): print(f"{name}: {child_id}") else: print("No entries.") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()