Improve database schema for notes-with-children

This commit is contained in:
Andrew Mulbrook 2026-03-24 22:51:56 -05:00
parent 82c272990c
commit 94d00c94d4
2 changed files with 299 additions and 99 deletions

View file

@ -10,17 +10,30 @@ import argparse
import sqlite3
import sys
import uuid
from typing import NamedTuple
from datetime import datetime, timezone
from pathlib import Path
NOTE_ROOT_ID = uuid.UUID(int=0)
class ObjectInfo(NamedTuple):
id: uuid.UUID
type: str
created: datetime
modified: datetime
executable: bool
hidden: bool
_SCHEMA = """
CREATE TABLE IF NOT EXISTS objects (
id TEXT PRIMARY KEY,
type TEXT NOT NULL CHECK(type IN ('blob', 'tree')),
data BLOB,
modified TEXT NOT NULL
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
data BLOB,
created REAL NOT NULL,
modified REAL NOT NULL,
executable INTEGER NOT NULL DEFAULT 0,
hidden INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS metadata (
@ -34,13 +47,25 @@ CREATE TABLE IF NOT EXISTS labels
(
label TEXT PRIMARY KEY,
id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE
);
);
CREATE TABLE IF NOT EXISTS tree_entries (
parent_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE,
name TEXT NOT NULL,
child_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE,
PRIMARY KEY (parent_id, name)
);
"""
type SqlObjectId = str | uuid.UUID
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _now() -> float:
"""Current UTC time as a Unix epoch float."""
return datetime.now(timezone.utc).timestamp()
def _to_datetime(ts: float) -> datetime:
"""Convert a Unix epoch float to a UTC datetime."""
return datetime.fromtimestamp(ts, tz=timezone.utc)
def _sql_id(id: SqlObjectId | None) -> str | None:
if id is None:
@ -49,10 +74,6 @@ def _sql_id(id: SqlObjectId | None) -> str | None:
id if isinstance(id, str) else str(id)
)
def _initialize_db(con: sqlite3.Connection):
con.executescript(_SCHEMA)
con.commit()
class Sqlite3Trove:
def __init__(self, con: sqlite3.Connection):
@ -79,7 +100,7 @@ class Sqlite3Trove:
con.commit()
obj = cls(con)
if initialize:
obj.write_tree(b"", NOTE_ROOT_ID)
obj._write_object(b"", "inode/directory", NOTE_ROOT_ID)
return obj
def close(self):
@ -91,6 +112,36 @@ class Sqlite3Trove:
def __exit__(self, *_):
self.close()
# ------------------------------------------------------------------
# Object info
# ------------------------------------------------------------------
def get_info(self, object_id: SqlObjectId) -> ObjectInfo | None:
"""Return an ObjectInfo namedtuple for the object, or None if not found."""
row = self._con.execute(
"SELECT id, type, created, modified, executable, hidden "
"FROM objects WHERE id = ?",
(_sql_id(object_id),),
).fetchone()
if row is None:
return None
return ObjectInfo(
id=uuid.UUID(row["id"]),
type=row["type"],
created=_to_datetime(row["created"]),
modified=_to_datetime(row["modified"]),
executable=bool(row["executable"]),
hidden=bool(row["hidden"]),
)
def is_tree(self, object_id: SqlObjectId) -> bool:
"""Return True if the object has any children in tree_entries."""
row = self._con.execute(
"SELECT 1 FROM tree_entries WHERE parent_id = ? LIMIT 1",
(_sql_id(object_id),),
).fetchone()
return row is not None
# ------------------------------------------------------------------
# CRUD operations
# ------------------------------------------------------------------
@ -103,9 +154,9 @@ class Sqlite3Trove:
return row["type"] if row else None
def read_object(self, object_id: SqlObjectId) -> bytes | None:
"""Return raw data for a blob object, or None if not found."""
"""Return raw data for an object, or None if not found."""
row = self._con.execute(
"SELECT data, type FROM objects WHERE id = ?", (_sql_id(object_id),)
"SELECT data FROM objects WHERE id = ?", (_sql_id(object_id),)
).fetchone()
if row is None:
return None
@ -118,7 +169,7 @@ class Sqlite3Trove:
).fetchone()
if row is None:
return None
return datetime.fromisoformat(row["modified"])
return _to_datetime(row["modified"])
def read_metadata(self, object_id: SqlObjectId, key: str) -> bytes | None:
"""Return raw metadata value for (uuid, key), or None if not found."""
@ -130,45 +181,101 @@ class Sqlite3Trove:
return bytes(row["value"]) if row["value"] is not None else b""
def write_metadata(self, object_id: SqlObjectId, key: str, value: bytes) -> None:
"""Upsert a metadata row. db.py has no write_metadata, so we go direct."""
"""Upsert a metadata row."""
self._con.execute(
"INSERT OR REPLACE INTO metadata (id, key, value) VALUES (?, ?, ?)",
(_sql_id(object_id), key, value),
)
self._con.commit()
def _write_object(self, data: bytes, dtype: str, object_id: str | uuid.UUID | None = None) -> str:
"""
Insert or replace an object. Returns the id.
If object_id is None, creates a new object with a new UUID.
If object_id is provided, updates or creates the object with that ID.
"""
modified = _now()
if object_id is None:
object_id = uuid.uuid4()
def write_content(self, object_id: SqlObjectId, data: bytes) -> None:
"""Update only the data and modified timestamp. Preserves type and flags."""
self._con.execute(
"INSERT OR REPLACE INTO objects (id, type, data, modified) VALUES (?, ?, ?, ?)",
(_sql_id(object_id), dtype, data, modified)
"UPDATE objects SET data = ?, modified = ? WHERE id = ?",
(data, _now(), _sql_id(object_id)),
)
self._con.commit()
return _sql_id(object_id)
def write_blob(self, data: bytes, object_id: SqlObjectId | None = None) -> str:
def _write_object(
self,
data: bytes,
dtype: str,
object_id: str | uuid.UUID | None = None,
executable: bool = False,
hidden: bool = False,
) -> str:
"""
Insert or replace an object. Returns the id.
On INSERT, both created and modified are set to now.
On REPLACE (existing id), created is preserved and modified is updated.
"""
now = _now()
if object_id is None:
object_id = uuid.uuid4()
sid = _sql_id(object_id)
# Preserve created timestamp on update
row = self._con.execute(
"SELECT created FROM objects WHERE id = ?", (sid,)
).fetchone()
created = row["created"] if row else now
self._con.execute(
"""INSERT INTO objects (id, type, data, created, modified, executable, hidden)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
type=excluded.type, data=excluded.data, modified=excluded.modified,
executable=excluded.executable, hidden=excluded.hidden""",
(sid, dtype, data, created, now, int(executable), int(hidden)),
)
self._con.commit()
return sid
def write_blob(self, data: bytes, object_id: SqlObjectId | None = None,
dtype: str = "application/octet-stream",
executable: bool = False, hidden: bool = False) -> str:
"""
Insert or replace a blob. Returns the id.
Pass object_id to update an existing object.
"""
return self._write_object(data, "blob", _sql_id(object_id))
return self._write_object(data, dtype, _sql_id(object_id),
executable=executable, hidden=hidden)
def write_tree(self, data: bytes, object_id: SqlObjectId | None = None) -> str:
def write_tree(self, data: bytes, object_id: SqlObjectId | None = None,
hidden: bool = False) -> str:
"""Write a tree-typed object. Returns the assigned id."""
return self._write_object(data, "tree", _sql_id(object_id))
return self._write_object(data, "inode/directory", _sql_id(object_id),
hidden=hidden)
def set_executable(self, object_id: SqlObjectId, executable: bool) -> None:
"""Set or clear the executable flag on an object."""
self._con.execute(
"UPDATE objects SET executable = ?, modified = ? WHERE id = ?",
(int(executable), _now(), _sql_id(object_id)),
)
self._con.commit()
def set_hidden(self, object_id: SqlObjectId, hidden: bool) -> None:
"""Set or clear the hidden flag on an object."""
self._con.execute(
"UPDATE objects SET hidden = ?, modified = ? WHERE id = ?",
(int(hidden), _now(), _sql_id(object_id)),
)
self._con.commit()
def set_type(self, object_id: SqlObjectId, dtype: str) -> None:
"""Update the MIME type of an object."""
self._con.execute(
"UPDATE objects SET type = ?, modified = ? WHERE id = ?",
(dtype, _now(), _sql_id(object_id)),
)
self._con.commit()
def delete_object(self, object_id: SqlObjectId) -> bool:
"""
Delete a blob and all its metadata rows.
Delete an object and all its metadata rows.
Returns True if an object was deleted, False if id not found.
Foreign key cascade handles the metadata rows.
Foreign key cascade handles the metadata and tree_entries rows.
"""
cur = self._con.execute(
"DELETE FROM objects WHERE id = ?", (_sql_id(object_id),)
@ -176,6 +283,62 @@ class Sqlite3Trove:
self._con.commit()
return cur.rowcount > 0
# ------------------------------------------------------------------
# Tree entry operations
# ------------------------------------------------------------------
def link(self, parent_id: SqlObjectId, name: str, child_id: SqlObjectId) -> None:
"""
Link a child object into a tree under the given name.
Replaces any existing entry with the same name in this tree.
Both parent_id and child_id must exist in the objects table
(enforced by FK constraints).
"""
self._con.execute(
"INSERT OR REPLACE INTO tree_entries (parent_id, name, child_id) "
"VALUES (?, ?, ?)",
(_sql_id(parent_id), name, _sql_id(child_id)),
)
self._con.execute(
"UPDATE objects SET modified = ? WHERE id = ?",
(_now(), _sql_id(parent_id)),
)
self._con.commit()
def unlink(self, parent_id: SqlObjectId, name: str) -> bool:
"""
Remove a named entry from a tree.
Returns True if an entry was removed, False if not found.
Does not delete the child object itself.
"""
cur = self._con.execute(
"DELETE FROM tree_entries WHERE parent_id = ? AND name = ?",
(_sql_id(parent_id), name),
)
if cur.rowcount > 0:
self._con.execute(
"UPDATE objects SET modified = ? WHERE id = ?",
(_now(), _sql_id(parent_id)),
)
self._con.commit()
return cur.rowcount > 0
def list_tree(self, parent_id: SqlObjectId) -> dict[str, str]:
"""
Return all entries in a tree as {name: child_id}.
Returns an empty dict if the tree has no entries.
"""
rows = self._con.execute(
"SELECT name, child_id FROM tree_entries "
"WHERE parent_id = ? ORDER BY name",
(_sql_id(parent_id),),
).fetchall()
return {row["name"]: row["child_id"] for row in rows}
# ------------------------------------------------------------------
# Label operations
# ------------------------------------------------------------------
def get_label(self, label: str) -> SqlObjectId | None:
"""
Return the ID associated with a label, or None if not found.
@ -228,18 +391,25 @@ def main():
# Create database
subparsers.add_parser("create", help="Create a new database")
# Get blob
get_parser = subparsers.add_parser("get", help="Get a blob object by ID")
get_parser.add_argument("id", help="ID of the blob to retrieve")
# Get object data
get_parser = subparsers.add_parser("get", help="Get object data by ID")
get_parser.add_argument("id", help="ID of the object to retrieve")
# Info
info_parser = subparsers.add_parser("info", help="Show object metadata")
info_parser.add_argument("id", help="Object ID")
# Write blob
write_parser = subparsers.add_parser("write", help="Write data to a blob")
write_parser.add_argument("data", help="Data to write (as string, will be encoded as UTF-8)")
write_parser.add_argument("--id", help="ID of existing blob to update (optional)")
write_parser.add_argument("--type", default="text/plain", help="MIME type (default: text/plain)")
write_parser.add_argument("--executable", action="store_true", help="Mark as executable")
write_parser.add_argument("--hidden", action="store_true", help="Mark as hidden")
# Delete blob
delete_parser = subparsers.add_parser("delete", help="Delete a blob by ID")
delete_parser.add_argument("id", help="ID of the blob to delete")
# Delete object
delete_parser = subparsers.add_parser("delete", help="Delete an object by ID")
delete_parser.add_argument("id", help="ID of the object to delete")
# Set label
setlabel_parser = subparsers.add_parser("setlabel", help="Create or update a label to point to an ID")
@ -253,6 +423,19 @@ def main():
# List labels
subparsers.add_parser("labels", help="List all labels")
# Tree operations
link_parser = subparsers.add_parser("link", help="Link a child into a tree")
link_parser.add_argument("parent_id", help="Parent tree object ID")
link_parser.add_argument("name", help="Entry name")
link_parser.add_argument("child_id", help="Child object ID")
unlink_parser = subparsers.add_parser("unlink", help="Unlink a child from a tree")
unlink_parser.add_argument("parent_id", help="Parent tree object ID")
unlink_parser.add_argument("name", help="Entry name to remove")
ls_parser = subparsers.add_parser("ls", help="List tree entries")
ls_parser.add_argument("parent_id", help="Tree object ID to list")
args = parser.parse_args()
try:
@ -270,14 +453,33 @@ def main():
data = db.read_object(args.id)
db.close()
if data is None:
print(f"Blob not found: {args.id}")
print(f"Object not found: {args.id}")
sys.exit(1)
sys.stdout.buffer.write(data)
case "info":
db = Sqlite3Trove.open(args.database)
obj = db.get_info(args.id)
has_children = db.is_tree(args.id)
db.close()
if obj is None:
print(f"Object not found: {args.id}")
sys.exit(1)
print(f"id: {obj.id}")
print(f"type: {obj.type}")
print(f"created: {obj.created.isoformat()}")
print(f"modified: {obj.modified.isoformat()}")
print(f"executable: {obj.executable}")
print(f"hidden: {obj.hidden}")
print(f"children: {has_children}")
case "write":
data_bytes = args.data.encode("utf-8")
db = Sqlite3Trove.open(args.database)
object_id = db.write_blob(data_bytes, args.id)
object_id = db.write_blob(data_bytes, args.id,
dtype=args.type,
executable=args.executable,
hidden=args.hidden)
db.close()
print(object_id)
@ -286,9 +488,9 @@ def main():
deleted = db.delete_object(args.id)
db.close()
if deleted:
print(f"Deleted blob: {args.id}")
print(f"Deleted: {args.id}")
else:
print(f"Blob not found: {args.id}")
print(f"Object not found: {args.id}")
sys.exit(1)
case "setlabel":
@ -317,6 +519,32 @@ def main():
else:
print("No labels found.")
case "link":
db = Sqlite3Trove.open(args.database)
db.link(args.parent_id, args.name, args.child_id)
db.close()
print(f"Linked '{args.name}' -> {args.child_id} in {args.parent_id}")
case "unlink":
db = Sqlite3Trove.open(args.database)
removed = db.unlink(args.parent_id, args.name)
db.close()
if removed:
print(f"Unlinked '{args.name}' from {args.parent_id}")
else:
print(f"Entry '{args.name}' not found in {args.parent_id}")
sys.exit(1)
case "ls":
db = Sqlite3Trove.open(args.database)
entries = db.list_tree(args.parent_id)
db.close()
if entries:
for name, child_id in entries.items():
print(f"{name}: {child_id}")
else:
print("No entries.")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)