# Listing metadata: 565 lines, 20 KiB, Python.
"""
|
|
Python API for accessing underlying SQlite3 File Database
|
|
|
|
A Sqlite3 database provides the basic underlying interface to the underlying
|
|
notes interface. The low level database is an extremely simple model using
|
|
similar ideas to git storage.
|
|
"""
|
|
|
|
import argparse
|
|
import sqlite3
|
|
import sys
|
|
import uuid
|
|
from typing import NamedTuple
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from . import trove as tr
|
|
|
|
# Fixed all-zero UUID. Sqlite3Trove.open() stores an empty "inode/directory"
# object under this id when it initializes a new database.
NOTE_ROOT_ID = uuid.UUID(int=0)
|
|
|
|
class ObjectInfo(NamedTuple):
    """Column values of one ``objects`` row, excluding the ``data`` payload."""

    # Object identifier, parsed from the TEXT primary key.
    id: uuid.UUID
    # Value of the ``type`` column (e.g. "inode/directory" for the root).
    type: str
    # Creation time as a timezone-aware UTC datetime.
    created: datetime
    # Last-modification time as a timezone-aware UTC datetime.
    modified: datetime
    # True when the ``executable`` flag column is non-zero.
    executable: bool
    # True when the ``hidden`` flag column is non-zero.
    hidden: bool
|
|
|
|
|
|
_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS objects (
|
|
id TEXT PRIMARY KEY,
|
|
type TEXT NOT NULL,
|
|
data BLOB,
|
|
created REAL NOT NULL,
|
|
modified REAL NOT NULL,
|
|
executable INTEGER NOT NULL DEFAULT 0,
|
|
hidden INTEGER NOT NULL DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS metadata (
|
|
id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE,
|
|
key TEXT NOT NULL,
|
|
value BLOB,
|
|
PRIMARY KEY (id, key)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS labels
|
|
(
|
|
label TEXT PRIMARY KEY,
|
|
id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS tree_entries (
|
|
parent_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE,
|
|
name TEXT NOT NULL,
|
|
child_id TEXT NOT NULL REFERENCES objects(id) ON DELETE CASCADE,
|
|
PRIMARY KEY (parent_id, name)
|
|
);
|
|
"""
|
|
|
|
# An object identifier as callers may pass it: a plain string or a uuid.UUID.
# _sql_id() converts either form to the string used in SQL parameters.
type SqlObjectId = str | uuid.UUID
|
|
|
|
def _now() -> float:
|
|
"""Current UTC time as a Unix epoch float."""
|
|
return datetime.now(timezone.utc).timestamp()
|
|
|
|
def _to_datetime(ts: float) -> datetime:
|
|
"""Convert a Unix epoch float to a UTC datetime."""
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc)
|
|
|
|
def _sql_id(id: SqlObjectId | None) -> str | None:
|
|
if id is None:
|
|
return None
|
|
return (
|
|
id if isinstance(id, str) else str(id)
|
|
)
|
|
|
|
|
|
class Sqlite3Trove:
    """Object store backed by a single SQLite connection.

    Objects live in the ``objects`` table and are addressed by id. Three side
    tables reference them: ``metadata`` (per-object key/value pairs),
    ``labels`` (named pointers to an object) and ``tree_entries`` (named
    parent -> child links). Every mutating method commits before returning
    and rolls back if one of its statements fails.
    """

    def __init__(self, con: sqlite3.Connection):
        """Wrap an open connection; enable foreign keys and name-based row access."""
        self._con = con
        # SQLite does not enforce foreign keys unless the connection asks for
        # it; the schema's ON DELETE CASCADE clauses depend on this pragma.
        self._con.execute("PRAGMA foreign_keys = ON")
        self._con.row_factory = sqlite3.Row

    def _run_write(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
        """Execute one write statement; commit on success, roll back on error."""
        # Using the connection as a context manager guarantees that a failed
        # statement does not leave the implicit transaction open.
        with self._con:
            return self._con.execute(sql, params)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    @classmethod
    def open(cls, path: str | Path, create: bool = False) -> "Sqlite3Trove":
        """Open a Trove database, optionally creating it.

        If *path* does not exist and *create* is true, the file is created,
        the schema is installed and an empty "inode/directory" root object is
        written under ``NOTE_ROOT_ID``. An existing file is opened as-is,
        whatever the value of *create*.

        Raises:
            FileNotFoundError: *path* does not exist and *create* is false.
        """
        p = Path(path)
        initialize = not p.exists()
        if initialize and not create:
            raise FileNotFoundError(f"Database not found: {p}")

        con = sqlite3.connect(str(p))
        try:
            if initialize:
                con.executescript(_SCHEMA)
                con.commit()
            obj = cls(con)
            if initialize:
                obj._write_object(b"", "inode/directory", NOTE_ROOT_ID)
        except BaseException:
            # Do not leak the connection when initialization fails.
            con.close()
            raise
        return obj

    def close(self):
        """Close the underlying connection."""
        self._con.close()

    def __enter__(self):
        """Return self so the instance can be used in a ``with`` statement."""
        return self

    def __exit__(self, *_):
        """Close the connection on leaving the ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Object info
    # ------------------------------------------------------------------

    def get_info(self, object_id: SqlObjectId) -> ObjectInfo | None:
        """Return an ObjectInfo namedtuple for the object, or None if not found."""
        row = self._con.execute(
            "SELECT id, type, created, modified, executable, hidden "
            "FROM objects WHERE id = ?",
            (_sql_id(object_id),),
        ).fetchone()
        if row is None:
            return None
        return ObjectInfo(
            id=uuid.UUID(row["id"]),
            type=row["type"],
            created=_to_datetime(row["created"]),
            modified=_to_datetime(row["modified"]),
            executable=bool(row["executable"]),
            hidden=bool(row["hidden"]),
        )

    def is_tree(self, object_id: SqlObjectId) -> bool:
        """Return True if the object has any children in tree_entries.

        Only ``tree_entries`` is consulted, so an object without entries
        yields False regardless of its type.
        """
        row = self._con.execute(
            "SELECT 1 FROM tree_entries WHERE parent_id = ? LIMIT 1",
            (_sql_id(object_id),),
        ).fetchone()
        return row is not None

    # ------------------------------------------------------------------
    # CRUD operations
    # ------------------------------------------------------------------

    def get_object_type(self, object_id: SqlObjectId) -> str | None:
        """Return the type column for an object, or None if not found."""
        row = self._con.execute(
            "SELECT type FROM objects WHERE id = ?", (_sql_id(object_id),)
        ).fetchone()
        return row["type"] if row else None

    def read_object(self, object_id: SqlObjectId) -> bytes | None:
        """Return raw data for an object, or None if not found.

        A row whose data column is NULL yields ``b""``.
        """
        row = self._con.execute(
            "SELECT data FROM objects WHERE id = ?", (_sql_id(object_id),)
        ).fetchone()
        if row is None:
            return None
        data = row["data"]
        return bytes(data) if data is not None else b""

    def get_mtime(self, object_id: SqlObjectId) -> datetime | None:
        """Return the modified timestamp for an object, or None if not found."""
        row = self._con.execute(
            "SELECT modified FROM objects WHERE id = ?", (_sql_id(object_id),)
        ).fetchone()
        if row is None:
            return None
        return _to_datetime(row["modified"])

    def read_metadata(self, object_id: SqlObjectId, key: str) -> bytes | None:
        """Return raw metadata value for (id, key), or None if not found.

        A row whose value column is NULL yields ``b""``.
        """
        row = self._con.execute(
            "SELECT value FROM metadata WHERE id = ? AND key = ?",
            (_sql_id(object_id), key),
        ).fetchone()
        if row is None:
            return None
        value = row["value"]
        return bytes(value) if value is not None else b""

    def write_metadata(self, object_id: SqlObjectId, key: str, value: bytes) -> None:
        """Upsert a metadata row for (object_id, key)."""
        self._run_write(
            "INSERT OR REPLACE INTO metadata (id, key, value) VALUES (?, ?, ?)",
            (_sql_id(object_id), key, value),
        )

    def write_content(self, object_id: SqlObjectId, data: bytes) -> None:
        """Update only the data and modified timestamp. Preserves type and flags.

        No row is created when *object_id* does not exist.
        """
        self._run_write(
            "UPDATE objects SET data = ?, modified = ? WHERE id = ?",
            (data, _now(), _sql_id(object_id)),
        )

    def _write_object(
        self,
        data: bytes,
        dtype: str,
        object_id: str | uuid.UUID | None = None,
        executable: bool = False,
        hidden: bool = False,
    ) -> str:
        """
        Insert or replace an object. Returns the id as a string.

        A fresh ``uuid4`` is generated when *object_id* is None.
        On INSERT, both created and modified are set to now.
        On conflict with an existing id, created is preserved (the UPSERT
        clause never assigns it) and type, data, modified and both flags are
        overwritten.
        """
        now = _now()
        if object_id is None:
            object_id = uuid.uuid4()
        sid = object_id if isinstance(object_id, str) else str(object_id)

        self._run_write(
            "INSERT INTO objects "
            "(id, type, data, created, modified, executable, hidden) "
            "VALUES (?, ?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET "
            "type=excluded.type, data=excluded.data, modified=excluded.modified, "
            "executable=excluded.executable, hidden=excluded.hidden",
            (sid, dtype, data, now, now, int(executable), int(hidden)),
        )
        return sid

    def write_blob(self, data: bytes, object_id: SqlObjectId | None = None,
                   dtype: str = "application/octet-stream",
                   executable: bool = False, hidden: bool = False) -> str:
        """
        Insert or replace a blob. Returns the id.

        Pass object_id to update an existing object; the object's type and
        flags are overwritten with the values given here.
        """
        return self._write_object(data, dtype, object_id,
                                  executable=executable, hidden=hidden)

    def write_tree(self, data: bytes, object_id: SqlObjectId | None = None,
                   hidden: bool = False) -> str:
        """Write an object of type "inode/directory". Returns the assigned id."""
        return self._write_object(data, "inode/directory", object_id,
                                  hidden=hidden)

    def set_executable(self, object_id: SqlObjectId, executable: bool) -> None:
        """Set or clear the executable flag on an object; bumps modified."""
        self._run_write(
            "UPDATE objects SET executable = ?, modified = ? WHERE id = ?",
            (int(executable), _now(), _sql_id(object_id)),
        )

    def set_hidden(self, object_id: SqlObjectId, hidden: bool) -> None:
        """Set or clear the hidden flag on an object; bumps modified."""
        self._run_write(
            "UPDATE objects SET hidden = ?, modified = ? WHERE id = ?",
            (int(hidden), _now(), _sql_id(object_id)),
        )

    def set_type(self, object_id: SqlObjectId, dtype: str) -> None:
        """Update the MIME type of an object; bumps modified."""
        self._run_write(
            "UPDATE objects SET type = ?, modified = ? WHERE id = ?",
            (dtype, _now(), _sql_id(object_id)),
        )

    def delete_object(self, object_id: SqlObjectId) -> bool:
        """
        Delete an object and all its metadata rows.

        Returns True if an object was deleted, False if id not found.
        Foreign key cascade handles the metadata, labels and tree_entries rows.
        """
        cur = self._run_write(
            "DELETE FROM objects WHERE id = ?", (_sql_id(object_id),)
        )
        return cur.rowcount > 0

    # ------------------------------------------------------------------
    # Tree entry operations
    # ------------------------------------------------------------------

    def link(self, parent_id: SqlObjectId, name: str, child_id: SqlObjectId, overwrite: bool = True) -> None:
        """
        Link a child object into a tree under the given name.

        Replaces any existing entry with the same name in this tree if
        overwrite=True; otherwise an existing entry raises ``tr.ErrorExists``.
        Both parent_id and child_id must exist in the objects table
        (enforced by FK constraints). The parent's modified time is bumped.
        """
        parent_sid = _sql_id(parent_id)
        if not overwrite:
            existing = self._con.execute(
                "SELECT 1 FROM tree_entries WHERE parent_id = ? AND name = ?",
                (parent_sid, name),
            ).fetchone()
            if existing:
                raise tr.ErrorExists(f"Entry '{name}' already exists in tree {parent_id}")

        # Insert and timestamp bump succeed or fail together.
        with self._con:
            self._con.execute(
                "INSERT OR REPLACE INTO tree_entries (parent_id, name, child_id) "
                "VALUES (?, ?, ?)",
                (parent_sid, name, _sql_id(child_id)),
            )
            self._con.execute(
                "UPDATE objects SET modified = ? WHERE id = ?",
                (_now(), parent_sid),
            )

    def unlink(self, parent_id: SqlObjectId, name: str) -> bool:
        """
        Remove a named entry from a tree.

        Returns True if an entry was removed, False if not found.
        Does not delete the child object itself. The parent's modified time
        is bumped only when an entry was actually removed.
        """
        parent_sid = _sql_id(parent_id)
        with self._con:
            cur = self._con.execute(
                "DELETE FROM tree_entries WHERE parent_id = ? AND name = ?",
                (parent_sid, name),
            )
            removed = cur.rowcount > 0
            if removed:
                self._con.execute(
                    "UPDATE objects SET modified = ? WHERE id = ?",
                    (_now(), parent_sid),
                )
        return removed

    def list_tree(self, parent_id: SqlObjectId) -> dict[str, str]:
        """
        Return all entries in a tree as {name: child_id}, ordered by name.

        Returns an empty dict if the tree has no entries. Child ids are the
        strings stored in the table.
        """
        rows = self._con.execute(
            "SELECT name, child_id FROM tree_entries "
            "WHERE parent_id = ? ORDER BY name",
            (_sql_id(parent_id),),
        ).fetchall()
        return {row["name"]: row["child_id"] for row in rows}

    # ------------------------------------------------------------------
    # Label operations
    # ------------------------------------------------------------------

    def get_label(self, label: str) -> SqlObjectId | None:
        """
        Return the ID associated with a label, or None if not found.

        The stored id is parsed into a ``uuid.UUID``.
        """
        row = self._con.execute(
            "SELECT id FROM labels WHERE label = ?", (label,)
        ).fetchone()
        if row is None:
            return None
        return uuid.UUID(row["id"])

    def set_label(self, label: str, object_id: SqlObjectId) -> None:
        """
        Set a label to point to an ID. Creates or updates the label.

        The ID must exist in the objects table.
        """
        self._run_write(
            "INSERT OR REPLACE INTO labels (label, id) VALUES (?, ?)",
            (label, _sql_id(object_id)),
        )

    def delete_label(self, label: str) -> bool:
        """
        Delete a label. Returns True if a label was deleted, False if not found.
        """
        cur = self._run_write(
            "DELETE FROM labels WHERE label = ?", (label,)
        )
        return cur.rowcount > 0

    def list_labels(self) -> list[tuple[str, str]]:
        """
        Return all labels as a list of (label, id) tuples, ordered by label.

        Ids are returned as the strings stored in the table.
        """
        rows = self._con.execute(
            "SELECT label, id FROM labels ORDER BY label"
        ).fetchall()
        return [(row["label"], row["id"]) for row in rows]
|
|
|
|
|
|
def main():
|
|
"""Command-line interface for TroveDB."""
|
|
parser = argparse.ArgumentParser(description="TroveDB command-line interface")
|
|
parser.add_argument("database", help="Path to the database file")
|
|
|
|
subparsers = parser.add_subparsers(dest="operation", required=True, help="Operation to perform")
|
|
|
|
# Create database
|
|
subparsers.add_parser("create", help="Create a new database")
|
|
|
|
# Get object data
|
|
get_parser = subparsers.add_parser("get", help="Get object data by ID")
|
|
get_parser.add_argument("id", help="ID of the object to retrieve")
|
|
|
|
# Info
|
|
info_parser = subparsers.add_parser("info", help="Show object metadata")
|
|
info_parser.add_argument("id", help="Object ID")
|
|
|
|
# Write blob
|
|
write_parser = subparsers.add_parser("write", help="Write data to a blob")
|
|
write_parser.add_argument("data", help="Data to write (as string, will be encoded as UTF-8)")
|
|
write_parser.add_argument("--id", help="ID of existing blob to update (optional)")
|
|
write_parser.add_argument("--type", default="text/plain", help="MIME type (default: text/plain)")
|
|
write_parser.add_argument("--executable", action="store_true", help="Mark as executable")
|
|
write_parser.add_argument("--hidden", action="store_true", help="Mark as hidden")
|
|
|
|
# Delete object
|
|
delete_parser = subparsers.add_parser("delete", help="Delete an object by ID")
|
|
delete_parser.add_argument("id", help="ID of the object to delete")
|
|
|
|
# Set label
|
|
setlabel_parser = subparsers.add_parser("setlabel", help="Create or update a label to point to an ID")
|
|
setlabel_parser.add_argument("label", help="Label name")
|
|
setlabel_parser.add_argument("id", help="ID to associate with the label")
|
|
|
|
# Remove label
|
|
rmlabel_parser = subparsers.add_parser("rmlabel", help="Delete a label")
|
|
rmlabel_parser.add_argument("label", help="Label name to delete")
|
|
|
|
# List labels
|
|
subparsers.add_parser("labels", help="List all labels")
|
|
|
|
# Tree operations
|
|
link_parser = subparsers.add_parser("link", help="Link a child into a tree")
|
|
link_parser.add_argument("parent_id", help="Parent tree object ID")
|
|
link_parser.add_argument("name", help="Entry name")
|
|
link_parser.add_argument("child_id", help="Child object ID")
|
|
|
|
unlink_parser = subparsers.add_parser("unlink", help="Unlink a child from a tree")
|
|
unlink_parser.add_argument("parent_id", help="Parent tree object ID")
|
|
unlink_parser.add_argument("name", help="Entry name to remove")
|
|
|
|
ls_parser = subparsers.add_parser("ls", help="List tree entries")
|
|
ls_parser.add_argument("parent_id", help="Tree object ID to list")
|
|
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
match args.operation:
|
|
case "create":
|
|
if Path(args.database).exists():
|
|
print(f"Database already exists: {args.database}", file=sys.stderr)
|
|
sys.exit(1)
|
|
db = Sqlite3Trove.open(args.database, create=True)
|
|
db.close()
|
|
print(f"Database created: {args.database}")
|
|
|
|
case "get":
|
|
db = Sqlite3Trove.open(args.database)
|
|
data = db.read_object(args.id)
|
|
db.close()
|
|
if data is None:
|
|
print(f"Object not found: {args.id}")
|
|
sys.exit(1)
|
|
sys.stdout.buffer.write(data)
|
|
|
|
case "info":
|
|
db = Sqlite3Trove.open(args.database)
|
|
obj = db.get_info(args.id)
|
|
has_children = db.is_tree(args.id)
|
|
db.close()
|
|
if obj is None:
|
|
print(f"Object not found: {args.id}")
|
|
sys.exit(1)
|
|
print(f"id: {obj.id}")
|
|
print(f"type: {obj.type}")
|
|
print(f"created: {obj.created.isoformat()}")
|
|
print(f"modified: {obj.modified.isoformat()}")
|
|
print(f"executable: {obj.executable}")
|
|
print(f"hidden: {obj.hidden}")
|
|
print(f"children: {has_children}")
|
|
|
|
case "write":
|
|
data_bytes = args.data.encode("utf-8")
|
|
db = Sqlite3Trove.open(args.database)
|
|
object_id = db.write_blob(data_bytes, args.id,
|
|
dtype=args.type,
|
|
executable=args.executable,
|
|
hidden=args.hidden)
|
|
db.close()
|
|
print(object_id)
|
|
|
|
case "delete":
|
|
db = Sqlite3Trove.open(args.database)
|
|
deleted = db.delete_object(args.id)
|
|
db.close()
|
|
if deleted:
|
|
print(f"Deleted: {args.id}")
|
|
else:
|
|
print(f"Object not found: {args.id}")
|
|
sys.exit(1)
|
|
|
|
case "setlabel":
|
|
db = Sqlite3Trove.open(args.database)
|
|
db.set_label(args.label, args.id)
|
|
db.close()
|
|
print(f"Label '{args.label}' set to ID: {args.id}")
|
|
|
|
case "rmlabel":
|
|
db = Sqlite3Trove.open(args.database)
|
|
deleted = db.delete_label(args.label)
|
|
db.close()
|
|
if deleted:
|
|
print(f"Deleted label: {args.label}")
|
|
else:
|
|
print(f"Label not found: {args.label}")
|
|
sys.exit(1)
|
|
|
|
case "labels":
|
|
db = Sqlite3Trove.open(args.database)
|
|
labels = db.list_labels()
|
|
db.close()
|
|
if labels:
|
|
for label, id in labels:
|
|
print(f"{label}: {id}")
|
|
else:
|
|
print("No labels found.")
|
|
|
|
case "link":
|
|
db = Sqlite3Trove.open(args.database)
|
|
db.link(args.parent_id, args.name, args.child_id)
|
|
db.close()
|
|
print(f"Linked '{args.name}' -> {args.child_id} in {args.parent_id}")
|
|
|
|
case "unlink":
|
|
db = Sqlite3Trove.open(args.database)
|
|
removed = db.unlink(args.parent_id, args.name)
|
|
db.close()
|
|
if removed:
|
|
print(f"Unlinked '{args.name}' from {args.parent_id}")
|
|
else:
|
|
print(f"Entry '{args.name}' not found in {args.parent_id}")
|
|
sys.exit(1)
|
|
|
|
case "ls":
|
|
db = Sqlite3Trove.open(args.database)
|
|
entries = db.list_tree(args.parent_id)
|
|
db.close()
|
|
if entries:
|
|
for name, child_id in entries.items():
|
|
print(f"{name}: {child_id}")
|
|
else:
|
|
print("No entries.")
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|