From a3390d5171eda9c07b7e387a68bdd80ba7a451a3 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:08:36 +0000 Subject: [PATCH] =?UTF-8?q?p1-6:=20scaffold=20kb-store-sqlite=20crate=20+?= =?UTF-8?q?=20V001=20full=20=C2=A75=20DDL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New workspace member crate `kb-store-sqlite` (allowed deps only: kb-core, kb-config, rusqlite[bundled], refinery, serde, serde_json, time, blake3, tracing, anyhow, thiserror; dev-deps add kb-parse-md / kb-normalize / kb-chunk for the contract round-trip test). Migration V001 replaces the P0-1 stub with the full §5 DDL (assets, documents, document_tags, blocks, chunks with policy_hash, embedding_records, jobs, ingest_runs, answers, eval_runs, eval_query_results) plus the §5 indexes. FTS5 virtual table + triggers remain deferred to V002 (P2-1). Public surface per task spec: SqliteStore::open / run_migrations / put_asset_with_bytes impl DocumentStore for SqliteStore (7 trait methods) impl JobRepo for SqliteStore (4 trait methods) StoreError { Sqlx, Migration, Conflict } Behavior: - Pragmas at open: foreign_keys=ON, journal_mode=WAL, synchronous=NORMAL, temp_store=MEMORY. - Asset writer: byte_len ≤ copy_threshold_mb * 1MiB → copy to data_dir/assets// (mode 0o644 on Unix), else reference. blake3(bytes) verified against asset.checksum; mismatch → Conflict. - Idempotency: put_document UPSERTs and bumps doc_version + 1 on conflict; put_blocks / put_chunks DELETE-then-INSERT; document_tags re-derived per put_document. - get_document rehydrates blocks via payload_json ordered by stream ordinal. - list_documents builds dynamic WHERE from DocFilter (lang / trust_min / path_glob via GLOB / tags_any via document_tags subquery). - JobRepo: jobs.kind/status are stored as lowercase enum tags; create mints a 32-hex JobId via blake3(kind || payload || nanos). Tests follow in subsequent commits. 
--- Cargo.lock | 452 +++++++++++++++++ Cargo.toml | 1 + crates/kb-store-sqlite/Cargo.toml | 35 ++ crates/kb-store-sqlite/src/documents.rs | 641 ++++++++++++++++++++++++ crates/kb-store-sqlite/src/error.rs | 20 + crates/kb-store-sqlite/src/jobs.rs | 252 ++++++++++ crates/kb-store-sqlite/src/lib.rs | 23 + crates/kb-store-sqlite/src/schema.rs | 14 + crates/kb-store-sqlite/src/store.rs | 297 +++++++++++ migrations/V001__init.sql | 175 ++++++- 10 files changed, 1906 insertions(+), 4 deletions(-) create mode 100644 crates/kb-store-sqlite/Cargo.toml create mode 100644 crates/kb-store-sqlite/src/documents.rs create mode 100644 crates/kb-store-sqlite/src/error.rs create mode 100644 crates/kb-store-sqlite/src/jobs.rs create mode 100644 crates/kb-store-sqlite/src/lib.rs create mode 100644 crates/kb-store-sqlite/src/schema.rs create mode 100644 crates/kb-store-sqlite/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 11bdbb3..d01b60a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. 
version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -79,6 +91,17 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -286,6 +309,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.15.0" @@ -308,6 +342,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.4.1" @@ -329,6 +375,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fst" version = "0.4.7" @@ -415,6 +470,9 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -431,18 +489,129 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + 
"smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + [[package]] name = "id-arena" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "ignore" version = "0.4.25" @@ -649,6 +818,27 @@ dependencies = [ "walkdir", ] +[[package]] +name = "kb-store-sqlite" +version = "0.1.0" +dependencies = 
[ + "anyhow", + "blake3", + "kb-chunk", + "kb-config", + "kb-core", + "kb-normalize", + "kb-parse-md", + "refinery", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "time", + "tracing", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -676,6 +866,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "lingua" version = "1.8.0" @@ -744,6 +945,12 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + [[package]] name = "lock_api" version = "0.4.14" @@ -835,12 +1042,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + [[package]] name = "pin-project-lite" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + [[package]] name = "powerfmt" version = 
"0.2.0" @@ -938,6 +1166,50 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "refinery" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba5d693abf62492c37268512ff35b77655d2e957ca53dab85bf993fe9172d15" +dependencies = [ + "refinery-core", + "refinery-macros", +] + +[[package]] +name = "refinery-core" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a83581f18c1a4c3a6ebd7a174bdc665f17f618d79f7edccb6a0ac67e660b319" +dependencies = [ + "async-trait", + "cfg-if", + "log", + "regex", + "rusqlite", + "serde", + "siphasher", + "thiserror 1.0.69", + "time", + "toml", + "url", + "walkdir", +] + +[[package]] +name = "refinery-macros" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c225407d8e52ef8cf094393781ecda9a99d6544ec28d90a6915751de259264" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "refinery-core", + "regex", + "syn", +] + [[package]] name = "regex" version = "1.12.3" @@ -967,6 +1239,20 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustix" version = "1.1.4" @@ -1121,6 +1407,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "slab" version = "0.4.12" @@ -1133,6 +1425,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1174,6 +1472,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tempfile" version = "3.27.0" @@ -1267,6 +1576,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -1443,6 +1762,24 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1455,6 
+1792,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -1761,6 +2110,109 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + +[[package]] +name = "yoke" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.7" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index b09dfed..b957c22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/kb-parse-md", "crates/kb-normalize", "crates/kb-chunk", + "crates/kb-store-sqlite", "crates/kb-app", "crates/kb-cli", ] diff --git a/crates/kb-store-sqlite/Cargo.toml b/crates/kb-store-sqlite/Cargo.toml new file mode 100644 index 0000000..b0862ef --- /dev/null +++ b/crates/kb-store-sqlite/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "kb-store-sqlite" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "SQLite-backed DocumentStore + 
JobRepo for kb (§5 DDL, §7.2 traits)" + +[dependencies] +kb-core = { path = "../kb-core" } +kb-config = { path = "../kb-config" } +# `bundled` ships SQLite source + builds in-tree (no system libsqlite3). +# Explicitly NOT `bundled-sqlcipher` per task allowed-deps list. +rusqlite = { version = "0.32", features = ["bundled"] } +refinery = { version = "0.8", features = ["rusqlite"] } +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true } +blake3 = { workspace = true } +tracing = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +tempfile = "3" +serde_json = { workspace = true } +# kb-parse-md / kb-normalize / kb-chunk are dev-only — used to build a +# CanonicalDocument + Vec from a fixture in the contract round-trip +# test. Forbidden as regular deps per design §8 (store consumes domain +# types from kb-core only); `cargo tree -p kb-store-sqlite --depth 1` +# (default scope, excludes dev-deps) confirms this. +kb-parse-md = { path = "../kb-parse-md" } +kb-normalize = { path = "../kb-normalize" } +kb-chunk = { path = "../kb-chunk" } diff --git a/crates/kb-store-sqlite/src/documents.rs b/crates/kb-store-sqlite/src/documents.rs new file mode 100644 index 0000000..39cdeba --- /dev/null +++ b/crates/kb-store-sqlite/src/documents.rs @@ -0,0 +1,641 @@ +//! `DocumentStore` impl: assets, documents, document_tags, blocks, chunks. +//! +//! Transactions: per design §5.8, one ingest of one document is one +//! transaction. We expose the raw trait methods at fine granularity (so +//! `kb-app` can compose), and each one wraps its own short transaction. +//! A higher-level `ingest_document` helper that wraps put_document + +//! put_blocks + put_chunks in a single tx is intentionally NOT shipped in +//! P1-6 — `kb-app` (P1's caller layer) is the right place to compose. +//! +//! Idempotency: re-ingesting `(workspace_path, asset_id, parser_version)` +//! 
UPSERTs the documents row, bumps `doc_version`, and replaces all +//! blocks / chunks / document_tags. No row duplication. + +use anyhow::{Context, Result}; +use rusqlite::params; +use time::OffsetDateTime; + +use crate::error::StoreError; +use crate::store::{SqliteStore, upsert_asset_row}; + +impl kb_core::DocumentStore for SqliteStore { + /// Record an asset row without handling its bytes: `asset.stored` selects the `"copied"` or `"reference"` storage kind and its path, which are passed to `upsert_asset_row`. + fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> { + // No bytes here — read storage_kind/storage_path from the + // RawAsset's `stored` field per its convention (§3.3). Callers + // that have raw bytes go through `put_asset_with_bytes` instead; + // this branch is for the case where bytes were already persisted + // (or referenced) and we just want to record the row. + let (storage_kind, storage_path) = match &asset.stored { + kb_core::AssetStorage::Copied { path } => { + ("copied", path.to_string_lossy().into_owned()) + } + kb_core::AssetStorage::Reference { path, .. } => { + ("reference", path.to_string_lossy().into_owned()) + } + }; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + upsert_asset_row(&conn, asset, storage_kind, &storage_path) + } + + /// Run `upsert_document` and `replace_document_tags` (with `doc.metadata.tags`) inside one transaction; the commit is reached only if both succeed. + fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + upsert_document(&tx, doc)?; + replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?; + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + /// Replace every stored block of `doc` with `blocks` in one transaction; each block's slice index is handed to `block_to_row` as its stream ordinal. + fn put_blocks( + &self, + doc: &kb_core::DocumentId, + blocks: &[kb_core::Block], + ) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + // DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal) + // so we cannot rely on UPSERT to surface block_id collisions. The + // simplest correct path is to wipe and re-insert; the §5.8 + // per-document transaction wraps both halves so a partial state + // never lands. 
+ tx.execute("DELETE FROM blocks WHERE doc_id = ?", params![doc.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO blocks ( + block_id, doc_id, kind, heading_path_json, + ordinal, source_span_json, payload_json + ) VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .map_err(StoreError::from)?; + // Ordinal here is the position of the block in the document's + // overall block stream — used for sort-on-load, not the §4.3 + // (heading_path, kind)-scoped ordinal that fed `block_id`. + for (i, block) in blocks.iter().enumerate() { + let row = block_to_row(doc, block, i as i64)?; + stmt.execute(params![ + row.block_id, + row.doc_id, + row.kind, + row.heading_path_json, + row.ordinal, + row.source_span_json, + row.payload_json, + ]) + .map_err(StoreError::from)?; + } + drop(stmt); + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + /// Replace every stored chunk of `doc` with `chunks` in one transaction; all inserted rows share a single RFC 3339 `created_at` computed before the connection lock is taken. + /// NOTE(review): rows are deleted by `doc.0` but inserted with each `chunk.doc_id.0`, so a chunk carrying a different doc_id would be stored under that other document — confirm callers guarantee the two match. + fn put_chunks( + &self, + doc: &kb_core::DocumentId, + chunks: &[kb_core::Chunk], + ) -> Result<()> { + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format chunk created_at")?; + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO chunks ( + chunk_id, doc_id, text, heading_path_json, + section_label, source_spans_json, token_estimate, + chunker_version, policy_hash, block_ids_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .map_err(StoreError::from)?; + for chunk in chunks { + let heading_path = serde_json::to_string(&chunk.heading_path) + .context("serialize chunk.heading_path")?; + let source_spans = serde_json::to_string(&chunk.source_spans) + .context("serialize chunk.source_spans")?; + let block_ids = serde_json::to_string(&chunk.block_ids) + .context("serialize chunk.block_ids")?; + // §5.5 has a `section_label` column 
but the in-memory Chunk + // struct does not carry it (nor does the wire schema §2.6). + // Persist NULL until a future bump introduces the field. + stmt.execute(params![ + chunk.chunk_id.0, + chunk.doc_id.0, + chunk.text, + heading_path, + Option::::None, + source_spans, + chunk.token_estimate as i64, + chunk.chunker_version.0, + chunk.policy_hash, + block_ids, + now, + ]) + .map_err(StoreError::from)?; + } + drop(stmt); + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + /// Load one document and its blocks, returning `Ok(None)` when no `documents` row matches `id`. Blocks are deserialized from `payload_json` in ascending `ordinal` order; metadata and provenance are deserialized from their JSON columns, and a NULL `title` or `lang` falls back to `unwrap_or_default()`. + fn get_document( + &self, + id: &kb_core::DocumentId, + ) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let row: Option = conn + .query_row( + "SELECT + doc_id, asset_id, workspace_path, title, lang, + source_type, trust_level, parser_version, + doc_version, schema_version, metadata_json, + provenance_json, created_at, updated_at + FROM documents WHERE doc_id = ?", + params![id.0], + document_row_from_sql, + ) + .map(Some) + .or_else(rows_optional) + .map_err(StoreError::from)?; + let Some(row) = row else { return Ok(None) }; + + // Rehydrate blocks. Sort by stream-ordinal so the returned + // CanonicalDocument matches the order originally persisted. + let mut blocks_stmt = conn + .prepare( + "SELECT payload_json FROM blocks + WHERE doc_id = ? 
ORDER BY ordinal ASC", + ) + .map_err(StoreError::from)?; + let block_rows = blocks_stmt + .query_map(params![id.0], |r| { + let payload_json: String = r.get(0)?; + Ok(payload_json) + }) + .map_err(StoreError::from)?; + let mut blocks: Vec = Vec::new(); + for row in block_rows { + let payload_json = row.map_err(StoreError::from)?; + let block: kb_core::Block = serde_json::from_str(&payload_json) + .context("deserialize block payload_json")?; + blocks.push(block); + } + + let metadata: kb_core::Metadata = serde_json::from_str(&row.metadata_json) + .context("deserialize metadata_json")?; + let provenance: kb_core::Provenance = + serde_json::from_str(&row.provenance_json) + .context("deserialize provenance_json")?; + + Ok(Some(kb_core::CanonicalDocument { + doc_id: kb_core::DocumentId(row.doc_id), + source_asset_id: kb_core::AssetId(row.asset_id), + workspace_path: kb_core::WorkspacePath(row.workspace_path), + title: row.title.unwrap_or_default(), + lang: kb_core::Lang(row.lang.unwrap_or_default()), + blocks, + metadata, + provenance, + parser_version: kb_core::ParserVersion(row.parser_version), + schema_version: row.schema_version as u32, + doc_version: row.doc_version as u32, + })) + } + + fn get_chunk(&self, id: &kb_core::ChunkId) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let row = conn + .query_row( + "SELECT + chunk_id, doc_id, text, heading_path_json, + source_spans_json, token_estimate, chunker_version, + policy_hash, block_ids_json + FROM chunks WHERE chunk_id = ?", + params![id.0], + chunk_row_from_sql, + ) + .map(Some) + .or_else(rows_optional) + .map_err(StoreError::from)?; + let Some(row) = row else { return Ok(None) }; + let heading_path: Vec = serde_json::from_str(&row.heading_path_json) + .context("deserialize chunk.heading_path_json")?; + let source_spans: Vec = + serde_json::from_str(&row.source_spans_json) + .context("deserialize chunk.source_spans_json")?; + let block_ids: Vec = 
serde_json::from_str(&row.block_ids_json) + .context("deserialize chunk.block_ids_json")?; + Ok(Some(kb_core::Chunk { + chunk_id: kb_core::ChunkId(row.chunk_id), + doc_id: kb_core::DocumentId(row.doc_id), + block_ids, + text: row.text, + heading_path, + source_spans, + token_estimate: row.token_estimate as usize, + chunker_version: kb_core::ChunkerVersion(row.chunker_version), + policy_hash: row.policy_hash, + })) + } + + fn list_documents( + &self, + filter: &kb_core::DocFilter, + ) -> Result> { + // Build a dynamic WHERE clause from the filter. Each condition + // appends one positional `?` placeholder and one `Box` to `params_dyn` so order stays in sync. + // Rows come back ordered by `workspace_path`; each summary's `tags` is filled by a follow-up query per row. + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut sql = String::from( + "SELECT d.doc_id, d.workspace_path, d.title, d.lang, + d.source_type, d.trust_level, d.parser_version, + d.created_at, d.updated_at, + a.byte_len, + (SELECT COUNT(*) FROM chunks c WHERE c.doc_id = d.doc_id) AS chunk_count, + -- chunker_version: assume one chunker per doc; pick + -- any row's value. NULL if no chunks yet. + (SELECT chunker_version FROM chunks c2 + WHERE c2.doc_id = d.doc_id LIMIT 1) AS chunker_version + FROM documents d + JOIN assets a ON a.asset_id = d.asset_id + WHERE 1=1", + ); + let mut params_dyn: Vec> = Vec::new(); + + if let Some(lang) = &filter.lang { + sql.push_str(" AND d.lang = ?"); + params_dyn.push(Box::new(lang.0.clone())); + } + if let Some(trust_min) = &filter.trust_min { + // Map the enum to its rank: Generated < Secondary < Primary. + // (Higher trust strictly contains lower trust.) + // Any other stored value ranks 0, which is below every rank bound for `trust_min`, so such rows never match.
+ sql.push_str(" AND CASE d.trust_level + WHEN 'primary' THEN 3 + WHEN 'secondary' THEN 2 + WHEN 'generated' THEN 1 + ELSE 0 + END >= ?"); + let rank: i64 = match trust_min { + kb_core::TrustLevel::Primary => 3, + kb_core::TrustLevel::Secondary => 2, + kb_core::TrustLevel::Generated => 1, + }; + params_dyn.push(Box::new(rank)); + } + if let Some(glob) = &filter.path_glob { + sql.push_str(" AND d.workspace_path GLOB ?"); + params_dyn.push(Box::new(glob.clone())); + } + if !filter.tags_any.is_empty() { + // Any-of filter: doc must own at least one of the + // requested tags. Use IN with a placeholder list. + sql.push_str(" AND d.doc_id IN (SELECT doc_id FROM document_tags WHERE tag IN ("); + for (i, tag) in filter.tags_any.iter().enumerate() { + if i > 0 { + sql.push(','); + } + sql.push('?'); + params_dyn.push(Box::new(tag.clone())); + } + sql.push_str("))"); + } + sql.push_str(" ORDER BY d.workspace_path"); + + let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?; + let rows = stmt + .query_map( + rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())), + doc_summary_from_sql, + ) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for r in rows { + let summary = r.map_err(StoreError::from)?; + // Load this doc's tag list (sorted by tag) to fill `DocSummary.tags`; no filtering happens here. + // NOTE(review): this prepares and runs one statement per result row (N+1) — consider preparing it once outside the loop. + let mut tag_stmt = conn + .prepare("SELECT tag FROM document_tags WHERE doc_id = ? 
ORDER BY tag") + .map_err(StoreError::from)?; + let tag_iter = tag_stmt + .query_map(params![summary.doc_id.0], |r| r.get::<_, String>(0)) + .map_err(StoreError::from)?; + let tags: Vec = tag_iter + .collect::>>() + .map_err(StoreError::from)?; + out.push(kb_core::DocSummary { tags, ..summary }); + } + Ok(out) + } +} + +// ── Internal row + (de)serialization helpers ───────────────────────────── + +/// Subset of a `documents` row that `get_document` needs to rebuild a `CanonicalDocument`. +struct DocumentRow { + doc_id: String, + asset_id: String, + workspace_path: String, + title: Option, + lang: Option, + parser_version: String, + doc_version: i64, + schema_version: i64, + metadata_json: String, + provenance_json: String, + // source_type / trust_level are loaded back via metadata_json round-trip, + // so we do not need separate fields here for `get_document`. +} + +/// Map one row of the `get_document` SELECT into a `DocumentRow`. Column indexes are positional, so they must stay aligned with that SELECT's column list; columns 5, 6, 12 and 13 (source_type, trust_level, created_at, updated_at) are selected but never read here. +fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(DocumentRow { + doc_id: row.get(0)?, + asset_id: row.get(1)?, + workspace_path: row.get(2)?, + title: row.get(3)?, + lang: row.get(4)?, + // 5: source_type, 6: trust_level — read but unused here (metadata_json + // is authoritative). Keeping them in the SELECT makes the column + // ordering match the INSERT and allows future fields without + // shifting indexes. 
+ parser_version: row.get(7)?, + doc_version: row.get(8)?, + schema_version: row.get(9)?, + metadata_json: row.get(10)?, + provenance_json: row.get(11)?, + }) +} + +struct ChunkRow { + chunk_id: String, + doc_id: String, + text: String, + heading_path_json: String, + source_spans_json: String, + token_estimate: i64, + chunker_version: String, + policy_hash: String, + block_ids_json: String, +} + +fn chunk_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(ChunkRow { + chunk_id: row.get(0)?, + doc_id: row.get(1)?, + text: row.get(2)?, + heading_path_json: row.get(3)?, + source_spans_json: row.get(4)?, + token_estimate: row.get(5)?, + chunker_version: row.get(6)?, + policy_hash: row.get(7)?, + block_ids_json: row.get(8)?, + }) +} + +fn doc_summary_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let doc_id: String = row.get(0)?; + let workspace_path: String = row.get(1)?; + let title: Option = row.get(2)?; + let lang: Option = row.get(3)?; + let source_type_raw: String = row.get(4)?; + let trust_level_raw: String = row.get(5)?; + let parser_version: String = row.get(6)?; + let created_at_raw: String = row.get(7)?; + let updated_at_raw: String = row.get(8)?; + let byte_len: i64 = row.get(9)?; + let chunk_count: i64 = row.get(10)?; + let chunker_version: Option = row.get(11)?; + + // De-serialize the lowercase string forms that match + // `#[serde(rename_all = "lowercase")]` on each enum. 
+ let source_type: kb_core::SourceType = + serde_json::from_value(serde_json::Value::String(source_type_raw)) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?; + let trust_level: kb_core::TrustLevel = + serde_json::from_value(serde_json::Value::String(trust_level_raw)) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?; + let created_at = OffsetDateTime::parse( + &created_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e)))?; + let updated_at = OffsetDateTime::parse( + &updated_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(e)))?; + + Ok(kb_core::DocSummary { + doc_id: kb_core::DocumentId(doc_id), + doc_path: kb_core::WorkspacePath(workspace_path), + title: title.unwrap_or_default(), + lang: kb_core::Lang(lang.unwrap_or_default()), + // tags filled in by caller after a per-doc fetch. + tags: Vec::new(), + trust_level, + source_type, + // NOTE(review): the `as` casts below are not range-checked; a negative or oversized stored integer would wrap or truncate silently — confirm the columns are constrained. + byte_len: byte_len as u64, + chunk_count: chunk_count as u32, + created_at, + updated_at, + parser_version: kb_core::ParserVersion(parser_version), + // chunker_version may be NULL when the doc has no chunks yet. + // Empty string is the cleanest fallback consistent with the wire + // schema's required `chunker_version` field on DocSummary v1. + chunker_version: kb_core::ChunkerVersion(chunker_version.unwrap_or_default()), + }) +} + +/// Map a `QueryReturnedNoRows` into `Ok(None)` so the trait returns +/// `Option` rather than an error for the common "missing" case. +/// NOTE(review): rusqlite's `OptionalExtension::optional` appears to provide this same mapping — consider using it instead. +fn rows_optional(err: rusqlite::Error) -> rusqlite::Result> { + match err { + rusqlite::Error::QueryReturnedNoRows => Ok(None), + e => Err(e), + } +} + +/// UPSERT the documents row and bump `doc_version` on conflict.
+fn upsert_document( + tx: &rusqlite::Transaction<'_>, + doc: &kb_core::CanonicalDocument, +) -> Result<()> { + let metadata_json = serde_json::to_string(&doc.metadata) + .context("serialize metadata")?; + let provenance_json = serde_json::to_string(&doc.provenance) + .context("serialize provenance")?; + // String form of the lowercase serde representation. We avoid + // embedding `serde_json::to_string` quotes (`"markdown"` → just + // `markdown` for the column). + let source_type = source_type_label(&doc.metadata.source_type); + let trust_level = trust_level_label(&doc.metadata.trust_level); + let created_at = doc + .metadata + .created_at + .format(&time::format_description::well_known::Rfc3339) + .context("format created_at")?; + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format updated_at")?; + + tx.execute( + "INSERT INTO documents ( + doc_id, asset_id, workspace_path, title, lang, + source_type, trust_level, parser_version, + doc_version, schema_version, metadata_json, + provenance_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(doc_id) DO UPDATE SET + asset_id = excluded.asset_id, + workspace_path = excluded.workspace_path, + title = excluded.title, + lang = excluded.lang, + source_type = excluded.source_type, + trust_level = excluded.trust_level, + parser_version = excluded.parser_version, + -- doc_version: bump on update. excluded.doc_version is the + -- caller's submitted value; we ignore it and add 1 to the + -- existing column so each re-ingest cleanly increments. 
+ doc_version = documents.doc_version + 1, + schema_version = excluded.schema_version, + metadata_json = excluded.metadata_json, + provenance_json = excluded.provenance_json, + updated_at = excluded.updated_at", + params![ + doc.doc_id.0, + doc.source_asset_id.0, + doc.workspace_path.0, + doc.title, + doc.lang.0, + source_type, + trust_level, + doc.parser_version.0, + doc.doc_version as i64, + doc.schema_version as i64, + metadata_json, + provenance_json, + created_at, + now, + ], + ) + .map_err(StoreError::from)?; + Ok(()) +} + +fn source_type_label(s: &kb_core::SourceType) -> &'static str { + match s { + kb_core::SourceType::Markdown => "markdown", + kb_core::SourceType::Note => "note", + kb_core::SourceType::Paper => "paper", + kb_core::SourceType::Reference => "reference", + kb_core::SourceType::Inbox => "inbox", + } +} + +fn trust_level_label(s: &kb_core::TrustLevel) -> &'static str { + match s { + kb_core::TrustLevel::Primary => "primary", + kb_core::TrustLevel::Secondary => "secondary", + kb_core::TrustLevel::Generated => "generated", + } +} + +fn replace_document_tags( + tx: &rusqlite::Transaction<'_>, + doc_id: &kb_core::DocumentId, + tags: &[String], +) -> Result<()> { + tx.execute("DELETE FROM document_tags WHERE doc_id = ?", params![doc_id.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO document_tags (doc_id, tag) VALUES (?, ?) + ON CONFLICT(doc_id, tag) DO NOTHING", + ) + .map_err(StoreError::from)?; + for tag in tags { + stmt.execute(params![doc_id.0, tag]) + .map_err(StoreError::from)?; + } + Ok(()) +} + +struct BlockRow { + block_id: String, + doc_id: String, + kind: &'static str, + heading_path_json: String, + ordinal: i64, + source_span_json: String, + /// The full Block JSON — round-trip path for `get_document`. Also + /// future-proofs new variants without schema churn. 
+ payload_json: String, +} + +fn block_to_row( + doc: &kb_core::DocumentId, + block: &kb_core::Block, + stream_ordinal: i64, +) -> Result { + let (block_id, kind, heading_path_json, source_span_json) = match block { + kb_core::Block::Heading(b) => ( + b.common.block_id.0.clone(), + "heading", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Paragraph(b) | kb_core::Block::Quote(b) => ( + b.common.block_id.0.clone(), + // Discriminate Paragraph vs Quote on the enum tag: payload + // round-trip carries the variant, but the column needs a + // stable label for filtering. + if matches!(block, kb_core::Block::Paragraph(_)) { + "paragraph" + } else { + "quote" + }, + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::List(b) => ( + b.common.block_id.0.clone(), + "list", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Code(b) => ( + b.common.block_id.0.clone(), + "code", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Table(b) => ( + b.common.block_id.0.clone(), + "table", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::ImageRef(b) => ( + b.common.block_id.0.clone(), + "imageref", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::AudioRef(b) => ( + b.common.block_id.0.clone(), + "audioref", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + }; + let payload_json = serde_json::to_string(block).context("serialize block")?; + Ok(BlockRow { + block_id, + doc_id: doc.0.clone(), + kind, + heading_path_json, + ordinal: stream_ordinal, + source_span_json, + payload_json, + }) +} diff --git 
a/crates/kb-store-sqlite/src/error.rs b/crates/kb-store-sqlite/src/error.rs new file mode 100644 index 0000000..fd6e42c --- /dev/null +++ b/crates/kb-store-sqlite/src/error.rs @@ -0,0 +1,20 @@ +//! Crate-local error type per design §10. +//! +//! Boundary code (`kb-app`, `kb-cli`) flattens these into `anyhow::Error`, +//! so the trait impls return `anyhow::Result` directly. Internally we +//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlx` / +//! `Migration` so callers that downcast can route refusal-style flows. + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("sqlite error: {0}")] + Sqlx(#[from] rusqlite::Error), + + #[error("migration error: {0}")] + Migration(String), + + #[error("conflict: {0}")] + Conflict(String), +} diff --git a/crates/kb-store-sqlite/src/jobs.rs b/crates/kb-store-sqlite/src/jobs.rs new file mode 100644 index 0000000..89d71dd --- /dev/null +++ b/crates/kb-store-sqlite/src/jobs.rs @@ -0,0 +1,252 @@ +//! `JobRepo` impl per design §7.2. +//! +//! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over +//! `(now, kind, payload)` so two `create` calls in the same millisecond +//! still distinguish. + +use anyhow::{Context, Result}; +use rusqlite::params; +use serde_json::Value; +use time::OffsetDateTime; + +use crate::error::StoreError; +use crate::store::SqliteStore; + +impl kb_core::JobRepo for SqliteStore { + fn create( + &self, + kind: kb_core::JobKind, + payload: Value, + ) -> Result { + let now_dt = OffsetDateTime::now_utc(); + let now = now_dt + .format(&time::format_description::well_known::Rfc3339) + .context("format job created_at")?; + // JobId recipe: stable hex over (kind, payload_canonical, ns). + // The nanosecond timestamp is included so two `create` calls with + // identical `(kind, payload)` still get distinct IDs. 
+ let job_id = mint_job_id(&kind, &payload, now_dt); + let kind_label = job_kind_label(&kind); + let payload_json = serde_json::to_string(&payload) + .context("serialize job payload")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + conn.execute( + "INSERT INTO jobs ( + job_id, kind, status, payload_json, progress_json, + error_json, created_at, updated_at, finished_at + ) VALUES (?, ?, 'pending', ?, NULL, NULL, ?, ?, NULL)", + params![job_id.0, kind_label, payload_json, now, now], + ) + .map_err(StoreError::from)?; + Ok(job_id) + } + + fn update_progress( + &self, + id: &kb_core::JobId, + progress: Value, + ) -> Result<()> { + let progress_json = serde_json::to_string(&progress) + .context("serialize job progress")?; + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format job updated_at")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + // status='pending' → 'running' on first progress update; later + // progress calls keep status='running' until finish(). + conn.execute( + "UPDATE jobs SET + progress_json = ?, + status = CASE status WHEN 'pending' THEN 'running' ELSE status END, + updated_at = ? + WHERE job_id = ?", + params![progress_json, now, id.0], + ) + .map_err(StoreError::from)?; + Ok(()) + } + + fn finish( + &self, + id: &kb_core::JobId, + status: kb_core::JobStatus, + error: Option<&str>, + ) -> Result<()> { + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format job finished_at")?; + let status_label = job_status_label(&status); + let error_json = error + .map(|e| serde_json::to_string(&serde_json::json!({ "message": e }))) + .transpose() + .context("serialize job error")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + conn.execute( + "UPDATE jobs SET + status = ?, + error_json = ?, + updated_at = ?, + finished_at = ? 
+ WHERE job_id = ?", + params![status_label, error_json, now, now, id.0], + ) + .map_err(StoreError::from)?; + Ok(()) + } + + fn list( + &self, + filter: &kb_core::JobFilter, + ) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut sql = String::from( + "SELECT job_id, kind, status, payload_json, progress_json, + error_json, created_at, updated_at, finished_at + FROM jobs WHERE 1=1", + ); + let mut params_dyn: Vec> = Vec::new(); + if let Some(status) = &filter.status { + sql.push_str(" AND status = ?"); + params_dyn.push(Box::new(job_status_label(status).to_string())); + } + if let Some(kind) = &filter.kind { + sql.push_str(" AND kind = ?"); + params_dyn.push(Box::new(job_kind_label(kind).to_string())); + } + sql.push_str(" ORDER BY created_at ASC"); + + let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?; + let rows = stmt + .query_map( + rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())), + job_row_from_sql, + ) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for r in rows { + out.push(r.map_err(StoreError::from)?); + } + Ok(out) + } +} + +/// Mint a JobId over (kind, canonical(payload), nanos). The 32-hex +/// invariant on `kb_core::JobId` is honored by taking the first 32 chars +/// of the blake3 hex. +fn mint_job_id( + kind: &kb_core::JobKind, + payload: &Value, + at: OffsetDateTime, +) -> kb_core::JobId { + // Plain serde_json::to_vec is enough — JobIds are not part of the + // §4.2 ID family and don't need canonical-JSON parity with other IDs. + // The nanosecond suffix is what guarantees uniqueness, not stable + // hashing. 
+ let mut hasher = blake3::Hasher::new(); + hasher.update(job_kind_label(kind).as_bytes()); + if let Ok(bytes) = serde_json::to_vec(payload) { + hasher.update(&bytes); + } + hasher.update(&at.unix_timestamp_nanos().to_be_bytes()); + let hex = hasher.finalize().to_hex().to_string(); + kb_core::JobId(hex[..32].to_string()) +} + +fn job_kind_label(k: &kb_core::JobKind) -> &'static str { + match k { + kb_core::JobKind::Ingest => "ingest", + kb_core::JobKind::Chunk => "chunk", + kb_core::JobKind::Embed => "embed", + kb_core::JobKind::Ocr => "ocr", + kb_core::JobKind::Transcribe => "transcribe", + kb_core::JobKind::Reindex => "reindex", + kb_core::JobKind::Doctor => "doctor", + } +} + +fn job_status_label(s: &kb_core::JobStatus) -> &'static str { + match s { + kb_core::JobStatus::Pending => "pending", + kb_core::JobStatus::Running => "running", + kb_core::JobStatus::Succeeded => "succeeded", + kb_core::JobStatus::Failed => "failed", + kb_core::JobStatus::Canceled => "canceled", + } +} + +fn job_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let job_id: String = row.get(0)?; + let kind_raw: String = row.get(1)?; + let status_raw: String = row.get(2)?; + let payload_json: String = row.get(3)?; + let progress_json: Option = row.get(4)?; + let error_json: Option = row.get(5)?; + let created_at_raw: String = row.get(6)?; + let updated_at_raw: String = row.get(7)?; + let finished_at_raw: Option = row.get(8)?; + + let kind: kb_core::JobKind = + serde_json::from_value(serde_json::Value::String(kind_raw)) + .map_err(conv_err(1))?; + let status: kb_core::JobStatus = + serde_json::from_value(serde_json::Value::String(status_raw)) + .map_err(conv_err(2))?; + let payload: Value = serde_json::from_str(&payload_json).map_err(conv_err(3))?; + let progress: Option = match progress_json { + Some(s) => Some(serde_json::from_str(&s).map_err(conv_err(4))?), + None => None, + }; + // Surface the stored error message back as a plain string per the + // JobRow schema (§7.2). 
We stored `{"message": "..."}` for forward + // compatibility — pull `message` back out, or fall back to the raw + // JSON if the shape ever drifts. + let error: Option = match error_json { + Some(s) => match serde_json::from_str::(&s) { + Ok(v) => v + .get("message") + .and_then(Value::as_str) + .map(str::to_owned) + .or(Some(s)), + Err(_) => Some(s), + }, + None => None, + }; + + let created_at = OffsetDateTime::parse( + &created_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(conv_err(6))?; + let updated_at = OffsetDateTime::parse( + &updated_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(conv_err(7))?; + let finished_at = match finished_at_raw { + Some(s) => Some( + OffsetDateTime::parse(&s, &time::format_description::well_known::Rfc3339) + .map_err(conv_err(8))?, + ), + None => None, + }; + + Ok(kb_core::JobRow { + job_id: kb_core::JobId(job_id), + kind, + status, + payload, + progress, + error, + created_at, + updated_at, + finished_at, + }) +} + +fn conv_err( + col: usize, +) -> impl FnOnce(E) -> rusqlite::Error { + move |e| { + rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e)) + } +} diff --git a/crates/kb-store-sqlite/src/lib.rs b/crates/kb-store-sqlite/src/lib.rs new file mode 100644 index 0000000..1b35ecb --- /dev/null +++ b/crates/kb-store-sqlite/src/lib.rs @@ -0,0 +1,23 @@ +//! `kb-store-sqlite` — SQLite-backed implementations of +//! [`kb_core::DocumentStore`] and [`kb_core::JobRepo`] (§7.2), plus the +//! asset writer that copies (or references) raw bytes per design §5.2. +//! +//! Schema is owned by `migrations/V001__init.sql` (workspace root), which +//! ships the full §5 DDL minus the FTS5 virtual table + triggers (those +//! land in P2-1's `V002`). +//! +//! Allowed deps per task spec: `kb-core`, `kb-config`, `rusqlite`, +//! `refinery`, `serde_json`, `time`, `blake3`, `tracing`, `anyhow`, +//! `thiserror`. 
NOT allowed: `kb-parse-*`, `kb-normalize`, `kb-chunk`, +//! `kb-store-vector`, `kb-source-fs`, etc. (`kb-parse-md`, `kb-normalize`, +//! `kb-chunk` may appear as **dev-deps** — see `Cargo.toml` — to drive +//! the contract round-trip test off a real Markdown fixture.) + +mod error; +mod schema; +mod store; +mod documents; +mod jobs; + +pub use error::StoreError; +pub use store::SqliteStore; diff --git a/crates/kb-store-sqlite/src/schema.rs b/crates/kb-store-sqlite/src/schema.rs new file mode 100644 index 0000000..68ccb3b --- /dev/null +++ b/crates/kb-store-sqlite/src/schema.rs @@ -0,0 +1,14 @@ +//! Refinery migration bundle. The migrations live at the workspace +//! `migrations/` directory; refinery's `embed_migrations!` macro inlines +//! them at compile time so the binary needs no runtime SQL files. + +// `embed_migrations!` looks under the path relative to the package root +// (the crate's `Cargo.toml`). The workspace migrations dir is two levels +// up: `crates/kb-store-sqlite/Cargo.toml` → `../../migrations`. +refinery::embed_migrations!("../../migrations"); + +/// Re-export the runner under a stable name. Calling +/// `runner().run(&mut conn)?` applies all pending migrations. +pub fn runner() -> refinery::Runner { + migrations::runner() +} diff --git a/crates/kb-store-sqlite/src/store.rs b/crates/kb-store-sqlite/src/store.rs new file mode 100644 index 0000000..72a605e --- /dev/null +++ b/crates/kb-store-sqlite/src/store.rs @@ -0,0 +1,297 @@ +//! `SqliteStore` — open + run_migrations + asset writer. +//! +//! The store wraps a single `rusqlite::Connection` behind a +//! `std::sync::Mutex` so the public trait impls (which take `&self`) can +//! still issue mutating SQL. Concurrency is intentionally coarse for P1; +//! later phases can swap to a connection pool if measurement shows the +//! mutex on the hot path. 
+ +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use rusqlite::Connection; + +use crate::error::StoreError; +use crate::schema; + +/// Default file name under `config.storage.data_dir`. Kept private — the +/// path layout is a §6.3 design decision, not part of the store's public +/// surface. +const SQLITE_FILE: &str = "kb.sqlite"; + +/// Subdirectory under `data_dir` holding shard-prefixed asset bytes +/// (`/`). Mirrors design §6.3. +const ASSETS_SUBDIR: &str = "assets"; + +/// Length of the shard prefix: 2 hex chars → 256 buckets, plenty to keep +/// directory cardinality reasonable on workspaces with thousands of +/// assets without descending into hash-trees. +const ASSET_SHARD_LEN: usize = 2; + +/// Bytes-per-MiB conversion. Used by the asset writer to compare +/// `byte_len` against `storage.copy_threshold_mb`. +const BYTES_PER_MIB: u64 = 1024 * 1024; + +/// SQLite-backed kb store. +/// +/// Construct via [`SqliteStore::open`], then call +/// [`SqliteStore::run_migrations`] to apply the bundled `V001__init.sql` +/// before any read/write call. +pub struct SqliteStore { + /// Resolved absolute path to `data_dir`. Used by the asset writer. + pub(crate) data_dir: PathBuf, + /// Maximum asset size eligible for in-store copy; assets bigger than + /// this are recorded as `reference` and read from their source path. + pub(crate) copy_threshold_bytes: u64, + /// Single mutexed connection — see module docs for rationale. + pub(crate) conn: Mutex, +} + +impl SqliteStore { + /// Open (or create) the SQLite file under `config.storage.data_dir`, + /// apply pragmas (foreign_keys / WAL / synchronous=NORMAL / + /// temp_store=MEMORY), and create parent directories as needed. + /// **Does not run migrations** — call [`Self::run_migrations`] next. 
+ pub fn open(config: &kb_config::Config) -> Result { + let data_dir = expand_data_dir(&config.storage.data_dir); + std::fs::create_dir_all(&data_dir) + .with_context(|| format!("create data_dir {}", data_dir.display()))?; + let db_path = data_dir.join(SQLITE_FILE); + + let conn = Connection::open(&db_path) + .with_context(|| format!("open sqlite at {}", db_path.display()))?; + apply_pragmas(&conn)?; + + tracing::debug!( + target: "kb-store-sqlite", + data_dir = %data_dir.display(), + db = %db_path.display(), + "opened sqlite store" + ); + + Ok(Self { + data_dir, + copy_threshold_bytes: config.storage.copy_threshold_mb * BYTES_PER_MIB, + conn: Mutex::new(conn), + }) + } + + /// Apply all pending migrations bundled at compile time + /// (`migrations/V001__init.sql` and any later additions). + pub fn run_migrations(&self) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + schema::runner() + .run(&mut *conn) + .map_err(|e| StoreError::Migration(e.to_string()))?; + tracing::debug!(target: "kb-store-sqlite", "migrations applied"); + Ok(()) + } + + /// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`, + /// bytes go to `data_dir/assets//` if `byte_len ≤ + /// copy_threshold_mb`, otherwise the row records the source URI's + /// path and no copy is performed. + /// + /// In either branch, `blake3(bytes)` is recomputed and compared to + /// `asset.checksum.0`. A mismatch returns + /// `StoreError::Conflict` wrapped in `anyhow::Error`. + pub fn put_asset_with_bytes( + &self, + asset: &kb_core::RawAsset, + bytes: &[u8], + ) -> Result<()> { + // 1. Verify the caller's checksum matches what's actually on the + // wire. A drift here means the bytes the parser saw and the bytes + // we're about to durably store disagree — refuse persistence. 
+ let computed = blake3::hash(bytes).to_hex().to_string(); + if computed != asset.checksum.0 { + return Err(StoreError::Conflict(format!( + "checksum mismatch: asset {} declares {} but bytes hash to {}", + asset.asset_id.0, asset.checksum.0, computed + )) + .into()); + } + + // 2. Decide copy vs. reference. The threshold compares the + // declared `byte_len` (caller-vouched) rather than `bytes.len()` + // because some sources stream and `byte_len` is authoritative. + let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes { + let dest = self.assets_path_for(&asset.asset_id); + if let Some(parent) = dest.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("create asset shard dir {}", parent.display()) + })?; + } + std::fs::write(&dest, bytes) + .with_context(|| format!("write asset bytes to {}", dest.display()))?; + // Mirror §6.6: files 0o644. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&dest)?.permissions(); + perms.set_mode(0o644); + std::fs::set_permissions(&dest, perms).with_context(|| { + format!("chmod 0o644 on {}", dest.display()) + })?; + } + ("copied", dest.to_string_lossy().into_owned()) + } else { + // Reference: caller's source path is recorded verbatim. We + // accept either a `File(path)` or `Kb(uri)` SourceUri; the + // latter stores the raw `kb://...` string. + let path = match &asset.source_uri { + kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(), + kb_core::SourceUri::Kb(u) => u.clone(), + }; + ("reference", path) + }; + + // 3. UPSERT the assets row. A second `put_asset_with_bytes` for + // the same asset_id (e.g. re-ingest) overwrites in place — the + // row is uniquely keyed by asset_id and re-derived from the + // RawAsset every time. + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + upsert_asset_row(&conn, asset, storage_kind, &storage_path)?; + Ok(()) + } + + /// Compute the `data_dir/assets//` path for an asset. 
+ /// `` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`. + pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf { + let id = &asset_id.0; + // Defensive: kb-core enforces 32 hex chars on AssetId construction + // (`FromStr` validates). If a future code path bypasses that, we + // fall back to the full id as the shard so we never panic on + // slicing. + let shard = if id.len() >= ASSET_SHARD_LEN { + &id[..ASSET_SHARD_LEN] + } else { + id.as_str() + }; + self.data_dir.join(ASSETS_SUBDIR).join(shard).join(id) + } +} + +/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes` +/// path (which has bytes + computed `storage_kind/path`) and the +/// `DocumentStore::put_asset` path (which only has the `RawAsset` and +/// reads `storage_kind/path` from `asset.stored`). +pub(crate) fn upsert_asset_row( + conn: &Connection, + asset: &kb_core::RawAsset, + storage_kind: &str, + storage_path: &str, +) -> Result<()> { + let source_uri = match &asset.source_uri { + kb_core::SourceUri::File(p) => format!("file://{}", p.to_string_lossy()), + kb_core::SourceUri::Kb(u) => u.clone(), + }; + let media_type = serde_json::to_string(&asset.media_type) + .context("serialize media_type")?; + let discovered_at = asset + .discovered_at + .format(&time::format_description::well_known::Rfc3339) + .context("format discovered_at")?; + + conn.execute( + "INSERT INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(asset_id) DO UPDATE SET + source_uri = excluded.source_uri, + workspace_path = excluded.workspace_path, + media_type = excluded.media_type, + byte_len = excluded.byte_len, + checksum = excluded.checksum, + storage_kind = excluded.storage_kind, + storage_path = excluded.storage_path, + discovered_at = excluded.discovered_at", + rusqlite::params![ + asset.asset_id.0, + source_uri, + asset.workspace_path.0, + media_type, + asset.byte_len as i64, + asset.checksum.0, + storage_kind, + storage_path, + discovered_at, + ], + ) + .map_err(StoreError::from)?; + Ok(()) +} + +/// Apply the design §5 / task-spec pragmas. Called once per connection. +/// Note: WAL is persistent (the journal-mode setting is sticky in the DB +/// header) but `foreign_keys`, `synchronous`, and `temp_store` are +/// per-connection — they MUST be re-applied on every open. +fn apply_pragmas(conn: &Connection) -> Result<()> { + conn.pragma_update(None, "foreign_keys", "ON")?; + // `journal_mode = WAL` returns the current mode as a row; use + // `pragma_query_value` semantics via `query_row` to allow that. + conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?; + conn.pragma_update(None, "synchronous", "NORMAL")?; + conn.pragma_update(None, "temp_store", "MEMORY")?; + Ok(()) +} + +/// Expand the placeholders / `~` / env-vars used by `Config::storage.data_dir`. +/// +/// Supported substitutions, in order: +/// - `${XDG_DATA_HOME:-~/.local/share}` (and the bare `${XDG_DATA_HOME}`) +/// - leading `~` → `$HOME` +/// +/// If neither produces an absolute path, the input is returned as-is +/// (relative paths are kept relative to the caller's CWD). +fn expand_data_dir(raw: &str) -> PathBuf { + let mut s = raw.to_string(); + + // ${XDG_DATA_HOME:-~/.local/share}: respect the env override, else + // fall back to the suffix after `:-`. 
+ if let Some(start) = s.find("${XDG_DATA_HOME") { + if let Some(rel_end) = s[start..].find('}') { + let end = start + rel_end + 1; // include trailing '}' + let inner = &s[start + 2..end - 1]; // strip ${ and } + let replacement = match std::env::var("XDG_DATA_HOME") { + Ok(v) if !v.is_empty() => v, + _ => { + // inner is e.g. `XDG_DATA_HOME:-~/.local/share`. + if let Some((_, default)) = inner.split_once(":-") { + default.to_string() + } else { + // No default supplied; mimic Bash and yield "". + String::new() + } + } + }; + s.replace_range(start..end, &replacement); + } + } + + // ~ at the front → $HOME (or `dirs::home_dir`). + if let Some(rest) = s.strip_prefix('~') { + if let Some(home) = std::env::var_os("HOME").map(PathBuf::from).or_else(dirs_home_fallback) + { + return home.join(rest.trim_start_matches('/')); + } + } + + PathBuf::from(s) +} + +/// Tiny shim to avoid pulling in the `dirs` crate as a direct dep — we +/// only fall back when `$HOME` is unset, which is exotic on the platforms +/// we target. Returns `None` so the caller keeps the literal `~`. +fn dirs_home_fallback() -> Option { + None +} + +/// Returns the root of the assets shard tree (`data_dir/assets/`). Used +/// by tests; kept crate-private otherwise. +#[allow(dead_code)] +pub(crate) fn assets_root(data_dir: &Path) -> PathBuf { + data_dir.join(ASSETS_SUBDIR) +} diff --git a/migrations/V001__init.sql b/migrations/V001__init.sql index 2db2d5e..96fe4ee 100644 --- a/migrations/V001__init.sql +++ b/migrations/V001__init.sql @@ -1,7 +1,10 @@ --- V001__init.sql — schema bootstrap. --- Per design §5.1 + §5.9. Only the meta + migrations tables land here; --- data tables (assets, documents, blocks, chunks, fts5, …) ship in later --- phase-specific migrations (P1-6 / P2-1 / P3-3). +-- V001__init.sql — full P1 schema bootstrap. 
-- Per design §5.1 (meta), §5.2 (assets), §5.3 (documents/document_tags),
-- §5.4 (blocks), §5.5 (chunks — FTS5 virtual table + triggers DEFERRED to
-- V002 in P2-1), §5.6 (embedding_records), §5.7 (jobs / ingest_runs /
-- answers / eval_runs / eval_query_results).
--
-- NOTE(review): this rewrites the body of an already-numbered migration.
-- Refinery records a checksum per applied migration, so a database that
-- already applied the earlier V001 stub would presumably be reported as
-- divergent — confirm no such database exists, or ship this DDL as V002.

-- §5.1 Migrations meta -------------------------------------------------------

CREATE TABLE schema_meta (
    key TEXT PRIMARY KEY,
@@ -13,3 +16,167 @@ CREATE TABLE migrations (
    applied_at TEXT NOT NULL,
    description TEXT NOT NULL
);

-- §5.2 Assets ----------------------------------------------------------------

CREATE TABLE assets (
    asset_id        TEXT PRIMARY KEY,
    source_uri      TEXT NOT NULL,
    workspace_path  TEXT NOT NULL,
    media_type      TEXT NOT NULL,
    byte_len        INTEGER NOT NULL,
    checksum        TEXT NOT NULL,
    storage_kind    TEXT NOT NULL CHECK (storage_kind IN ('copied','reference')),
    storage_path    TEXT NOT NULL,
    discovered_at   TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_assets_workspace_path ON assets(workspace_path);
CREATE INDEX idx_assets_media_type ON assets(media_type);

-- §5.3 Documents -------------------------------------------------------------

CREATE TABLE documents (
    doc_id          TEXT PRIMARY KEY,
    asset_id        TEXT NOT NULL REFERENCES assets(asset_id) ON DELETE RESTRICT,
    workspace_path  TEXT NOT NULL,
    title           TEXT,
    lang            TEXT,
    source_type     TEXT NOT NULL,
    trust_level     TEXT NOT NULL,
    parser_version  TEXT NOT NULL,
    doc_version     INTEGER NOT NULL,
    schema_version  INTEGER NOT NULL,
    metadata_json   TEXT NOT NULL,
    provenance_json TEXT NOT NULL,
    created_at      TEXT NOT NULL,
    updated_at      TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_docs_workspace_path ON documents(workspace_path);
CREATE INDEX idx_docs_lang ON documents(lang);
CREATE INDEX idx_docs_source_type ON documents(source_type);

CREATE TABLE document_tags (
    doc_id  TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
    tag     TEXT NOT NULL,
    PRIMARY KEY (doc_id, tag)
);
CREATE INDEX idx_document_tags_tag ON document_tags(tag);

-- §5.4 Blocks ----------------------------------------------------------------

CREATE TABLE blocks (
    block_id          TEXT PRIMARY KEY,
    doc_id            TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
    kind              TEXT NOT NULL,
    heading_path_json TEXT NOT NULL,
    ordinal           INTEGER NOT NULL,
    source_span_json  TEXT NOT NULL,
    payload_json      TEXT NOT NULL
);
-- Blocks are read back per document in stream order, so the index carries
-- `ordinal` as its second column; it still serves plain doc_id lookups.
CREATE INDEX idx_blocks_doc_id ON blocks(doc_id, ordinal);

-- §5.5 Chunks (FTS5 virtual table + triggers deferred to V002 / P2-1) -------

CREATE TABLE chunks (
    chunk_id          TEXT PRIMARY KEY,
    doc_id            TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
    text              TEXT NOT NULL,
    heading_path_json TEXT NOT NULL,
    section_label     TEXT,
    source_spans_json TEXT NOT NULL,
    token_estimate    INTEGER NOT NULL,
    chunker_version   TEXT NOT NULL,
    policy_hash       TEXT NOT NULL,
    block_ids_json    TEXT NOT NULL,
    created_at        TEXT NOT NULL
);
CREATE INDEX idx_chunks_doc_id ON chunks(doc_id);
CREATE INDEX idx_chunks_chunker_version ON chunks(chunker_version);

-- §5.6 Embedding records (P3 — table empty in P1, present for forward compat) -

CREATE TABLE embedding_records (
    embedding_id  TEXT PRIMARY KEY,
    chunk_id      TEXT NOT NULL REFERENCES chunks(chunk_id) ON DELETE CASCADE,
    model_id      TEXT NOT NULL,
    model_version TEXT NOT NULL,
    dimensions    INTEGER NOT NULL,
    lance_table   TEXT NOT NULL,
    created_at    TEXT NOT NULL,
    UNIQUE(chunk_id, model_id, model_version, dimensions)
);
-- NOTE(review): the UNIQUE constraint above already yields an index whose
-- leading column is chunk_id, so idx_embed_chunk looks redundant — confirm
-- before removing.
CREATE INDEX idx_embed_chunk ON embedding_records(chunk_id);
CREATE INDEX idx_embed_model ON embedding_records(model_id, model_version, dimensions);

-- §5.7 Jobs / IngestRuns / Answers / EvalRuns -------------------------------

CREATE TABLE jobs (
    job_id        TEXT PRIMARY KEY,
    kind          TEXT NOT NULL,
    status        TEXT NOT NULL CHECK (status IN ('pending','running','succeeded','failed','canceled')),
    payload_json  TEXT NOT NULL,
    progress_json TEXT,
    error_json    TEXT,
    created_at    TEXT NOT NULL,
    updated_at    TEXT NOT NULL,
    finished_at   TEXT
);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_kind ON jobs(kind);
-- The job listing is ordered by created_at; let the ordering use an index.
CREATE INDEX idx_jobs_created_at ON jobs(created_at);

CREATE TABLE ingest_runs (
    run_id        TEXT PRIMARY KEY,
    scope_json    TEXT NOT NULL,
    scanned       INTEGER NOT NULL,
    new_count     INTEGER NOT NULL,
    updated_count INTEGER NOT NULL,
    skipped_count INTEGER NOT NULL,
    error_count   INTEGER NOT NULL,
    duration_ms   INTEGER NOT NULL,
    started_at    TEXT NOT NULL,
    finished_at   TEXT NOT NULL,
    items_json    TEXT
);

CREATE TABLE answers (
    trace_id                TEXT PRIMARY KEY,
    query                   TEXT NOT NULL,
    answer                  TEXT NOT NULL,
    -- Boolean flag stored as 0 / 1 (SQLite has no native BOOLEAN).
    grounded                INTEGER NOT NULL CHECK (grounded IN (0, 1)),
    refusal_reason          TEXT,
    model_id                TEXT NOT NULL,
    model_provider          TEXT NOT NULL,
    embedding_model_id      TEXT,
    embedding_dimensions    INTEGER,
    prompt_template_version TEXT NOT NULL,
    retrieval_mode          TEXT NOT NULL,
    retrieval_k             INTEGER NOT NULL,
    score_gate              REAL NOT NULL,
    top_score               REAL NOT NULL,
    chunks_returned         INTEGER NOT NULL,
    chunks_used             INTEGER NOT NULL,
    citations_json          TEXT NOT NULL,
    packed_chunks_json      TEXT,
    prompt_tokens           INTEGER,
    completion_tokens       INTEGER,
    latency_ms              INTEGER,
    created_at              TEXT NOT NULL
);
CREATE INDEX idx_answers_created_at ON answers(created_at);
CREATE INDEX idx_answers_grounded ON answers(grounded);

CREATE TABLE eval_runs (
    run_id               TEXT PRIMARY KEY,
    suite                TEXT NOT NULL,
    config_snapshot_json TEXT NOT NULL,
    aggregate_json       TEXT NOT NULL,
    commit_hash          TEXT,
    created_at           TEXT NOT NULL
);

CREATE TABLE eval_query_results (
    run_id      TEXT NOT NULL REFERENCES eval_runs(run_id) ON DELETE CASCADE,
    query_id    TEXT NOT NULL,
    result_json TEXT NOT NULL,
    PRIMARY KEY (run_id, query_id)
);