From 16b2a5c15039dd84f068c3da5bd51958f71b9c43 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:02:11 +0000 Subject: [PATCH 1/9] kb-core: add policy_hash field to Chunk struct (P1-6 schema reconcile) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add policy_hash: String to kb_core::Chunk to align with the §5.5 SQLite schema (chunks.policy_hash NOT NULL), so kb-store-sqlite persistence is a straight field copy rather than a recompute. This is a §9 schema migration: - §5.5 (the persistence schema) is authoritative. - §3.5 (the domain model) must accommodate. The chunker already computed policy_hash for the chunk_id recipe (§4.2); P1-5 stored it implicitly. We now hold it explicitly on the Chunk so any DocumentStore::put_chunks impl can read it directly. Follow-up commits update kb-chunk to populate the field and refresh the P1-5 snapshot baseline accordingly. --- crates/kb-core/src/chunk.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/kb-core/src/chunk.rs b/crates/kb-core/src/chunk.rs index 1c3b0aa..5c0db0f 100644 --- a/crates/kb-core/src/chunk.rs +++ b/crates/kb-core/src/chunk.rs @@ -6,6 +6,12 @@ use crate::document::SourceSpan; use crate::ids::{BlockId, ChunkId, DocumentId}; use crate::versions::ChunkerVersion; +/// A unit of retrievable text per design §3.5 + §5.5. +/// +/// `policy_hash` is the chunker's hex digest of the active `ChunkPolicy` +/// (e.g. `target_tokens`, `overlap_tokens`). It mirrors the §5.5 SQLite +/// schema column so persistence is a straight copy, and feeds the +/// `chunk_id` recipe (§4.2) so policy edits invalidate downstream IDs. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Chunk { pub chunk_id: ChunkId, @@ -16,4 +22,5 @@ pub struct Chunk { pub source_spans: Vec, pub token_estimate: usize, pub chunker_version: ChunkerVersion, + pub policy_hash: String, } From 094c4641baf1d65823517b25d93d940eff543453 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:02:17 +0000 Subject: [PATCH 2/9] kb-chunk: populate Chunk.policy_hash field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Set the new policy_hash field on every emitted Chunk to the same hex already computed for the chunk_id recipe (§4.2). No recipe / chunk_id change — only the field on the struct is now populated. Pairs with the kb-core hotfix (preceding commit) and unblocks P1-6's DocumentStore::put_chunks to read chunk.policy_hash directly per §5.5. --- crates/kb-chunk/src/md_heading_v1.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/kb-chunk/src/md_heading_v1.rs b/crates/kb-chunk/src/md_heading_v1.rs index cf4c82a..7626ea5 100644 --- a/crates/kb-chunk/src/md_heading_v1.rs +++ b/crates/kb-chunk/src/md_heading_v1.rs @@ -355,6 +355,7 @@ fn build_chunk( source_spans, token_estimate, chunker_version: chunker_version.clone(), + policy_hash: policy_hash.to_string(), } } From 207a0ff61e5c743b0d3affe7090167b1e36f082d Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:02:53 +0000 Subject: [PATCH 3/9] kb-chunk: regenerate long-section.chunks.snapshot.json baseline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The snapshot now includes the policy_hash field on every Chunk per the preceding kb-core schema change. chunk_ids are unchanged because the chunk_id recipe (§4.2) already incorporated policy_hash via the chunker — the field is simply now visible in the wire form. Regenerated via: UPDATE_SNAPSHOTS=1 cargo test -p kb-chunk long_section_chunks_snapshot --- fixtures/markdown/long-section.chunks.snapshot.json | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fixtures/markdown/long-section.chunks.snapshot.json b/fixtures/markdown/long-section.chunks.snapshot.json index ef27566..1ea045b 100644 --- a/fixtures/markdown/long-section.chunks.snapshot.json +++ b/fixtures/markdown/long-section.chunks.snapshot.json @@ -11,6 +11,7 @@ "heading_path": [ "Alpha" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 1, @@ -43,6 +44,7 @@ "Alpha", "Alpha Sub" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 7, @@ -69,6 +71,7 @@ "Alpha", "Alpha Sub" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 53, @@ -90,6 +93,7 @@ "heading_path": [ "Beta" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 55, @@ -115,6 +119,7 @@ "heading_path": [ "Beta" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 64, @@ -135,6 +140,7 @@ "heading_path": [ "Beta" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 66, @@ -157,6 +163,7 @@ "heading_path": [ "Gamma" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 68, @@ -188,6 +195,7 @@ "heading_path": [ "Gamma" ], + "policy_hash": "de6868f3b7949242", "source_spans": [ { "end": 72, From a3390d5171eda9c07b7e387a68bdd80ba7a451a3 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:08:36 +0000 Subject: [PATCH 4/9] =?UTF-8?q?p1-6:=20scaffold=20kb-store-sqlite=20crate?= =?UTF-8?q?=20+=20V001=20full=20=C2=A75=20DDL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New workspace member crate `kb-store-sqlite` (allowed deps only: kb-core, kb-config, rusqlite[bundled], refinery, serde, serde_json, time, blake3, tracing, anyhow, thiserror; dev-deps add kb-parse-md / kb-normalize / kb-chunk for the contract round-trip test). Migration V001 replaces the P0-1 stub with the full §5 DDL (assets, documents, document_tags, blocks, chunks with policy_hash, embedding_records, jobs, ingest_runs, answers, eval_runs, eval_query_results) plus the §5 indexes. FTS5 virtual table + triggers remain deferred to V002 (P2-1). Public surface per task spec: SqliteStore::open / run_migrations / put_asset_with_bytes impl DocumentStore for SqliteStore (7 trait methods) impl JobRepo for SqliteStore (4 trait methods) StoreError { Sqlx, Migration, Conflict } Behavior: - Pragmas at open: foreign_keys=ON, journal_mode=WAL, synchronous=NORMAL, temp_store=MEMORY. - Asset writer: byte_len ≤ copy_threshold_mb * 1MiB → copy to data_dir/assets// (mode 0o644 on Unix), else reference. blake3(bytes) verified against asset.checksum; mismatch → Conflict. - Idempotency: put_document UPSERTs and bumps doc_version + 1 on conflict; put_blocks / put_chunks DELETE-then-INSERT; document_tags re-derived per put_document. - get_document rehydrates blocks via payload_json ordered by stream ordinal. - list_documents builds dynamic WHERE from DocFilter (lang / trust_min / path_glob via GLOB / tags_any via document_tags subquery). - JobRepo: jobs.kind/status are stored as lowercase enum tags; create mints a 32-hex JobId via blake3(kind || payload || nanos). Tests follow in subsequent commits. --- Cargo.lock | 452 +++++++++++++++++ Cargo.toml | 1 + crates/kb-store-sqlite/Cargo.toml | 35 ++ crates/kb-store-sqlite/src/documents.rs | 641 ++++++++++++++++++++++++ crates/kb-store-sqlite/src/error.rs | 20 + crates/kb-store-sqlite/src/jobs.rs | 252 ++++++++++ crates/kb-store-sqlite/src/lib.rs | 23 + crates/kb-store-sqlite/src/schema.rs | 14 + crates/kb-store-sqlite/src/store.rs | 297 +++++++++++ migrations/V001__init.sql | 175 ++++++- 10 files changed, 1906 insertions(+), 4 deletions(-) create mode 100644 crates/kb-store-sqlite/Cargo.toml create mode 100644 crates/kb-store-sqlite/src/documents.rs create mode 100644 crates/kb-store-sqlite/src/error.rs create mode 100644 crates/kb-store-sqlite/src/jobs.rs create mode 100644 crates/kb-store-sqlite/src/lib.rs create mode 100644 crates/kb-store-sqlite/src/schema.rs create mode 100644 crates/kb-store-sqlite/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 11bdbb3..d01b60a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -79,6 +91,17 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -286,6 +309,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.15.0" @@ -308,6 +342,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.4.1" @@ -329,6 +375,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fst" version = "0.4.7" @@ -415,6 +470,9 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -431,18 +489,129 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + [[package]] name = "id-arena" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "ignore" version = "0.4.25" @@ -649,6 +818,27 @@ dependencies = [ "walkdir", ] +[[package]] +name = "kb-store-sqlite" +version = "0.1.0" +dependencies = [ + "anyhow", + "blake3", + "kb-chunk", + "kb-config", + "kb-core", + "kb-normalize", + "kb-parse-md", + "refinery", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "time", + "tracing", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -676,6 +866,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "lingua" version = "1.8.0" @@ -744,6 +945,12 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + [[package]] name = "lock_api" version = "0.4.14" @@ -835,12 +1042,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + [[package]] name = "pin-project-lite" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -938,6 +1166,50 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "refinery" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba5d693abf62492c37268512ff35b77655d2e957ca53dab85bf993fe9172d15" +dependencies = [ + "refinery-core", + "refinery-macros", +] + +[[package]] +name = "refinery-core" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a83581f18c1a4c3a6ebd7a174bdc665f17f618d79f7edccb6a0ac67e660b319" +dependencies = [ + "async-trait", + "cfg-if", + "log", + "regex", + "rusqlite", + "serde", + "siphasher", + "thiserror 1.0.69", + "time", + "toml", + "url", + "walkdir", +] + +[[package]] +name = "refinery-macros" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c225407d8e52ef8cf094393781ecda9a99d6544ec28d90a6915751de259264" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "refinery-core", + "regex", + "syn", +] + [[package]] name = "regex" version = "1.12.3" @@ -967,6 +1239,20 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustix" version = "1.1.4" @@ -1121,6 +1407,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "slab" version = "0.4.12" @@ -1133,6 +1425,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1174,6 +1472,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tempfile" version = "3.27.0" @@ -1267,6 +1576,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -1443,6 +1762,24 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1455,6 +1792,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -1761,6 +2110,109 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + +[[package]] +name = "yoke" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index b09dfed..b957c22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/kb-parse-md", "crates/kb-normalize", "crates/kb-chunk", + "crates/kb-store-sqlite", "crates/kb-app", "crates/kb-cli", ] diff --git a/crates/kb-store-sqlite/Cargo.toml b/crates/kb-store-sqlite/Cargo.toml new file mode 100644 index 0000000..b0862ef --- /dev/null +++ b/crates/kb-store-sqlite/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "kb-store-sqlite" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "SQLite-backed DocumentStore + JobRepo for kb (§5 DDL, §7.2 traits)" + +[dependencies] +kb-core = { path = "../kb-core" } +kb-config = { path = "../kb-config" } +# `bundled` ships SQLite source + builds in-tree (no system libsqlite3). +# Explicitly NOT `bundled-sqlcipher` per task allowed-deps list. +rusqlite = { version = "0.32", features = ["bundled"] } +refinery = { version = "0.8", features = ["rusqlite"] } +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true } +blake3 = { workspace = true } +tracing = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +tempfile = "3" +serde_json = { workspace = true } +# kb-parse-md / kb-normalize / kb-chunk are dev-only — used to build a +# CanonicalDocument + Vec from a fixture in the contract round-trip +# test. Forbidden as regular deps per design §8 (store consumes domain +# types from kb-core only); `cargo tree -p kb-store-sqlite --depth 1` +# (default scope, excludes dev-deps) confirms this. +kb-parse-md = { path = "../kb-parse-md" } +kb-normalize = { path = "../kb-normalize" } +kb-chunk = { path = "../kb-chunk" } diff --git a/crates/kb-store-sqlite/src/documents.rs b/crates/kb-store-sqlite/src/documents.rs new file mode 100644 index 0000000..39cdeba --- /dev/null +++ b/crates/kb-store-sqlite/src/documents.rs @@ -0,0 +1,641 @@ +//! `DocumentStore` impl: assets, documents, document_tags, blocks, chunks. +//! +//! Transactions: per design §5.8, one ingest of one document is one +//! transaction. We expose the raw trait methods at fine granularity (so +//! `kb-app` can compose), and each one wraps its own short transaction. +//! A higher-level `ingest_document` helper that wraps put_document + +//! put_blocks + put_chunks in a single tx is intentionally NOT shipped in +//! P1-6 — `kb-app` (P1's caller layer) is the right place to compose. +//! +//! Idempotency: re-ingesting `(workspace_path, asset_id, parser_version)` +//! UPSERTs the documents row, bumps `doc_version`, and replaces all +//! blocks / chunks / document_tags. No row duplication. + +use anyhow::{Context, Result}; +use rusqlite::params; +use time::OffsetDateTime; + +use crate::error::StoreError; +use crate::store::{SqliteStore, upsert_asset_row}; + +impl kb_core::DocumentStore for SqliteStore { + fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> { + // No bytes here — read storage_kind/storage_path from the + // RawAsset's `stored` field per its convention (§3.3). Callers + // that have raw bytes go through `put_asset_with_bytes` instead; + // this branch is for the case where bytes were already persisted + // (or referenced) and we just want to record the row. + let (storage_kind, storage_path) = match &asset.stored { + kb_core::AssetStorage::Copied { path } => { + ("copied", path.to_string_lossy().into_owned()) + } + kb_core::AssetStorage::Reference { path, .. } => { + ("reference", path.to_string_lossy().into_owned()) + } + }; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + upsert_asset_row(&conn, asset, storage_kind, &storage_path) + } + + fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + upsert_document(&tx, doc)?; + replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?; + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + fn put_blocks( + &self, + doc: &kb_core::DocumentId, + blocks: &[kb_core::Block], + ) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + // DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal) + // so we cannot rely on UPSERT to surface block_id collisions. The + // simplest correct path is to wipe and re-insert; the §5.8 + // per-document transaction wraps both halves so a partial state + // never lands. + tx.execute("DELETE FROM blocks WHERE doc_id = ?", params![doc.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO blocks ( + block_id, doc_id, kind, heading_path_json, + ordinal, source_span_json, payload_json + ) VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .map_err(StoreError::from)?; + // Ordinal here is the position of the block in the document's + // overall block stream — used for sort-on-load, not the §4.3 + // (heading_path, kind)-scoped ordinal that fed `block_id`. + for (i, block) in blocks.iter().enumerate() { + let row = block_to_row(doc, block, i as i64)?; + stmt.execute(params![ + row.block_id, + row.doc_id, + row.kind, + row.heading_path_json, + row.ordinal, + row.source_span_json, + row.payload_json, + ]) + .map_err(StoreError::from)?; + } + drop(stmt); + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + fn put_chunks( + &self, + doc: &kb_core::DocumentId, + chunks: &[kb_core::Chunk], + ) -> Result<()> { + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format chunk created_at")?; + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let tx = conn.transaction().map_err(StoreError::from)?; + tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO chunks ( + chunk_id, doc_id, text, heading_path_json, + section_label, source_spans_json, token_estimate, + chunker_version, policy_hash, block_ids_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .map_err(StoreError::from)?; + for chunk in chunks { + let heading_path = serde_json::to_string(&chunk.heading_path) + .context("serialize chunk.heading_path")?; + let source_spans = serde_json::to_string(&chunk.source_spans) + .context("serialize chunk.source_spans")?; + let block_ids = serde_json::to_string(&chunk.block_ids) + .context("serialize chunk.block_ids")?; + // §5.5 has a `section_label` column but the in-memory Chunk + // struct does not carry it (nor does the wire schema §2.6). + // Persist NULL until a future bump introduces the field. + stmt.execute(params![ + chunk.chunk_id.0, + chunk.doc_id.0, + chunk.text, + heading_path, + Option::::None, + source_spans, + chunk.token_estimate as i64, + chunk.chunker_version.0, + chunk.policy_hash, + block_ids, + now, + ]) + .map_err(StoreError::from)?; + } + drop(stmt); + tx.commit().map_err(StoreError::from)?; + Ok(()) + } + + fn get_document( + &self, + id: &kb_core::DocumentId, + ) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let row: Option = conn + .query_row( + "SELECT + doc_id, asset_id, workspace_path, title, lang, + source_type, trust_level, parser_version, + doc_version, schema_version, metadata_json, + provenance_json, created_at, updated_at + FROM documents WHERE doc_id = ?", + params![id.0], + document_row_from_sql, + ) + .map(Some) + .or_else(rows_optional) + .map_err(StoreError::from)?; + let Some(row) = row else { return Ok(None) }; + + // Rehydrate blocks. Sort by stream-ordinal so the returned + // CanonicalDocument matches the order originally persisted. + let mut blocks_stmt = conn + .prepare( + "SELECT payload_json FROM blocks + WHERE doc_id = ? ORDER BY ordinal ASC", + ) + .map_err(StoreError::from)?; + let block_rows = blocks_stmt + .query_map(params![id.0], |r| { + let payload_json: String = r.get(0)?; + Ok(payload_json) + }) + .map_err(StoreError::from)?; + let mut blocks: Vec = Vec::new(); + for row in block_rows { + let payload_json = row.map_err(StoreError::from)?; + let block: kb_core::Block = serde_json::from_str(&payload_json) + .context("deserialize block payload_json")?; + blocks.push(block); + } + + let metadata: kb_core::Metadata = serde_json::from_str(&row.metadata_json) + .context("deserialize metadata_json")?; + let provenance: kb_core::Provenance = + serde_json::from_str(&row.provenance_json) + .context("deserialize provenance_json")?; + + Ok(Some(kb_core::CanonicalDocument { + doc_id: kb_core::DocumentId(row.doc_id), + source_asset_id: kb_core::AssetId(row.asset_id), + workspace_path: kb_core::WorkspacePath(row.workspace_path), + title: row.title.unwrap_or_default(), + lang: kb_core::Lang(row.lang.unwrap_or_default()), + blocks, + metadata, + provenance, + parser_version: kb_core::ParserVersion(row.parser_version), + schema_version: row.schema_version as u32, + doc_version: row.doc_version as u32, + })) + } + + fn get_chunk(&self, id: &kb_core::ChunkId) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let row = conn + .query_row( + "SELECT + chunk_id, doc_id, text, heading_path_json, + source_spans_json, token_estimate, chunker_version, + policy_hash, block_ids_json + FROM chunks WHERE chunk_id = ?", + params![id.0], + chunk_row_from_sql, + ) + .map(Some) + .or_else(rows_optional) + .map_err(StoreError::from)?; + let Some(row) = row else { return Ok(None) }; + let heading_path: Vec = serde_json::from_str(&row.heading_path_json) + .context("deserialize chunk.heading_path_json")?; + let source_spans: Vec = + serde_json::from_str(&row.source_spans_json) + .context("deserialize chunk.source_spans_json")?; + let block_ids: Vec = + serde_json::from_str(&row.block_ids_json) + .context("deserialize chunk.block_ids_json")?; + Ok(Some(kb_core::Chunk { + chunk_id: kb_core::ChunkId(row.chunk_id), + doc_id: kb_core::DocumentId(row.doc_id), + block_ids, + text: row.text, + heading_path, + source_spans, + token_estimate: row.token_estimate as usize, + chunker_version: kb_core::ChunkerVersion(row.chunker_version), + policy_hash: row.policy_hash, + })) + } + + fn list_documents( + &self, + filter: &kb_core::DocFilter, + ) -> Result> { + // Build a dynamic WHERE clause from the filter. Each condition + // appends one positional `?` placeholder and one `Box` to `params` so order stays in sync. + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut sql = String::from( + "SELECT d.doc_id, d.workspace_path, d.title, d.lang, + d.source_type, d.trust_level, d.parser_version, + d.created_at, d.updated_at, + a.byte_len, + (SELECT COUNT(*) FROM chunks c WHERE c.doc_id = d.doc_id) AS chunk_count, + -- chunker_version: assume one chunker per doc; pick + -- any row's value. NULL if no chunks yet. + (SELECT chunker_version FROM chunks c2 + WHERE c2.doc_id = d.doc_id LIMIT 1) AS chunker_version + FROM documents d + JOIN assets a ON a.asset_id = d.asset_id + WHERE 1=1", + ); + let mut params_dyn: Vec> = Vec::new(); + + if let Some(lang) = &filter.lang { + sql.push_str(" AND d.lang = ?"); + params_dyn.push(Box::new(lang.0.clone())); + } + if let Some(trust_min) = &filter.trust_min { + // Map the enum to its rank: Generated < Secondary < Primary. + // (Higher trust strictly contains lower trust.) + sql.push_str(" AND CASE d.trust_level + WHEN 'primary' THEN 3 + WHEN 'secondary' THEN 2 + WHEN 'generated' THEN 1 + ELSE 0 + END >= ?"); + let rank: i64 = match trust_min { + kb_core::TrustLevel::Primary => 3, + kb_core::TrustLevel::Secondary => 2, + kb_core::TrustLevel::Generated => 1, + }; + params_dyn.push(Box::new(rank)); + } + if let Some(glob) = &filter.path_glob { + sql.push_str(" AND d.workspace_path GLOB ?"); + params_dyn.push(Box::new(glob.clone())); + } + if !filter.tags_any.is_empty() { + // INTERSECT-style filter: doc must own at least one of the + // requested tags. Use IN with a placeholder list. + sql.push_str(" AND d.doc_id IN (SELECT doc_id FROM document_tags WHERE tag IN ("); + for (i, tag) in filter.tags_any.iter().enumerate() { + if i > 0 { + sql.push(','); + } + sql.push('?'); + params_dyn.push(Box::new(tag.clone())); + } + sql.push_str("))"); + } + sql.push_str(" ORDER BY d.workspace_path"); + + let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?; + let rows = stmt + .query_map( + rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())), + doc_summary_from_sql, + ) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for r in rows { + let summary = r.map_err(StoreError::from)?; + // tags filter at row-load time: pull the tag list per doc. + let mut tag_stmt = conn + .prepare("SELECT tag FROM document_tags WHERE doc_id = ? ORDER BY tag") + .map_err(StoreError::from)?; + let tag_iter = tag_stmt + .query_map(params![summary.doc_id.0], |r| r.get::<_, String>(0)) + .map_err(StoreError::from)?; + let tags: Vec = tag_iter + .collect::>>() + .map_err(StoreError::from)?; + out.push(kb_core::DocSummary { tags, ..summary }); + } + Ok(out) + } +} + +// ── Internal row + (de)serialization helpers ───────────────────────────── + +struct DocumentRow { + doc_id: String, + asset_id: String, + workspace_path: String, + title: Option, + lang: Option, + parser_version: String, + doc_version: i64, + schema_version: i64, + metadata_json: String, + provenance_json: String, + // source_type / trust_level are loaded back via metadata_json round-trip, + // so we do not need separate fields here for `get_document`. +} + +fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(DocumentRow { + doc_id: row.get(0)?, + asset_id: row.get(1)?, + workspace_path: row.get(2)?, + title: row.get(3)?, + lang: row.get(4)?, + // 5: source_type, 6: trust_level — read but unused here (metadata_json + // is authoritative). Keeping them in the SELECT makes the column + // ordering match the INSERT and allows future fields without + // shifting indexes. + parser_version: row.get(7)?, + doc_version: row.get(8)?, + schema_version: row.get(9)?, + metadata_json: row.get(10)?, + provenance_json: row.get(11)?, + }) +} + +struct ChunkRow { + chunk_id: String, + doc_id: String, + text: String, + heading_path_json: String, + source_spans_json: String, + token_estimate: i64, + chunker_version: String, + policy_hash: String, + block_ids_json: String, +} + +fn chunk_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(ChunkRow { + chunk_id: row.get(0)?, + doc_id: row.get(1)?, + text: row.get(2)?, + heading_path_json: row.get(3)?, + source_spans_json: row.get(4)?, + token_estimate: row.get(5)?, + chunker_version: row.get(6)?, + policy_hash: row.get(7)?, + block_ids_json: row.get(8)?, + }) +} + +fn doc_summary_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let doc_id: String = row.get(0)?; + let workspace_path: String = row.get(1)?; + let title: Option = row.get(2)?; + let lang: Option = row.get(3)?; + let source_type_raw: String = row.get(4)?; + let trust_level_raw: String = row.get(5)?; + let parser_version: String = row.get(6)?; + let created_at_raw: String = row.get(7)?; + let updated_at_raw: String = row.get(8)?; + let byte_len: i64 = row.get(9)?; + let chunk_count: i64 = row.get(10)?; + let chunker_version: Option = row.get(11)?; + + // De-serialize the lowercase string forms that match + // `#[serde(rename_all = "lowercase")]` on each enum. + let source_type: kb_core::SourceType = + serde_json::from_value(serde_json::Value::String(source_type_raw)) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?; + let trust_level: kb_core::TrustLevel = + serde_json::from_value(serde_json::Value::String(trust_level_raw)) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?; + let created_at = OffsetDateTime::parse( + &created_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e)))?; + let updated_at = OffsetDateTime::parse( + &updated_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(e)))?; + + Ok(kb_core::DocSummary { + doc_id: kb_core::DocumentId(doc_id), + doc_path: kb_core::WorkspacePath(workspace_path), + title: title.unwrap_or_default(), + lang: kb_core::Lang(lang.unwrap_or_default()), + // tags filled in by caller after a per-doc fetch. + tags: Vec::new(), + trust_level, + source_type, + byte_len: byte_len as u64, + chunk_count: chunk_count as u32, + created_at, + updated_at, + parser_version: kb_core::ParserVersion(parser_version), + // chunker_version may be NULL when the doc has no chunks yet. + // Empty string is the cleanest fallback consistent with the wire + // schema's required `chunker_version` field on DocSummary v1. + chunker_version: kb_core::ChunkerVersion(chunker_version.unwrap_or_default()), + }) +} + +/// Map a `QueryReturnedNoRows` into `Ok(None)` so the trait returns +/// `Option` rather than an error for the common "missing" case. +fn rows_optional(err: rusqlite::Error) -> rusqlite::Result> { + match err { + rusqlite::Error::QueryReturnedNoRows => Ok(None), + e => Err(e), + } +} + +/// UPSERT the documents row and bump `doc_version` on conflict. +fn upsert_document( + tx: &rusqlite::Transaction<'_>, + doc: &kb_core::CanonicalDocument, +) -> Result<()> { + let metadata_json = serde_json::to_string(&doc.metadata) + .context("serialize metadata")?; + let provenance_json = serde_json::to_string(&doc.provenance) + .context("serialize provenance")?; + // String form of the lowercase serde representation. We avoid + // embedding `serde_json::to_string` quotes (`"markdown"` → just + // `markdown` for the column). + let source_type = source_type_label(&doc.metadata.source_type); + let trust_level = trust_level_label(&doc.metadata.trust_level); + let created_at = doc + .metadata + .created_at + .format(&time::format_description::well_known::Rfc3339) + .context("format created_at")?; + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format updated_at")?; + + tx.execute( + "INSERT INTO documents ( + doc_id, asset_id, workspace_path, title, lang, + source_type, trust_level, parser_version, + doc_version, schema_version, metadata_json, + provenance_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(doc_id) DO UPDATE SET + asset_id = excluded.asset_id, + workspace_path = excluded.workspace_path, + title = excluded.title, + lang = excluded.lang, + source_type = excluded.source_type, + trust_level = excluded.trust_level, + parser_version = excluded.parser_version, + -- doc_version: bump on update. excluded.doc_version is the + -- caller's submitted value; we ignore it and add 1 to the + -- existing column so each re-ingest cleanly increments. + doc_version = documents.doc_version + 1, + schema_version = excluded.schema_version, + metadata_json = excluded.metadata_json, + provenance_json = excluded.provenance_json, + updated_at = excluded.updated_at", + params![ + doc.doc_id.0, + doc.source_asset_id.0, + doc.workspace_path.0, + doc.title, + doc.lang.0, + source_type, + trust_level, + doc.parser_version.0, + doc.doc_version as i64, + doc.schema_version as i64, + metadata_json, + provenance_json, + created_at, + now, + ], + ) + .map_err(StoreError::from)?; + Ok(()) +} + +fn source_type_label(s: &kb_core::SourceType) -> &'static str { + match s { + kb_core::SourceType::Markdown => "markdown", + kb_core::SourceType::Note => "note", + kb_core::SourceType::Paper => "paper", + kb_core::SourceType::Reference => "reference", + kb_core::SourceType::Inbox => "inbox", + } +} + +fn trust_level_label(s: &kb_core::TrustLevel) -> &'static str { + match s { + kb_core::TrustLevel::Primary => "primary", + kb_core::TrustLevel::Secondary => "secondary", + kb_core::TrustLevel::Generated => "generated", + } +} + +fn replace_document_tags( + tx: &rusqlite::Transaction<'_>, + doc_id: &kb_core::DocumentId, + tags: &[String], +) -> Result<()> { + tx.execute("DELETE FROM document_tags WHERE doc_id = ?", params![doc_id.0]) + .map_err(StoreError::from)?; + let mut stmt = tx + .prepare( + "INSERT INTO document_tags (doc_id, tag) VALUES (?, ?) + ON CONFLICT(doc_id, tag) DO NOTHING", + ) + .map_err(StoreError::from)?; + for tag in tags { + stmt.execute(params![doc_id.0, tag]) + .map_err(StoreError::from)?; + } + Ok(()) +} + +struct BlockRow { + block_id: String, + doc_id: String, + kind: &'static str, + heading_path_json: String, + ordinal: i64, + source_span_json: String, + /// The full Block JSON — round-trip path for `get_document`. Also + /// future-proofs new variants without schema churn. + payload_json: String, +} + +fn block_to_row( + doc: &kb_core::DocumentId, + block: &kb_core::Block, + stream_ordinal: i64, +) -> Result { + let (block_id, kind, heading_path_json, source_span_json) = match block { + kb_core::Block::Heading(b) => ( + b.common.block_id.0.clone(), + "heading", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Paragraph(b) | kb_core::Block::Quote(b) => ( + b.common.block_id.0.clone(), + // Discriminate Paragraph vs Quote on the enum tag: payload + // round-trip carries the variant, but the column needs a + // stable label for filtering. + if matches!(block, kb_core::Block::Paragraph(_)) { + "paragraph" + } else { + "quote" + }, + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::List(b) => ( + b.common.block_id.0.clone(), + "list", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Code(b) => ( + b.common.block_id.0.clone(), + "code", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::Table(b) => ( + b.common.block_id.0.clone(), + "table", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::ImageRef(b) => ( + b.common.block_id.0.clone(), + "imageref", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + kb_core::Block::AudioRef(b) => ( + b.common.block_id.0.clone(), + "audioref", + serde_json::to_string(&b.common.heading_path)?, + serde_json::to_string(&b.common.source_span)?, + ), + }; + let payload_json = serde_json::to_string(block).context("serialize block")?; + Ok(BlockRow { + block_id, + doc_id: doc.0.clone(), + kind, + heading_path_json, + ordinal: stream_ordinal, + source_span_json, + payload_json, + }) +} diff --git a/crates/kb-store-sqlite/src/error.rs b/crates/kb-store-sqlite/src/error.rs new file mode 100644 index 0000000..fd6e42c --- /dev/null +++ b/crates/kb-store-sqlite/src/error.rs @@ -0,0 +1,20 @@ +//! Crate-local error type per design §10. +//! +//! Boundary code (`kb-app`, `kb-cli`) flattens these into `anyhow::Error`, +//! so the trait impls return `anyhow::Result` directly. Internally we +//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlx` / +//! `Migration` so callers that downcast can route refusal-style flows. + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("sqlite error: {0}")] + Sqlx(#[from] rusqlite::Error), + + #[error("migration error: {0}")] + Migration(String), + + #[error("conflict: {0}")] + Conflict(String), +} diff --git a/crates/kb-store-sqlite/src/jobs.rs b/crates/kb-store-sqlite/src/jobs.rs new file mode 100644 index 0000000..89d71dd --- /dev/null +++ b/crates/kb-store-sqlite/src/jobs.rs @@ -0,0 +1,252 @@ +//! `JobRepo` impl per design §7.2. +//! +//! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over +//! `(now, kind, payload)` so two `create` calls in the same millisecond +//! still distinguish. + +use anyhow::{Context, Result}; +use rusqlite::params; +use serde_json::Value; +use time::OffsetDateTime; + +use crate::error::StoreError; +use crate::store::SqliteStore; + +impl kb_core::JobRepo for SqliteStore { + fn create( + &self, + kind: kb_core::JobKind, + payload: Value, + ) -> Result { + let now_dt = OffsetDateTime::now_utc(); + let now = now_dt + .format(&time::format_description::well_known::Rfc3339) + .context("format job created_at")?; + // JobId recipe: stable hex over (kind, payload_canonical, ns). + // The nanosecond timestamp is included so two `create` calls with + // identical `(kind, payload)` still get distinct IDs. + let job_id = mint_job_id(&kind, &payload, now_dt); + let kind_label = job_kind_label(&kind); + let payload_json = serde_json::to_string(&payload) + .context("serialize job payload")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + conn.execute( + "INSERT INTO jobs ( + job_id, kind, status, payload_json, progress_json, + error_json, created_at, updated_at, finished_at + ) VALUES (?, ?, 'pending', ?, NULL, NULL, ?, ?, NULL)", + params![job_id.0, kind_label, payload_json, now, now], + ) + .map_err(StoreError::from)?; + Ok(job_id) + } + + fn update_progress( + &self, + id: &kb_core::JobId, + progress: Value, + ) -> Result<()> { + let progress_json = serde_json::to_string(&progress) + .context("serialize job progress")?; + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format job updated_at")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + // status='pending' → 'running' on first progress update; later + // progress calls keep status='running' until finish(). + conn.execute( + "UPDATE jobs SET + progress_json = ?, + status = CASE status WHEN 'pending' THEN 'running' ELSE status END, + updated_at = ? + WHERE job_id = ?", + params![progress_json, now, id.0], + ) + .map_err(StoreError::from)?; + Ok(()) + } + + fn finish( + &self, + id: &kb_core::JobId, + status: kb_core::JobStatus, + error: Option<&str>, + ) -> Result<()> { + let now = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .context("format job finished_at")?; + let status_label = job_status_label(&status); + let error_json = error + .map(|e| serde_json::to_string(&serde_json::json!({ "message": e }))) + .transpose() + .context("serialize job error")?; + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + conn.execute( + "UPDATE jobs SET + status = ?, + error_json = ?, + updated_at = ?, + finished_at = ? + WHERE job_id = ?", + params![status_label, error_json, now, now, id.0], + ) + .map_err(StoreError::from)?; + Ok(()) + } + + fn list( + &self, + filter: &kb_core::JobFilter, + ) -> Result> { + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut sql = String::from( + "SELECT job_id, kind, status, payload_json, progress_json, + error_json, created_at, updated_at, finished_at + FROM jobs WHERE 1=1", + ); + let mut params_dyn: Vec> = Vec::new(); + if let Some(status) = &filter.status { + sql.push_str(" AND status = ?"); + params_dyn.push(Box::new(job_status_label(status).to_string())); + } + if let Some(kind) = &filter.kind { + sql.push_str(" AND kind = ?"); + params_dyn.push(Box::new(job_kind_label(kind).to_string())); + } + sql.push_str(" ORDER BY created_at ASC"); + + let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?; + let rows = stmt + .query_map( + rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())), + job_row_from_sql, + ) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for r in rows { + out.push(r.map_err(StoreError::from)?); + } + Ok(out) + } +} + +/// Mint a JobId over (kind, canonical(payload), nanos). The 32-hex +/// invariant on `kb_core::JobId` is honored by taking the first 32 chars +/// of the blake3 hex. +fn mint_job_id( + kind: &kb_core::JobKind, + payload: &Value, + at: OffsetDateTime, +) -> kb_core::JobId { + // Plain serde_json::to_vec is enough — JobIds are not part of the + // §4.2 ID family and don't need canonical-JSON parity with other IDs. + // The nanosecond suffix is what guarantees uniqueness, not stable + // hashing. + let mut hasher = blake3::Hasher::new(); + hasher.update(job_kind_label(kind).as_bytes()); + if let Ok(bytes) = serde_json::to_vec(payload) { + hasher.update(&bytes); + } + hasher.update(&at.unix_timestamp_nanos().to_be_bytes()); + let hex = hasher.finalize().to_hex().to_string(); + kb_core::JobId(hex[..32].to_string()) +} + +fn job_kind_label(k: &kb_core::JobKind) -> &'static str { + match k { + kb_core::JobKind::Ingest => "ingest", + kb_core::JobKind::Chunk => "chunk", + kb_core::JobKind::Embed => "embed", + kb_core::JobKind::Ocr => "ocr", + kb_core::JobKind::Transcribe => "transcribe", + kb_core::JobKind::Reindex => "reindex", + kb_core::JobKind::Doctor => "doctor", + } +} + +fn job_status_label(s: &kb_core::JobStatus) -> &'static str { + match s { + kb_core::JobStatus::Pending => "pending", + kb_core::JobStatus::Running => "running", + kb_core::JobStatus::Succeeded => "succeeded", + kb_core::JobStatus::Failed => "failed", + kb_core::JobStatus::Canceled => "canceled", + } +} + +fn job_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let job_id: String = row.get(0)?; + let kind_raw: String = row.get(1)?; + let status_raw: String = row.get(2)?; + let payload_json: String = row.get(3)?; + let progress_json: Option = row.get(4)?; + let error_json: Option = row.get(5)?; + let created_at_raw: String = row.get(6)?; + let updated_at_raw: String = row.get(7)?; + let finished_at_raw: Option = row.get(8)?; + + let kind: kb_core::JobKind = + serde_json::from_value(serde_json::Value::String(kind_raw)) + .map_err(conv_err(1))?; + let status: kb_core::JobStatus = + serde_json::from_value(serde_json::Value::String(status_raw)) + .map_err(conv_err(2))?; + let payload: Value = serde_json::from_str(&payload_json).map_err(conv_err(3))?; + let progress: Option = match progress_json { + Some(s) => Some(serde_json::from_str(&s).map_err(conv_err(4))?), + None => None, + }; + // Surface the stored error message back as a plain string per the + // JobRow schema (§7.2). We stored `{"message": "..."}` for forward + // compatibility — pull `message` back out, or fall back to the raw + // JSON if the shape ever drifts. + let error: Option = match error_json { + Some(s) => match serde_json::from_str::(&s) { + Ok(v) => v + .get("message") + .and_then(Value::as_str) + .map(str::to_owned) + .or(Some(s)), + Err(_) => Some(s), + }, + None => None, + }; + + let created_at = OffsetDateTime::parse( + &created_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(conv_err(6))?; + let updated_at = OffsetDateTime::parse( + &updated_at_raw, + &time::format_description::well_known::Rfc3339, + ) + .map_err(conv_err(7))?; + let finished_at = match finished_at_raw { + Some(s) => Some( + OffsetDateTime::parse(&s, &time::format_description::well_known::Rfc3339) + .map_err(conv_err(8))?, + ), + None => None, + }; + + Ok(kb_core::JobRow { + job_id: kb_core::JobId(job_id), + kind, + status, + payload, + progress, + error, + created_at, + updated_at, + finished_at, + }) +} + +fn conv_err( + col: usize, +) -> impl FnOnce(E) -> rusqlite::Error { + move |e| { + rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e)) + } +} diff --git a/crates/kb-store-sqlite/src/lib.rs b/crates/kb-store-sqlite/src/lib.rs new file mode 100644 index 0000000..1b35ecb --- /dev/null +++ b/crates/kb-store-sqlite/src/lib.rs @@ -0,0 +1,23 @@ +//! `kb-store-sqlite` — SQLite-backed implementations of +//! [`kb_core::DocumentStore`] and [`kb_core::JobRepo`] (§7.2), plus the +//! asset writer that copies (or references) raw bytes per design §5.2. +//! +//! Schema is owned by `migrations/V001__init.sql` (workspace root), which +//! ships the full §5 DDL minus the FTS5 virtual table + triggers (those +//! land in P2-1's `V002`). +//! +//! Allowed deps per task spec: `kb-core`, `kb-config`, `rusqlite`, +//! `refinery`, `serde_json`, `time`, `blake3`, `tracing`, `anyhow`, +//! `thiserror`. NOT allowed: `kb-parse-*`, `kb-normalize`, `kb-chunk`, +//! `kb-store-vector`, `kb-source-fs`, etc. (`kb-parse-md`, `kb-normalize`, +//! `kb-chunk` may appear as **dev-deps** — see `Cargo.toml` — to drive +//! the contract round-trip test off a real Markdown fixture.) + +mod error; +mod schema; +mod store; +mod documents; +mod jobs; + +pub use error::StoreError; +pub use store::SqliteStore; diff --git a/crates/kb-store-sqlite/src/schema.rs b/crates/kb-store-sqlite/src/schema.rs new file mode 100644 index 0000000..68ccb3b --- /dev/null +++ b/crates/kb-store-sqlite/src/schema.rs @@ -0,0 +1,14 @@ +//! Refinery migration bundle. The migrations live at the workspace +//! `migrations/` directory; refinery's `embed_migrations!` macro inlines +//! them at compile time so the binary needs no runtime SQL files. + +// `embed_migrations!` looks under the path relative to the package root +// (the crate's `Cargo.toml`). The workspace migrations dir is two levels +// up: `crates/kb-store-sqlite/Cargo.toml` → `../../migrations`. +refinery::embed_migrations!("../../migrations"); + +/// Re-export the runner under a stable name. Calling +/// `runner().run(&mut conn)?` applies all pending migrations. +pub fn runner() -> refinery::Runner { + migrations::runner() +} diff --git a/crates/kb-store-sqlite/src/store.rs b/crates/kb-store-sqlite/src/store.rs new file mode 100644 index 0000000..72a605e --- /dev/null +++ b/crates/kb-store-sqlite/src/store.rs @@ -0,0 +1,297 @@ +//! `SqliteStore` — open + run_migrations + asset writer. +//! +//! The store wraps a single `rusqlite::Connection` behind a +//! `std::sync::Mutex` so the public trait impls (which take `&self`) can +//! still issue mutating SQL. Concurrency is intentionally coarse for P1; +//! later phases can swap to a connection pool if measurement shows the +//! mutex on the hot path. + +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use rusqlite::Connection; + +use crate::error::StoreError; +use crate::schema; + +/// Default file name under `config.storage.data_dir`. Kept private — the +/// path layout is a §6.3 design decision, not part of the store's public +/// surface. +const SQLITE_FILE: &str = "kb.sqlite"; + +/// Subdirectory under `data_dir` holding shard-prefixed asset bytes +/// (`/`). Mirrors design §6.3. +const ASSETS_SUBDIR: &str = "assets"; + +/// Length of the shard prefix: 2 hex chars → 256 buckets, plenty to keep +/// directory cardinality reasonable on workspaces with thousands of +/// assets without descending into hash-trees. +const ASSET_SHARD_LEN: usize = 2; + +/// Bytes-per-MiB conversion. Used by the asset writer to compare +/// `byte_len` against `storage.copy_threshold_mb`. +const BYTES_PER_MIB: u64 = 1024 * 1024; + +/// SQLite-backed kb store. +/// +/// Construct via [`SqliteStore::open`], then call +/// [`SqliteStore::run_migrations`] to apply the bundled `V001__init.sql` +/// before any read/write call. +pub struct SqliteStore { + /// Resolved absolute path to `data_dir`. Used by the asset writer. + pub(crate) data_dir: PathBuf, + /// Maximum asset size eligible for in-store copy; assets bigger than + /// this are recorded as `reference` and read from their source path. + pub(crate) copy_threshold_bytes: u64, + /// Single mutexed connection — see module docs for rationale. + pub(crate) conn: Mutex, +} + +impl SqliteStore { + /// Open (or create) the SQLite file under `config.storage.data_dir`, + /// apply pragmas (foreign_keys / WAL / synchronous=NORMAL / + /// temp_store=MEMORY), and create parent directories as needed. + /// **Does not run migrations** — call [`Self::run_migrations`] next. + pub fn open(config: &kb_config::Config) -> Result { + let data_dir = expand_data_dir(&config.storage.data_dir); + std::fs::create_dir_all(&data_dir) + .with_context(|| format!("create data_dir {}", data_dir.display()))?; + let db_path = data_dir.join(SQLITE_FILE); + + let conn = Connection::open(&db_path) + .with_context(|| format!("open sqlite at {}", db_path.display()))?; + apply_pragmas(&conn)?; + + tracing::debug!( + target: "kb-store-sqlite", + data_dir = %data_dir.display(), + db = %db_path.display(), + "opened sqlite store" + ); + + Ok(Self { + data_dir, + copy_threshold_bytes: config.storage.copy_threshold_mb * BYTES_PER_MIB, + conn: Mutex::new(conn), + }) + } + + /// Apply all pending migrations bundled at compile time + /// (`migrations/V001__init.sql` and any later additions). + pub fn run_migrations(&self) -> Result<()> { + let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + schema::runner() + .run(&mut *conn) + .map_err(|e| StoreError::Migration(e.to_string()))?; + tracing::debug!(target: "kb-store-sqlite", "migrations applied"); + Ok(()) + } + + /// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`, + /// bytes go to `data_dir/assets//` if `byte_len ≤ + /// copy_threshold_mb`, otherwise the row records the source URI's + /// path and no copy is performed. + /// + /// In either branch, `blake3(bytes)` is recomputed and compared to + /// `asset.checksum.0`. A mismatch returns + /// `StoreError::Conflict` wrapped in `anyhow::Error`. + pub fn put_asset_with_bytes( + &self, + asset: &kb_core::RawAsset, + bytes: &[u8], + ) -> Result<()> { + // 1. Verify the caller's checksum matches what's actually on the + // wire. A drift here means the bytes the parser saw and the bytes + // we're about to durably store disagree — refuse persistence. + let computed = blake3::hash(bytes).to_hex().to_string(); + if computed != asset.checksum.0 { + return Err(StoreError::Conflict(format!( + "checksum mismatch: asset {} declares {} but bytes hash to {}", + asset.asset_id.0, asset.checksum.0, computed + )) + .into()); + } + + // 2. Decide copy vs. reference. The threshold compares the + // declared `byte_len` (caller-vouched) rather than `bytes.len()` + // because some sources stream and `byte_len` is authoritative. + let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes { + let dest = self.assets_path_for(&asset.asset_id); + if let Some(parent) = dest.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("create asset shard dir {}", parent.display()) + })?; + } + std::fs::write(&dest, bytes) + .with_context(|| format!("write asset bytes to {}", dest.display()))?; + // Mirror §6.6: files 0o644. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&dest)?.permissions(); + perms.set_mode(0o644); + std::fs::set_permissions(&dest, perms).with_context(|| { + format!("chmod 0o644 on {}", dest.display()) + })?; + } + ("copied", dest.to_string_lossy().into_owned()) + } else { + // Reference: caller's source path is recorded verbatim. We + // accept either a `File(path)` or `Kb(uri)` SourceUri; the + // latter stores the raw `kb://...` string. + let path = match &asset.source_uri { + kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(), + kb_core::SourceUri::Kb(u) => u.clone(), + }; + ("reference", path) + }; + + // 3. UPSERT the assets row. A second `put_asset_with_bytes` for + // the same asset_id (e.g. re-ingest) overwrites in place — the + // row is uniquely keyed by asset_id and re-derived from the + // RawAsset every time. + let conn = self.conn.lock().expect("sqlite mutex poisoned"); + upsert_asset_row(&conn, asset, storage_kind, &storage_path)?; + Ok(()) + } + + /// Compute the `data_dir/assets//` path for an asset. + /// `` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`. + pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf { + let id = &asset_id.0; + // Defensive: kb-core enforces 32 hex chars on AssetId construction + // (`FromStr` validates). If a future code path bypasses that, we + // fall back to the full id as the shard so we never panic on + // slicing. + let shard = if id.len() >= ASSET_SHARD_LEN { + &id[..ASSET_SHARD_LEN] + } else { + id.as_str() + }; + self.data_dir.join(ASSETS_SUBDIR).join(shard).join(id) + } +} + +/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes` +/// path (which has bytes + computed `storage_kind/path`) and the +/// `DocumentStore::put_asset` path (which only has the `RawAsset` and +/// reads `storage_kind/path` from `asset.stored`). +pub(crate) fn upsert_asset_row( + conn: &Connection, + asset: &kb_core::RawAsset, + storage_kind: &str, + storage_path: &str, +) -> Result<()> { + let source_uri = match &asset.source_uri { + kb_core::SourceUri::File(p) => format!("file://{}", p.to_string_lossy()), + kb_core::SourceUri::Kb(u) => u.clone(), + }; + let media_type = serde_json::to_string(&asset.media_type) + .context("serialize media_type")?; + let discovered_at = asset + .discovered_at + .format(&time::format_description::well_known::Rfc3339) + .context("format discovered_at")?; + + conn.execute( + "INSERT INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(asset_id) DO UPDATE SET + source_uri = excluded.source_uri, + workspace_path = excluded.workspace_path, + media_type = excluded.media_type, + byte_len = excluded.byte_len, + checksum = excluded.checksum, + storage_kind = excluded.storage_kind, + storage_path = excluded.storage_path, + discovered_at = excluded.discovered_at", + rusqlite::params![ + asset.asset_id.0, + source_uri, + asset.workspace_path.0, + media_type, + asset.byte_len as i64, + asset.checksum.0, + storage_kind, + storage_path, + discovered_at, + ], + ) + .map_err(StoreError::from)?; + Ok(()) +} + +/// Apply the design §5 / task-spec pragmas. Called once per connection. +/// Note: WAL is persistent (the journal-mode setting is sticky in the DB +/// header) but `foreign_keys`, `synchronous`, and `temp_store` are +/// per-connection — they MUST be re-applied on every open. +fn apply_pragmas(conn: &Connection) -> Result<()> { + conn.pragma_update(None, "foreign_keys", "ON")?; + // `journal_mode = WAL` returns the current mode as a row; use + // `pragma_query_value` semantics via `query_row` to allow that. + conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?; + conn.pragma_update(None, "synchronous", "NORMAL")?; + conn.pragma_update(None, "temp_store", "MEMORY")?; + Ok(()) +} + +/// Expand the placeholders / `~` / env-vars used by `Config::storage.data_dir`. +/// +/// Supported substitutions, in order: +/// - `${XDG_DATA_HOME:-~/.local/share}` (and the bare `${XDG_DATA_HOME}`) +/// - leading `~` → `$HOME` +/// +/// If neither produces an absolute path, the input is returned as-is +/// (relative paths are kept relative to the caller's CWD). +fn expand_data_dir(raw: &str) -> PathBuf { + let mut s = raw.to_string(); + + // ${XDG_DATA_HOME:-~/.local/share}: respect the env override, else + // fall back to the suffix after `:-`. + if let Some(start) = s.find("${XDG_DATA_HOME") { + if let Some(rel_end) = s[start..].find('}') { + let end = start + rel_end + 1; // include trailing '}' + let inner = &s[start + 2..end - 1]; // strip ${ and } + let replacement = match std::env::var("XDG_DATA_HOME") { + Ok(v) if !v.is_empty() => v, + _ => { + // inner is e.g. `XDG_DATA_HOME:-~/.local/share`. + if let Some((_, default)) = inner.split_once(":-") { + default.to_string() + } else { + // No default supplied; mimic Bash and yield "". + String::new() + } + } + }; + s.replace_range(start..end, &replacement); + } + } + + // ~ at the front → $HOME (or `dirs::home_dir`). + if let Some(rest) = s.strip_prefix('~') { + if let Some(home) = std::env::var_os("HOME").map(PathBuf::from).or_else(dirs_home_fallback) + { + return home.join(rest.trim_start_matches('/')); + } + } + + PathBuf::from(s) +} + +/// Tiny shim to avoid pulling in the `dirs` crate as a direct dep — we +/// only fall back when `$HOME` is unset, which is exotic on the platforms +/// we target. Returns `None` so the caller keeps the literal `~`. +fn dirs_home_fallback() -> Option { + None +} + +/// Returns the root of the assets shard tree (`data_dir/assets/`). Used +/// by tests; kept crate-private otherwise. +#[allow(dead_code)] +pub(crate) fn assets_root(data_dir: &Path) -> PathBuf { + data_dir.join(ASSETS_SUBDIR) +} diff --git a/migrations/V001__init.sql b/migrations/V001__init.sql index 2db2d5e..96fe4ee 100644 --- a/migrations/V001__init.sql +++ b/migrations/V001__init.sql @@ -1,7 +1,10 @@ --- V001__init.sql — schema bootstrap. --- Per design §5.1 + §5.9. Only the meta + migrations tables land here; --- data tables (assets, documents, blocks, chunks, fts5, …) ship in later --- phase-specific migrations (P1-6 / P2-1 / P3-3). +-- V001__init.sql — full P1 schema bootstrap. +-- Per design §5.1 (meta), §5.2 (assets), §5.3 (documents/document_tags), +-- §5.4 (blocks), §5.5 (chunks — FTS5 virtual table + triggers DEFERRED to +-- V002 in P2-1), §5.6 (embedding_records), §5.7 (jobs / ingest_runs / +-- answers / eval_runs / eval_query_results). + +-- §5.1 Migrations meta ------------------------------------------------------- CREATE TABLE schema_meta ( key TEXT PRIMARY KEY, @@ -13,3 +16,167 @@ CREATE TABLE migrations ( applied_at TEXT NOT NULL, description TEXT NOT NULL ); + +-- §5.2 Assets ---------------------------------------------------------------- + +CREATE TABLE assets ( + asset_id TEXT PRIMARY KEY, + source_uri TEXT NOT NULL, + workspace_path TEXT NOT NULL, + media_type TEXT NOT NULL, + byte_len INTEGER NOT NULL, + checksum TEXT NOT NULL, + storage_kind TEXT NOT NULL CHECK (storage_kind IN ('copied','reference')), + storage_path TEXT NOT NULL, + discovered_at TEXT NOT NULL +); +CREATE UNIQUE INDEX idx_assets_workspace_path ON assets(workspace_path); +CREATE INDEX idx_assets_media_type ON assets(media_type); + +-- §5.3 Documents ------------------------------------------------------------- + +CREATE TABLE documents ( + doc_id TEXT PRIMARY KEY, + asset_id TEXT NOT NULL REFERENCES assets(asset_id) ON DELETE RESTRICT, + workspace_path TEXT NOT NULL, + title TEXT, + lang TEXT, + source_type TEXT NOT NULL, + trust_level TEXT NOT NULL, + parser_version TEXT NOT NULL, + doc_version INTEGER NOT NULL, + schema_version INTEGER NOT NULL, + metadata_json TEXT NOT NULL, + provenance_json TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); +CREATE UNIQUE INDEX idx_docs_workspace_path ON documents(workspace_path); +CREATE INDEX idx_docs_lang ON documents(lang); +CREATE INDEX idx_docs_source_type ON documents(source_type); + +CREATE TABLE document_tags ( + doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE, + tag TEXT NOT NULL, + PRIMARY KEY (doc_id, tag) +); +CREATE INDEX idx_document_tags_tag ON document_tags(tag); + +-- §5.4 Blocks ---------------------------------------------------------------- + +CREATE TABLE blocks ( + block_id TEXT PRIMARY KEY, + doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE, + kind TEXT NOT NULL, + heading_path_json TEXT NOT NULL, + ordinal INTEGER NOT NULL, + source_span_json TEXT NOT NULL, + payload_json TEXT NOT NULL +); +CREATE INDEX idx_blocks_doc_id ON blocks(doc_id); + +-- §5.5 Chunks (FTS5 virtual table + triggers deferred to V002 / P2-1) ------- + +CREATE TABLE chunks ( + chunk_id TEXT PRIMARY KEY, + doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE, + text TEXT NOT NULL, + heading_path_json TEXT NOT NULL, + section_label TEXT, + source_spans_json TEXT NOT NULL, + token_estimate INTEGER NOT NULL, + chunker_version TEXT NOT NULL, + policy_hash TEXT NOT NULL, + block_ids_json TEXT NOT NULL, + created_at TEXT NOT NULL +); +CREATE INDEX idx_chunks_doc_id ON chunks(doc_id); +CREATE INDEX idx_chunks_chunker_version ON chunks(chunker_version); + +-- §5.6 Embedding records (P3 — table empty in P1, present for forward compat) - + +CREATE TABLE embedding_records ( + embedding_id TEXT PRIMARY KEY, + chunk_id TEXT NOT NULL REFERENCES chunks(chunk_id) ON DELETE CASCADE, + model_id TEXT NOT NULL, + model_version TEXT NOT NULL, + dimensions INTEGER NOT NULL, + lance_table TEXT NOT NULL, + created_at TEXT NOT NULL, + UNIQUE(chunk_id, model_id, model_version, dimensions) +); +CREATE INDEX idx_embed_chunk ON embedding_records(chunk_id); +CREATE INDEX idx_embed_model ON embedding_records(model_id, model_version, dimensions); + +-- §5.7 Jobs / IngestRuns / Answers / EvalRuns ------------------------------- + +CREATE TABLE jobs ( + job_id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('pending','running','succeeded','failed','canceled')), + payload_json TEXT NOT NULL, + progress_json TEXT, + error_json TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + finished_at TEXT +); +CREATE INDEX idx_jobs_status ON jobs(status); +CREATE INDEX idx_jobs_kind ON jobs(kind); + +CREATE TABLE ingest_runs ( + run_id TEXT PRIMARY KEY, + scope_json TEXT NOT NULL, + scanned INTEGER NOT NULL, + new_count INTEGER NOT NULL, + updated_count INTEGER NOT NULL, + skipped_count INTEGER NOT NULL, + error_count INTEGER NOT NULL, + duration_ms INTEGER NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT NOT NULL, + items_json TEXT +); + +CREATE TABLE answers ( + trace_id TEXT PRIMARY KEY, + query TEXT NOT NULL, + answer TEXT NOT NULL, + grounded INTEGER NOT NULL, + refusal_reason TEXT, + model_id TEXT NOT NULL, + model_provider TEXT NOT NULL, + embedding_model_id TEXT, + embedding_dimensions INTEGER, + prompt_template_version TEXT NOT NULL, + retrieval_mode TEXT NOT NULL, + retrieval_k INTEGER NOT NULL, + score_gate REAL NOT NULL, + top_score REAL NOT NULL, + chunks_returned INTEGER NOT NULL, + chunks_used INTEGER NOT NULL, + citations_json TEXT NOT NULL, + packed_chunks_json TEXT, + prompt_tokens INTEGER, + completion_tokens INTEGER, + latency_ms INTEGER, + created_at TEXT NOT NULL +); +CREATE INDEX idx_answers_created_at ON answers(created_at); +CREATE INDEX idx_answers_grounded ON answers(grounded); + +CREATE TABLE eval_runs ( + run_id TEXT PRIMARY KEY, + suite TEXT NOT NULL, + config_snapshot_json TEXT NOT NULL, + aggregate_json TEXT NOT NULL, + commit_hash TEXT, + created_at TEXT NOT NULL +); + +CREATE TABLE eval_query_results ( + run_id TEXT NOT NULL REFERENCES eval_runs(run_id) ON DELETE CASCADE, + query_id TEXT NOT NULL, + result_json TEXT NOT NULL, + PRIMARY KEY (run_id, query_id) +); From 111f40ddf0f147467c57b817d210c21a2afafd78 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:13:03 +0000 Subject: [PATCH 5/9] p1-6: kb-store-sqlite test suite (8 categories) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All 8 test categories from the task plan, plus a JobRepo subset: migration — tests/migration.rs: fresh DB after run_migrations exposes every required §5 table + index. unit (copy) — tests/asset_writer.rs: copy mode writes file with mode 0o644 + correct bytes. unit (ref) — tests/asset_writer.rs: reference mode does not write file; row records source path. unit (cs) — tests/asset_writer.rs: tampered checksum returns a Conflict-flavoured anyhow error. unit (idem) — tests/idempotency.rs: same put_document twice → 1 row, doc_version 1→2; tags re-derived. unit (rb) — tests/idempotency.rs: put_blocks with FK violation rolls back; pre-existing rows unchanged. contract — tests/contract_roundtrip.rs: drives kb-parse-md + kb-normalize + kb-chunk on fixtures/markdown/code-and-table.md, persists, then reloads via DocumentStore::get_document / get_chunk and asserts byte-equal round-trip. snapshot — tests/ingest_report_snapshot.rs + snapshots/ingest_report.snapshot.json: pin the wire JSON form of kb_core::IngestReport for an inline fixture run. jobs — tests/jobs.rs: create → progress → finish flow; error message round-trip; list filters on status/kind. Drops the unused `serde` direct dep from Cargo.toml; serde_json brings its own. Dev-deps confirmed via `cargo tree -p kb-store-sqlite --depth 1` to live only in the dev tree. --- Cargo.lock | 1 - crates/kb-store-sqlite/Cargo.toml | 1 - .../snapshots/ingest_report.snapshot.json | 47 ++++ crates/kb-store-sqlite/tests/asset_writer.rs | 121 +++++++++ crates/kb-store-sqlite/tests/common/mod.rs | 50 ++++ .../tests/contract_roundtrip.rs | 135 ++++++++++ crates/kb-store-sqlite/tests/idempotency.rs | 241 ++++++++++++++++++ .../tests/ingest_report_snapshot.rs | 99 +++++++ crates/kb-store-sqlite/tests/jobs.rs | 90 +++++++ crates/kb-store-sqlite/tests/migration.rs | 83 ++++++ 10 files changed, 866 insertions(+), 2 deletions(-) create mode 100644 crates/kb-store-sqlite/snapshots/ingest_report.snapshot.json create mode 100644 crates/kb-store-sqlite/tests/asset_writer.rs create mode 100644 crates/kb-store-sqlite/tests/common/mod.rs create mode 100644 crates/kb-store-sqlite/tests/contract_roundtrip.rs create mode 100644 crates/kb-store-sqlite/tests/idempotency.rs create mode 100644 crates/kb-store-sqlite/tests/ingest_report_snapshot.rs create mode 100644 crates/kb-store-sqlite/tests/jobs.rs create mode 100644 crates/kb-store-sqlite/tests/migration.rs diff --git a/Cargo.lock b/Cargo.lock index d01b60a..cecb492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,7 +831,6 @@ dependencies = [ "kb-parse-md", "refinery", "rusqlite", - "serde", "serde_json", "tempfile", "thiserror 2.0.18", diff --git a/crates/kb-store-sqlite/Cargo.toml b/crates/kb-store-sqlite/Cargo.toml index b0862ef..45cf7ea 100644 --- a/crates/kb-store-sqlite/Cargo.toml +++ b/crates/kb-store-sqlite/Cargo.toml @@ -14,7 +14,6 @@ kb-config = { path = "../kb-config" } # Explicitly NOT `bundled-sqlcipher` per task allowed-deps list. rusqlite = { version = "0.32", features = ["bundled"] } refinery = { version = "0.8", features = ["rusqlite"] } -serde = { workspace = true } serde_json = { workspace = true } time = { workspace = true } blake3 = { workspace = true } diff --git a/crates/kb-store-sqlite/snapshots/ingest_report.snapshot.json b/crates/kb-store-sqlite/snapshots/ingest_report.snapshot.json new file mode 100644 index 0000000..6a3e90a --- /dev/null +++ b/crates/kb-store-sqlite/snapshots/ingest_report.snapshot.json @@ -0,0 +1,47 @@ +{ + "duration_ms": 187, + "errors": 0, + "items": [ + { + "asset_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "block_count": 7, + "byte_len": 1234, + "chunk_count": 3, + "chunker_version": "md-heading-v1", + "doc_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "doc_path": "notes/alpha.md", + "error": null, + "kind": "new", + "parser_version": "pulldown-cmark-0.x", + "warnings": [] + }, + { + "asset_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "block_count": 12, + "byte_len": 2048, + "chunk_count": 5, + "chunker_version": "md-heading-v1", + "doc_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "doc_path": "notes/beta.md", + "error": null, + "kind": "updated", + "parser_version": "pulldown-cmark-0.x", + "warnings": [ + "malformed frontmatter" + ] + } + ], + "new": 2, + "scanned": 3, + "scope": { + "exclude": [ + ".git/**" + ], + "include": [ + "**/*.md" + ], + "root": "/home/u/KB" + }, + "skipped": 0, + "updated": 1 +} diff --git a/crates/kb-store-sqlite/tests/asset_writer.rs b/crates/kb-store-sqlite/tests/asset_writer.rs new file mode 100644 index 0000000..7aa5bfa --- /dev/null +++ b/crates/kb-store-sqlite/tests/asset_writer.rs @@ -0,0 +1,121 @@ +//! Asset writer tests: copy mode (file written 0o644), reference mode +//! (no copy, row records source), and checksum mismatch (Conflict). + +use std::path::PathBuf; + +use kb_core::{AssetId, AssetStorage, Checksum, MediaType, RawAsset, SourceUri, WorkspacePath}; +use kb_store_sqlite::SqliteStore; +use time::OffsetDateTime; + +mod common; + +fn fixed_asset(_bytes: &[u8], byte_len: u64, declared_checksum: &str) -> RawAsset { + RawAsset { + // 32-hex AssetId per kb-core newtype invariant. + asset_id: AssetId("a".repeat(32)), + source_uri: SourceUri::File(PathBuf::from("/some/source.md")), + workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(), + media_type: MediaType::Markdown, + byte_len, + checksum: Checksum(declared_checksum.into()), + discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + stored: AssetStorage::Reference { + path: PathBuf::from("/some/source.md"), + sha: Checksum("0".repeat(64)), + }, + } +} + +fn b3_full_hex(bytes: &[u8]) -> String { + blake3::hash(bytes).to_hex().to_string() +} + +#[test] +fn copy_mode_writes_file_with_0o644_and_correct_bytes() { + let env = common::TestEnv::with_threshold(100); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let bytes = b"hello, sqlite"; + let cs = b3_full_hex(bytes); + let asset = fixed_asset(bytes, bytes.len() as u64, &cs); + + store.put_asset_with_bytes(&asset, bytes).expect("write"); + + // Path: data_dir/assets/aa/aaaaaa…aa + let aa = &asset.asset_id.0[..2]; + let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0); + assert!(dest.exists(), "asset file not written at {}", dest.display()); + let on_disk = std::fs::read(&dest).unwrap(); + assert_eq!(on_disk, bytes); + + // Mode 0o644 on Unix. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mode = std::fs::metadata(&dest).unwrap().permissions().mode() & 0o777; + assert_eq!(mode, 0o644, "expected 0o644, got 0o{mode:o}"); + } + + // Row recorded copied. + let storage_kind: String = env.with_conn(|c| { + c.query_row( + "SELECT storage_kind FROM assets WHERE asset_id = ?", + [&asset.asset_id.0], + |r| r.get(0), + ) + }); + assert_eq!(storage_kind, "copied"); +} + +#[test] +fn reference_mode_does_not_write_file_but_records_path() { + // copy_threshold_mb=0 → every byte lands on the reference branch. + let env = common::TestEnv::with_threshold(0); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let bytes = b"big-pretend-bytes"; + let cs = b3_full_hex(bytes); + // byte_len declared > 0 so the threshold check picks reference. With + // copy_threshold_bytes=0 even byte_len=1 trips the else branch. + let mut asset = fixed_asset(bytes, 1, &cs); + asset.source_uri = SourceUri::File(PathBuf::from("/path/to/original.md")); + + store.put_asset_with_bytes(&asset, bytes).expect("ref write"); + + let aa = &asset.asset_id.0[..2]; + let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0); + assert!(!dest.exists(), "reference mode must not copy bytes"); + + let (storage_kind, storage_path): (String, String) = env.with_conn(|c| { + c.query_row( + "SELECT storage_kind, storage_path FROM assets WHERE asset_id = ?", + [&asset.asset_id.0], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + }); + assert_eq!(storage_kind, "reference"); + assert_eq!(storage_path, "/path/to/original.md"); +} + +#[test] +fn checksum_mismatch_returns_conflict() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let bytes = b"the real bytes"; + // Tampered checksum: hash a different payload. + let wrong_cs = b3_full_hex(b"different bytes"); + let asset = fixed_asset(bytes, bytes.len() as u64, &wrong_cs); + + let err = store + .put_asset_with_bytes(&asset, bytes) + .expect_err("must reject checksum mismatch"); + let msg = format!("{err:#}"); + assert!( + msg.contains("checksum mismatch") || msg.contains("conflict"), + "expected Conflict-flavoured error, got: {msg}" + ); +} diff --git a/crates/kb-store-sqlite/tests/common/mod.rs b/crates/kb-store-sqlite/tests/common/mod.rs new file mode 100644 index 0000000..742ce0b --- /dev/null +++ b/crates/kb-store-sqlite/tests/common/mod.rs @@ -0,0 +1,50 @@ +//! Shared test scaffolding: temp data_dir + freshly opened SqliteStore. + +#![allow(dead_code)] + +use std::path::PathBuf; + +use kb_config::Config; +use rusqlite::Connection; +use tempfile::TempDir; + +pub struct TestEnv { + pub temp: TempDir, + pub config: Config, +} + +impl TestEnv { + pub fn new() -> Self { + Self::with_threshold(100) + } + + /// Override the copy-threshold (useful for the reference-mode test + /// where we want a small file to land on the reference branch). + pub fn with_threshold(copy_threshold_mb: u64) -> Self { + let temp = tempfile::tempdir().expect("tempdir"); + let mut config = Config::defaults(); + config.storage.data_dir = temp.path().to_string_lossy().into_owned(); + config.storage.copy_threshold_mb = copy_threshold_mb; + Self { temp, config } + } + + pub fn config(&self) -> Config { + self.config.clone() + } + + pub fn data_dir(&self) -> PathBuf { + self.temp.path().to_path_buf() + } + + pub fn db_path(&self) -> PathBuf { + self.temp.path().join("kb.sqlite") + } + + /// Open a side-channel rusqlite connection for direct SQL inspection. + /// The store-owned connection is held inside a Mutex; opening a fresh + /// one is the simplest way for tests to peek at row counts / pragmas. + pub fn with_conn(&self, f: impl FnOnce(&Connection) -> rusqlite::Result) -> T { + let conn = Connection::open(self.db_path()).expect("open side conn"); + f(&conn).expect("with_conn closure") + } +} diff --git a/crates/kb-store-sqlite/tests/contract_roundtrip.rs b/crates/kb-store-sqlite/tests/contract_roundtrip.rs new file mode 100644 index 0000000..2ea6080 --- /dev/null +++ b/crates/kb-store-sqlite/tests/contract_roundtrip.rs @@ -0,0 +1,135 @@ +//! Contract: drive the full pipeline (`kb-parse-md` → `kb-normalize` → +//! `kb-chunk`) on a real fixture and prove `DocumentStore` round-trips +//! the resulting `CanonicalDocument` + `Vec` losslessly. +//! +//! `kb-parse-md`, `kb-normalize`, `kb-chunk` are dev-deps only — see the +//! crate's `Cargo.toml`. The store crate's production tree (visible via +//! `cargo tree -p kb-store-sqlite --depth 1`) does NOT include them. + +use std::path::PathBuf; + +use kb_chunk::MdHeadingV1Chunker; +use kb_core::{ + AssetId, AssetStorage, Checksum, ChunkPolicy, ChunkerVersion, Chunker, DocumentStore, + MediaType, ParserVersion, RawAsset, SourceUri, WorkspacePath, +}; +use kb_normalize::build_canonical_document; +use kb_parse_md::{BodyHints, parse_blocks, parse_frontmatter}; +use kb_store_sqlite::SqliteStore; +use time::OffsetDateTime; + +mod common; + +fn fixtures_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("..") + .join("..") + .join("fixtures") + .join("markdown") +} + +#[test] +fn document_and_chunks_round_trip_through_sqlite() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // ── Build inputs from the fixture ─────────────────────────────── + let dir = fixtures_dir(); + let bytes = std::fs::read(dir.join("code-and-table.md")).expect("read fixture"); + let cs = blake3::hash(&bytes).to_hex().to_string(); + + let asset = RawAsset { + asset_id: AssetId("a".repeat(32)), + source_uri: SourceUri::File(dir.join("code-and-table.md")), + workspace_path: WorkspacePath::new("notes/code-and-table.md".into()).unwrap(), + media_type: MediaType::Markdown, + byte_len: bytes.len() as u64, + checksum: Checksum(cs.clone()), + discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + stored: AssetStorage::Reference { + path: dir.join("code-and-table.md"), + sha: Checksum(cs.clone()), + }, + }; + + let hints = BodyHints { + first_h1: Some("Code And Table".into()), + fs_ctime: asset.discovered_at, + fs_mtime: asset.discovered_at, + fallback_lang: Some("en".into()), + }; + let (mut metadata, _fm_span, _fm_warns) = + parse_frontmatter(&bytes, &hints).unwrap(); + let (parsed_blocks, parse_warns) = parse_blocks(&bytes, 1).unwrap(); + + metadata.aliases.sort(); + metadata.tags.sort(); + let parser_version = ParserVersion("kb-store-sqlite-roundtrip".into()); + let doc = build_canonical_document( + &asset, + metadata, + parsed_blocks, + &parser_version, + parse_warns, + ) + .unwrap(); + + let policy = ChunkPolicy { + target_tokens: 200, + overlap_tokens: 40, + respect_markdown_headings: true, + chunker_version: ChunkerVersion("md-heading-v1".into()), + }; + let chunks = MdHeadingV1Chunker.chunk(&doc, &policy).unwrap(); + assert!(!chunks.is_empty(), "fixture must produce ≥1 chunk"); + + // ── Persist via the store ──────────────────────────────────────── + store + .put_asset_with_bytes(&asset, &bytes) + .expect("put_asset_with_bytes"); + store.put_document(&doc).expect("put_document"); + store + .put_blocks(&doc.doc_id, &doc.blocks) + .expect("put_blocks"); + store + .put_chunks(&doc.doc_id, &chunks) + .expect("put_chunks"); + + // ── Read back ──────────────────────────────────────────────────── + let loaded = store + .get_document(&doc.doc_id) + .expect("get_document err") + .expect("get_document Some"); + + // Document-level fields must match. doc_version is bumped by the + // UPSERT path even on first put (the trigger runs on conflict + // only; first-insert lands the caller-supplied 1). updated_at is + // re-stamped server-side and is NOT round-tripped (the loaded + // CanonicalDocument carries `metadata.updated_at` from the + // metadata_json blob, which is the input value). So we compare + // the field-by-field copies that ARE deterministic: + assert_eq!(loaded.doc_id, doc.doc_id); + assert_eq!(loaded.workspace_path, doc.workspace_path); + assert_eq!(loaded.title, doc.title); + assert_eq!(loaded.lang, doc.lang); + assert_eq!(loaded.parser_version, doc.parser_version); + assert_eq!(loaded.schema_version, doc.schema_version); + assert_eq!(loaded.metadata, doc.metadata, "metadata round-trip"); + assert_eq!(loaded.provenance, doc.provenance, "provenance round-trip"); + assert_eq!( + loaded.blocks.len(), + doc.blocks.len(), + "block count round-trip" + ); + assert_eq!(loaded.blocks, doc.blocks, "block stream round-trip"); + + // Chunks: get_chunk for each id. + for c in &chunks { + let back = store + .get_chunk(&c.chunk_id) + .expect("get_chunk err") + .expect("get_chunk Some"); + assert_eq!(&back, c, "chunk round-trip mismatch for {}", c.chunk_id.0); + } +} diff --git a/crates/kb-store-sqlite/tests/idempotency.rs b/crates/kb-store-sqlite/tests/idempotency.rs new file mode 100644 index 0000000..56f45bd --- /dev/null +++ b/crates/kb-store-sqlite/tests/idempotency.rs @@ -0,0 +1,241 @@ +//! Idempotency: re-ingesting the same `(workspace_path, asset_id, +//! parser_version)` keeps documents at one row but bumps `doc_version` +//! and replaces blocks/chunks rather than duplicating them. + +use std::path::PathBuf; + +use kb_core::{ + AssetId, AssetStorage, Block, CanonicalDocument, Checksum, Chunk, ChunkerVersion, + CommonBlock, DocumentId, DocumentStore, HeadingBlock, Lang, MediaType, Metadata, + ParserVersion, Provenance, RawAsset, SourceSpan, SourceType, SourceUri, TextBlock, + TrustLevel, WorkspacePath, +}; +use kb_store_sqlite::SqliteStore; +use time::OffsetDateTime; + +mod common; + +fn make_asset() -> RawAsset { + let bytes = b"dummy"; + RawAsset { + asset_id: AssetId("a".repeat(32)), + source_uri: SourceUri::File(PathBuf::from("/tmp/foo.md")), + workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(), + media_type: MediaType::Markdown, + byte_len: bytes.len() as u64, + checksum: Checksum(blake3::hash(bytes).to_hex().to_string()), + discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + stored: AssetStorage::Reference { + path: PathBuf::from("/tmp/foo.md"), + sha: Checksum(blake3::hash(bytes).to_hex().to_string()), + }, + } +} + +fn make_metadata() -> Metadata { + Metadata { + aliases: vec![], + tags: vec!["one".into(), "two".into()], + created_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + updated_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + source_type: SourceType::Markdown, + trust_level: TrustLevel::Primary, + user_id_alias: None, + user: Default::default(), + } +} + +fn make_doc() -> CanonicalDocument { + let doc_id = DocumentId("d".repeat(32)); + let span = SourceSpan::Line { start: 1, end: 1 }; + let block = Block::Heading(HeadingBlock { + common: CommonBlock { + block_id: kb_core::BlockId("b".repeat(32)), + heading_path: vec![], + source_span: span.clone(), + }, + level: 1, + text: "Title".into(), + }); + let para = Block::Paragraph(TextBlock { + common: CommonBlock { + block_id: kb_core::BlockId("c".repeat(32)), + heading_path: vec!["Title".into()], + source_span: span, + }, + text: "body".into(), + inlines: vec![], + }); + CanonicalDocument { + doc_id, + source_asset_id: AssetId("a".repeat(32)), + workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(), + title: "Title".into(), + lang: Lang("en".into()), + blocks: vec![block, para], + metadata: make_metadata(), + provenance: Provenance { events: vec![] }, + parser_version: ParserVersion("test-parser".into()), + schema_version: 1, + doc_version: 1, + } +} + +fn make_chunks(doc_id: &DocumentId) -> Vec { + vec![Chunk { + chunk_id: kb_core::ChunkId("e".repeat(32)), + doc_id: doc_id.clone(), + block_ids: vec![kb_core::BlockId("b".repeat(32))], + text: "Title\n\nbody".into(), + heading_path: vec!["Title".into()], + source_spans: vec![SourceSpan::Line { start: 1, end: 1 }], + token_estimate: 5, + chunker_version: ChunkerVersion("md-heading-v1".into()), + policy_hash: "deadbeefdeadbeef".into(), + }] +} + +#[test] +fn put_document_idempotent_bumps_doc_version() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let asset = make_asset(); + store.put_asset(&asset).expect("put_asset 1"); + + let doc = make_doc(); + store.put_document(&doc).expect("put_document 1"); + + // First ingest → exactly one row, doc_version=1. + let (count, dv1): (i64, i64) = env.with_conn(|c| { + c.query_row( + "SELECT COUNT(*), MAX(doc_version) FROM documents WHERE doc_id = ?", + [&doc.doc_id.0], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + }); + assert_eq!(count, 1); + assert_eq!(dv1, 1); + + // Re-ingest the same doc → still one row, doc_version=2. + store.put_document(&doc).expect("put_document 2"); + let (count2, dv2): (i64, i64) = env.with_conn(|c| { + c.query_row( + "SELECT COUNT(*), MAX(doc_version) FROM documents WHERE doc_id = ?", + [&doc.doc_id.0], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + }); + assert_eq!(count2, 1, "second put must not duplicate the row"); + assert_eq!(dv2, 2, "doc_version must increment on re-ingest"); + + // Tags were re-derived: still exactly the two original tags. + let tags: Vec = env.with_conn(|c| { + let mut stmt = + c.prepare("SELECT tag FROM document_tags WHERE doc_id = ? ORDER BY tag")?; + let rows = stmt.query_map([&doc.doc_id.0], |r| r.get::<_, String>(0))?; + rows.collect::>>() + }); + assert_eq!(tags, vec!["one".to_string(), "two".to_string()]); +} + +#[test] +fn put_blocks_and_put_chunks_replace_not_duplicate() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let asset = make_asset(); + store.put_asset(&asset).unwrap(); + let doc = make_doc(); + store.put_document(&doc).unwrap(); + + store.put_blocks(&doc.doc_id, &doc.blocks).unwrap(); + store.put_chunks(&doc.doc_id, &make_chunks(&doc.doc_id)).unwrap(); + + let (b1, ch1): (i64, i64) = env.with_conn(|c| { + Ok(( + c.query_row( + "SELECT COUNT(*) FROM blocks WHERE doc_id = ?", + [&doc.doc_id.0], + |r| r.get(0), + )?, + c.query_row( + "SELECT COUNT(*) FROM chunks WHERE doc_id = ?", + [&doc.doc_id.0], + |r| r.get(0), + )?, + )) + }); + assert_eq!(b1, 2); + assert_eq!(ch1, 1); + + // Re-put same data → counts unchanged (DELETE-then-INSERT). + store.put_blocks(&doc.doc_id, &doc.blocks).unwrap(); + store.put_chunks(&doc.doc_id, &make_chunks(&doc.doc_id)).unwrap(); + let (b2, ch2): (i64, i64) = env.with_conn(|c| { + Ok(( + c.query_row( + "SELECT COUNT(*) FROM blocks WHERE doc_id = ?", + [&doc.doc_id.0], + |r| r.get(0), + )?, + c.query_row( + "SELECT COUNT(*) FROM chunks WHERE doc_id = ?", + [&doc.doc_id.0], + |r| r.get(0), + )?, + )) + }); + assert_eq!(b2, 2, "blocks must not double on re-put"); + assert_eq!(ch2, 1, "chunks must not double on re-put"); +} + +/// `put_blocks` runs in a transaction. If we feed it a block whose +/// `doc_id` references a document that does not exist, the FK +/// constraint (`blocks.doc_id REFERENCES documents(doc_id)`) trips, +/// the transaction rolls back, and the table count is unchanged. +#[test] +fn put_blocks_transactional_rollback_on_fk_violation() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let asset = make_asset(); + store.put_asset(&asset).unwrap(); + let doc = make_doc(); + store.put_document(&doc).unwrap(); + // Establish a baseline row in `blocks`. + store.put_blocks(&doc.doc_id, &doc.blocks).unwrap(); + let baseline: i64 = env.with_conn(|c| { + c.query_row("SELECT COUNT(*) FROM blocks", [], |r| r.get(0)) + }); + assert_eq!(baseline, 2); + + // Now ask put_blocks to write to a doc_id that does NOT exist. + // The implementation issues `DELETE FROM blocks WHERE doc_id = ?` + // (no-op for the missing doc) followed by INSERTs that violate the + // FK constraint. The whole tx must roll back, so `blocks` count + // stays at `baseline`. + let phantom = DocumentId("0".repeat(32)); + let phantom_blocks = vec![Block::Heading(HeadingBlock { + common: CommonBlock { + block_id: kb_core::BlockId("9".repeat(32)), + heading_path: vec![], + source_span: SourceSpan::Line { start: 1, end: 1 }, + }, + level: 1, + text: "phantom".into(), + })]; + let res = store.put_blocks(&phantom, &phantom_blocks); + assert!(res.is_err(), "FK violation must surface as Err"); + + let after: i64 = env.with_conn(|c| { + c.query_row("SELECT COUNT(*) FROM blocks", [], |r| r.get(0)) + }); + assert_eq!( + after, baseline, + "transaction must roll back; blocks count must be unchanged" + ); +} diff --git a/crates/kb-store-sqlite/tests/ingest_report_snapshot.rs b/crates/kb-store-sqlite/tests/ingest_report_snapshot.rs new file mode 100644 index 0000000..2e6a563 --- /dev/null +++ b/crates/kb-store-sqlite/tests/ingest_report_snapshot.rs @@ -0,0 +1,99 @@ +//! Snapshot test pinning the JSON wire form of `kb_core::IngestReport` +//! for an inline fixture run. The store crate doesn't (yet) write +//! IngestReports — that's `kb-app`'s job — but the wire schema lives in +//! `kb-core`, and we want a determinism pin that fails loudly if the +//! shape drifts. +//! +//! Set `UPDATE_SNAPSHOTS=1` to re-bake the baseline. + +use std::path::PathBuf; + +use kb_core::{ + AssetId, ChunkerVersion, DocumentId, IngestItem, IngestItemKind, IngestReport, + ParserVersion, SourceScope, WorkspacePath, +}; +use serde_json::Value; + +fn baseline_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("snapshots") + .join("ingest_report.snapshot.json") +} + +fn fixture_report() -> IngestReport { + IngestReport { + scope: SourceScope { + root: PathBuf::from("/home/u/KB"), + include: vec!["**/*.md".into()], + exclude: vec![".git/**".into()], + }, + scanned: 3, + new: 2, + updated: 1, + skipped: 0, + errors: 0, + duration_ms: 187, + items: Some(vec![ + IngestItem { + kind: IngestItemKind::New, + doc_id: Some(DocumentId("a".repeat(32))), + doc_path: WorkspacePath::new("notes/alpha.md".into()).unwrap(), + asset_id: Some(AssetId("a".repeat(32))), + byte_len: Some(1234), + block_count: Some(7), + chunk_count: Some(3), + parser_version: Some(ParserVersion("pulldown-cmark-0.x".into())), + chunker_version: Some(ChunkerVersion("md-heading-v1".into())), + warnings: vec![], + error: None, + }, + IngestItem { + kind: IngestItemKind::Updated, + doc_id: Some(DocumentId("b".repeat(32))), + doc_path: WorkspacePath::new("notes/beta.md".into()).unwrap(), + asset_id: Some(AssetId("b".repeat(32))), + byte_len: Some(2048), + block_count: Some(12), + chunk_count: Some(5), + parser_version: Some(ParserVersion("pulldown-cmark-0.x".into())), + chunker_version: Some(ChunkerVersion("md-heading-v1".into())), + warnings: vec!["malformed frontmatter".into()], + error: None, + }, + ]), + } +} + +#[test] +fn ingest_report_wire_form_is_stable() { + let report = fixture_report(); + let actual = serde_json::to_value(&report).unwrap(); + let baseline = match std::fs::read_to_string(baseline_path()) { + Ok(s) => s, + Err(_) if std::env::var("UPDATE_SNAPSHOTS").is_ok() => { + std::fs::create_dir_all(baseline_path().parent().unwrap()).unwrap(); + let pretty = serde_json::to_string_pretty(&actual).unwrap(); + std::fs::write(baseline_path(), format!("{pretty}\n")).unwrap(); + return; + } + Err(e) => panic!( + "missing baseline {}; run with UPDATE_SNAPSHOTS=1: {e}", + baseline_path().display() + ), + }; + let expected: Value = serde_json::from_str(&baseline).unwrap(); + if actual != expected { + if std::env::var("UPDATE_SNAPSHOTS").is_ok() { + let pretty = serde_json::to_string_pretty(&actual).unwrap(); + std::fs::write(baseline_path(), format!("{pretty}\n")).unwrap(); + return; + } + let pretty = serde_json::to_string_pretty(&actual).unwrap(); + panic!( + "ingest_report snapshot drift\n\ + --- expected ({}) ---\n{baseline}\n\ + --- actual ---\n{pretty}", + baseline_path().display() + ); + } +} diff --git a/crates/kb-store-sqlite/tests/jobs.rs b/crates/kb-store-sqlite/tests/jobs.rs new file mode 100644 index 0000000..a5c5c32 --- /dev/null +++ b/crates/kb-store-sqlite/tests/jobs.rs @@ -0,0 +1,90 @@ +//! `JobRepo` smoke tests: create → progress → finish, list filters. + +use kb_core::{JobFilter, JobKind, JobRepo, JobStatus}; +use kb_store_sqlite::SqliteStore; +use serde_json::json; + +mod common; + +#[test] +fn create_then_progress_then_finish() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let id = store + .create(JobKind::Ingest, json!({"path": "notes/x.md"})) + .unwrap(); + // Status starts pending. + let row = store.list(&JobFilter::default()).unwrap(); + assert_eq!(row.len(), 1); + assert_eq!(row[0].status, JobStatus::Pending); + + // First progress flips pending → running. + store + .update_progress(&id, json!({"processed": 1, "total": 10})) + .unwrap(); + let row = store.list(&JobFilter::default()).unwrap(); + assert_eq!(row[0].status, JobStatus::Running); + assert_eq!(row[0].progress.as_ref().unwrap()["total"], json!(10)); + + // Finish with success. + store + .finish(&id, JobStatus::Succeeded, None) + .unwrap(); + let row = store.list(&JobFilter::default()).unwrap(); + assert_eq!(row[0].status, JobStatus::Succeeded); + assert!(row[0].finished_at.is_some()); + assert!(row[0].error.is_none()); +} + +#[test] +fn finish_with_error_message_is_round_trippable() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + let id = store.create(JobKind::Embed, json!({})).unwrap(); + store + .finish(&id, JobStatus::Failed, Some("boom: model not pulled")) + .unwrap(); + + let row = store.list(&JobFilter::default()).unwrap(); + assert_eq!(row[0].status, JobStatus::Failed); + assert_eq!( + row[0].error.as_deref(), + Some("boom: model not pulled"), + "error message must round-trip" + ); +} + +#[test] +fn list_filters_status_and_kind() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // Two ingest jobs (one finished succeeded, one pending) + one embed. + let a = store.create(JobKind::Ingest, json!({"a": 1})).unwrap(); + let _b = store.create(JobKind::Ingest, json!({"b": 1})).unwrap(); + let _c = store.create(JobKind::Embed, json!({"c": 1})).unwrap(); + store.finish(&a, JobStatus::Succeeded, None).unwrap(); + + let by_status_succeeded = store + .list(&JobFilter { + status: Some(JobStatus::Succeeded), + kind: None, + }) + .unwrap(); + assert_eq!(by_status_succeeded.len(), 1); + assert_eq!(by_status_succeeded[0].kind, JobKind::Ingest); + + let by_kind_embed = store + .list(&JobFilter { + status: None, + kind: Some(JobKind::Embed), + }) + .unwrap(); + assert_eq!(by_kind_embed.len(), 1); + assert_eq!(by_kind_embed[0].kind, JobKind::Embed); +} diff --git a/crates/kb-store-sqlite/tests/migration.rs b/crates/kb-store-sqlite/tests/migration.rs new file mode 100644 index 0000000..ad6ab54 --- /dev/null +++ b/crates/kb-store-sqlite/tests/migration.rs @@ -0,0 +1,83 @@ +//! Migration test: a fresh DB, after `run_migrations`, exposes every +//! table and index P1 needs (per §5.1–§5.7). + +use kb_store_sqlite::SqliteStore; + +mod common; + +#[test] +fn fresh_db_has_all_p1_tables_and_indexes() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).expect("open"); + store.run_migrations().expect("run migrations"); + + // Pull the list of user tables from sqlite_master. + let tables: Vec = env.with_conn(|c| { + let mut stmt = c.prepare( + "SELECT name FROM sqlite_master + WHERE type = 'table' AND name NOT LIKE 'sqlite_%' + ORDER BY name", + )?; + let rows = stmt.query_map([], |r| r.get::<_, String>(0))?; + let tables: rusqlite::Result> = rows.collect(); + tables + }); + + let required = [ + "answers", + "assets", + "blocks", + "chunks", + "document_tags", + "documents", + "embedding_records", + "eval_query_results", + "eval_runs", + "ingest_runs", + "jobs", + // refinery's own bookkeeping table (`refinery_schema_history`) + // also lands here; we don't pin it but it's expected. + "migrations", + "schema_meta", + ]; + for t in required { + assert!( + tables.iter().any(|n| n == t), + "table `{t}` missing; got {tables:?}" + ); + } + + // Pin the documented indexes (subset that matters for hot paths). + let indexes: Vec = env.with_conn(|c| { + let mut stmt = c.prepare( + "SELECT name FROM sqlite_master + WHERE type = 'index' AND name NOT LIKE 'sqlite_%' + ORDER BY name", + )?; + let rows = stmt.query_map([], |r| r.get::<_, String>(0))?; + let idx: rusqlite::Result> = rows.collect(); + idx + }); + for i in [ + "idx_assets_workspace_path", + "idx_assets_media_type", + "idx_docs_workspace_path", + "idx_docs_lang", + "idx_docs_source_type", + "idx_document_tags_tag", + "idx_blocks_doc_id", + "idx_chunks_doc_id", + "idx_chunks_chunker_version", + "idx_embed_chunk", + "idx_embed_model", + "idx_jobs_status", + "idx_jobs_kind", + "idx_answers_created_at", + "idx_answers_grounded", + ] { + assert!( + indexes.iter().any(|n| n == i), + "index `{i}` missing; got {indexes:?}" + ); + } +} From efdb71b1c39760117b832ca1c9a17e76ae8cab5c Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:14:17 +0000 Subject: [PATCH 6/9] p1-6: list_documents filter coverage test Round-trip three docs (en/ko, varied tags, varied trust) and exercise each DocFilter axis: default (all), lang, path_glob (workspace_path GLOB), tags_any (intersection via document_tags subquery + per-row tag hydration), and trust_min (Primary > Secondary > Generated rank gate). --- crates/kb-store-sqlite/tests/list_docs.rs | 140 ++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 crates/kb-store-sqlite/tests/list_docs.rs diff --git a/crates/kb-store-sqlite/tests/list_docs.rs b/crates/kb-store-sqlite/tests/list_docs.rs new file mode 100644 index 0000000..f690f6b --- /dev/null +++ b/crates/kb-store-sqlite/tests/list_docs.rs @@ -0,0 +1,140 @@ +//! `DocumentStore::list_documents` filter coverage. + +use std::path::PathBuf; + +use kb_core::{ + AssetId, AssetStorage, Block, CanonicalDocument, Checksum, CommonBlock, DocFilter, + DocumentId, DocumentStore, HeadingBlock, Lang, MediaType, Metadata, ParserVersion, + Provenance, RawAsset, SourceSpan, SourceType, SourceUri, TrustLevel, WorkspacePath, +}; +use kb_store_sqlite::SqliteStore; +use time::OffsetDateTime; + +mod common; + +fn make_doc( + suffix: char, + workspace_path: &str, + lang: &str, + tags: Vec<&str>, + trust: TrustLevel, +) -> (RawAsset, CanonicalDocument) { + let bytes: Vec = vec![suffix as u8; 16]; + let cs = blake3::hash(&bytes).to_hex().to_string(); + let asset_id = AssetId(format!("{suffix}").repeat(32)); + let asset = RawAsset { + asset_id: asset_id.clone(), + source_uri: SourceUri::File(PathBuf::from(format!("/tmp/{suffix}.md"))), + workspace_path: WorkspacePath::new(workspace_path.into()).unwrap(), + media_type: MediaType::Markdown, + byte_len: bytes.len() as u64, + checksum: Checksum(cs.clone()), + discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + stored: AssetStorage::Reference { + path: PathBuf::from(format!("/tmp/{suffix}.md")), + sha: Checksum(cs), + }, + }; + let doc_id = DocumentId(format!("d{suffix}").repeat(16)); + let block = Block::Heading(HeadingBlock { + common: CommonBlock { + block_id: kb_core::BlockId(format!("b{suffix}").repeat(16)), + heading_path: vec![], + source_span: SourceSpan::Line { start: 1, end: 1 }, + }, + level: 1, + text: format!("Title {suffix}"), + }); + let metadata = Metadata { + aliases: vec![], + tags: tags.into_iter().map(String::from).collect(), + created_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + updated_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(), + source_type: SourceType::Markdown, + trust_level: trust, + user_id_alias: None, + user: Default::default(), + }; + let doc = CanonicalDocument { + doc_id, + source_asset_id: asset_id, + workspace_path: asset.workspace_path.clone(), + title: format!("Title {suffix}"), + lang: Lang(lang.into()), + blocks: vec![block], + metadata, + provenance: Provenance { events: vec![] }, + parser_version: ParserVersion("test".into()), + schema_version: 1, + doc_version: 1, + }; + (asset, doc) +} + +#[test] +fn list_documents_filters_lang_and_tags() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + for (asset, doc) in [ + make_doc('a', "notes/a.md", "en", vec!["rust", "kb"], TrustLevel::Primary), + make_doc('b', "notes/b.md", "ko", vec!["rust"], TrustLevel::Secondary), + make_doc('c', "papers/c.md", "en", vec!["bio"], TrustLevel::Generated), + ] { + store.put_asset(&asset).unwrap(); + store.put_document(&doc).unwrap(); + } + + // No filter → all three docs. + let all = store.list_documents(&DocFilter::default()).unwrap(); + assert_eq!(all.len(), 3); + + // lang filter. + let en = store + .list_documents(&DocFilter { + lang: Some(Lang("en".into())), + ..Default::default() + }) + .unwrap(); + assert_eq!(en.len(), 2); + assert!(en.iter().all(|d| d.lang == Lang("en".into()))); + + // path glob. + let papers = store + .list_documents(&DocFilter { + path_glob: Some("papers/*.md".into()), + ..Default::default() + }) + .unwrap(); + assert_eq!(papers.len(), 1); + assert_eq!(papers[0].doc_path.0, "papers/c.md"); + + // tags_any. + let rust = store + .list_documents(&DocFilter { + tags_any: vec!["rust".into()], + ..Default::default() + }) + .unwrap(); + assert_eq!(rust.len(), 2); + // tags must be hydrated on the result. + for d in &rust { + assert!( + d.tags.iter().any(|t| t == "rust"), + "expected `rust` tag on {}: {:?}", + d.doc_path.0, + d.tags + ); + } + + // trust_min — Primary only. + let primary = store + .list_documents(&DocFilter { + trust_min: Some(TrustLevel::Primary), + ..Default::default() + }) + .unwrap(); + assert_eq!(primary.len(), 1); + assert_eq!(primary[0].trust_level, TrustLevel::Primary); +} From e41279de9627e8dcbb622d1a9b50eaf41efc31b1 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:33:19 +0000 Subject: [PATCH 7/9] p1-6: harden store boundary (atomic asset write, poison-tolerant mutex, AssetId validation) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Important review fixes on the kb-store-sqlite write path: I1. Atomic asset write. put_asset_with_bytes now stages bytes to `.tmp..`, fsyncs, UPSERTs the row, then `rename`s into place (atomic on POSIX same-fs). On any failure between staging and rename we best-effort `remove_file` the temp so the previous orphan risk on UPSERT failure is gone. Reference mode is unchanged (no I/O, no orphan risk). I2. Poison-tolerant mutex. New `lock_conn` helper does `.lock().unwrap_or_else(|p| p.into_inner())`, so a single panic mid- transaction no longer poisons every subsequent store call. The rusqlite Transaction Drop already rolls back on panic, leaving the Connection state safe to reuse. All 13 prior `.expect("sqlite mutex poisoned")` sites in store.rs / documents.rs / jobs.rs now route through `lock_conn`. I3. AssetId shape validation. `kb_core::AssetId(pub String)` lets a hand-construction bypass the `FromStr` 32-hex invariant. Added `validate_asset_id` (32 ASCII hex chars) at every store entry that turns an AssetId into a path: `put_asset_with_bytes` and `DocumentStore::put_asset`. This shuts a potential path-traversal via `assets_path_for`'s `&id[..2]` shard slice. Tests: - `put_asset_with_bytes_orphan_cleanup_on_upsert_failure` — pre-seeds a row that takes the same `workspace_path` (UNIQUE), so the UPSERT trips a constraint not covered by `ON CONFLICT(asset_id)`. Asserts no final file and no leaked `*.tmp.*`. - `put_asset_with_bytes_rejects_invalid_asset_id` — passes `AssetId("../etc/passwd_padded_to_xx_xxxxx")` (32 chars, contains `/`). Asserts error and zero filesystem artifacts under `data_dir/assets/`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kb-store-sqlite/src/documents.rs | 20 ++- crates/kb-store-sqlite/src/jobs.rs | 8 +- crates/kb-store-sqlite/src/store.rs | 169 +++++++++++++++---- crates/kb-store-sqlite/tests/asset_writer.rs | 114 +++++++++++++ 4 files changed, 269 insertions(+), 42 deletions(-) diff --git a/crates/kb-store-sqlite/src/documents.rs b/crates/kb-store-sqlite/src/documents.rs index 39cdeba..ed30d79 100644 --- a/crates/kb-store-sqlite/src/documents.rs +++ b/crates/kb-store-sqlite/src/documents.rs @@ -16,10 +16,14 @@ use rusqlite::params; use time::OffsetDateTime; use crate::error::StoreError; -use crate::store::{SqliteStore, upsert_asset_row}; +use crate::store::{SqliteStore, upsert_asset_row, validate_asset_id}; impl kb_core::DocumentStore for SqliteStore { fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> { + // Validate the AssetId shape before any row work — defense in + // depth against hand-constructed `kb_core::AssetId` values that + // bypass `FromStr`. See `validate_asset_id` for rationale. + validate_asset_id(&asset.asset_id)?; // No bytes here — read storage_kind/storage_path from the // RawAsset's `stored` field per its convention (§3.3). Callers // that have raw bytes go through `put_asset_with_bytes` instead; @@ -33,12 +37,12 @@ impl kb_core::DocumentStore for SqliteStore { ("reference", path.to_string_lossy().into_owned()) } }; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); upsert_asset_row(&conn, asset, storage_kind, &storage_path) } fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; upsert_document(&tx, doc)?; replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?; @@ -51,7 +55,7 @@ impl kb_core::DocumentStore for SqliteStore { doc: &kb_core::DocumentId, blocks: &[kb_core::Block], ) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; // DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal) // so we cannot rely on UPSERT to surface block_id collisions. The @@ -97,7 +101,7 @@ impl kb_core::DocumentStore for SqliteStore { let now = OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) .context("format chunk created_at")?; - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0]) .map_err(StoreError::from)?; @@ -144,7 +148,7 @@ impl kb_core::DocumentStore for SqliteStore { &self, id: &kb_core::DocumentId, ) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let row: Option = conn .query_row( "SELECT @@ -205,7 +209,7 @@ impl kb_core::DocumentStore for SqliteStore { } fn get_chunk(&self, id: &kb_core::ChunkId) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let row = conn .query_row( "SELECT @@ -248,7 +252,7 @@ impl kb_core::DocumentStore for SqliteStore { // Build a dynamic WHERE clause from the filter. Each condition // appends one positional `?` placeholder and one `Box` to `params` so order stays in sync. - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let mut sql = String::from( "SELECT d.doc_id, d.workspace_path, d.title, d.lang, d.source_type, d.trust_level, d.parser_version, diff --git a/crates/kb-store-sqlite/src/jobs.rs b/crates/kb-store-sqlite/src/jobs.rs index 89d71dd..4493822 100644 --- a/crates/kb-store-sqlite/src/jobs.rs +++ b/crates/kb-store-sqlite/src/jobs.rs @@ -29,7 +29,7 @@ impl kb_core::JobRepo for SqliteStore { let kind_label = job_kind_label(&kind); let payload_json = serde_json::to_string(&payload) .context("serialize job payload")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); conn.execute( "INSERT INTO jobs ( job_id, kind, status, payload_json, progress_json, @@ -51,7 +51,7 @@ impl kb_core::JobRepo for SqliteStore { let now = OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) .context("format job updated_at")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); // status='pending' → 'running' on first progress update; later // progress calls keep status='running' until finish(). conn.execute( @@ -80,7 +80,7 @@ impl kb_core::JobRepo for SqliteStore { .map(|e| serde_json::to_string(&serde_json::json!({ "message": e }))) .transpose() .context("serialize job error")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); conn.execute( "UPDATE jobs SET status = ?, @@ -98,7 +98,7 @@ impl kb_core::JobRepo for SqliteStore { &self, filter: &kb_core::JobFilter, ) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let mut sql = String::from( "SELECT job_id, kind, status, payload_json, progress_json, error_json, created_at, updated_at, finished_at diff --git a/crates/kb-store-sqlite/src/store.rs b/crates/kb-store-sqlite/src/store.rs index 72a605e..92d9837 100644 --- a/crates/kb-store-sqlite/src/store.rs +++ b/crates/kb-store-sqlite/src/store.rs @@ -7,7 +7,8 @@ //! mutex on the hot path. use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Mutex, MutexGuard}; use anyhow::{Context, Result}; use rusqlite::Connection; @@ -15,6 +16,15 @@ use rusqlite::Connection; use crate::error::StoreError; use crate::schema; +/// Monotonic counter used to namespace per-process temp file names so +/// concurrent `put_asset_with_bytes` calls in the same millisecond cannot +/// collide on `.tmp..`. +static TEMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0); + +/// Length, in hex chars, of a valid `kb_core::AssetId`. blake3 first-half +/// truncated, mirrored from `kb-core`'s newtype invariant. +const ASSET_ID_HEX_LEN: usize = 32; + /// Default file name under `config.storage.data_dir`. Kept private — the /// path layout is a §6.3 design decision, not part of the store's public /// surface. @@ -80,7 +90,7 @@ impl SqliteStore { /// Apply all pending migrations bundled at compile time /// (`migrations/V001__init.sql` and any later additions). pub fn run_migrations(&self) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); schema::runner() .run(&mut *conn) .map_err(|e| StoreError::Migration(e.to_string()))?; @@ -88,6 +98,17 @@ impl SqliteStore { Ok(()) } + /// Acquire the connection mutex, recovering from poison. + /// + /// Poisoning here means a previous holder panicked while holding the + /// guard. The active rusqlite transaction (if any) was rolled back by + /// the `Transaction` `Drop` impl, so the connection state is still + /// safe to reuse — we simply unwrap the inner guard rather than + /// propagate the panic to every subsequent call. + pub(crate) fn lock_conn(&self) -> MutexGuard<'_, Connection> { + self.conn.lock().unwrap_or_else(|p| p.into_inner()) + } + /// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`, /// bytes go to `data_dir/assets//` if `byte_len ≤ /// copy_threshold_mb`, otherwise the row records the source URI's @@ -101,6 +122,13 @@ impl SqliteStore { asset: &kb_core::RawAsset, bytes: &[u8], ) -> Result<()> { + // 0. Validate the AssetId shape before any I/O. `kb_core::AssetId` + // is a `pub String` newtype: `FromStr` enforces the 32-hex-char + // invariant, but a hand-constructed `AssetId("../etc/passwd…")` + // can bypass that and reach `assets_path_for`. Refuse such IDs at + // the store boundary to keep shard-dir slicing safe. + validate_asset_id(&asset.asset_id)?; + // 1. Verify the caller's checksum matches what's actually on the // wire. A drift here means the bytes the parser saw and the bytes // we're about to durably store disagree — refuse persistence. @@ -116,54 +144,103 @@ impl SqliteStore { // 2. Decide copy vs. reference. The threshold compares the // declared `byte_len` (caller-vouched) rather than `bytes.len()` // because some sources stream and `byte_len` is authoritative. - let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes { + if asset.byte_len <= self.copy_threshold_bytes { + // Copy mode. To prevent file orphans on UPSERT failure we use + // the temp-file + atomic-rename pattern: + // (a) write bytes to `.tmp..` + // (b) fsync the temp file + // (c) UPSERT the row + // (d) on UPSERT success: rename temp → final (atomic on + // same fs) + // (e) on any failure between (a) and (d): best-effort delete + // of the temp file so we never leak bytes on disk. let dest = self.assets_path_for(&asset.asset_id); if let Some(parent) = dest.parent() { std::fs::create_dir_all(parent).with_context(|| { format!("create asset shard dir {}", parent.display()) })?; } - std::fs::write(&dest, bytes) - .with_context(|| format!("write asset bytes to {}", dest.display()))?; - // Mirror §6.6: files 0o644. - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let mut perms = std::fs::metadata(&dest)?.permissions(); - perms.set_mode(0o644); - std::fs::set_permissions(&dest, perms).with_context(|| { - format!("chmod 0o644 on {}", dest.display()) + let temp_path = temp_path_for(&dest); + // Inline closure so any `?` in (a)/(b) cleans up the temp + // file before bubbling out. + let write_and_upsert = || -> Result<()> { + { + let mut f = std::fs::File::create(&temp_path).with_context(|| { + format!("create temp asset file {}", temp_path.display()) + })?; + use std::io::Write; + f.write_all(bytes).with_context(|| { + format!("write asset bytes to {}", temp_path.display()) + })?; + f.sync_all().with_context(|| { + format!("fsync temp asset file {}", temp_path.display()) + })?; + } + // Mirror §6.6: files 0o644. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&temp_path)?.permissions(); + perms.set_mode(0o644); + std::fs::set_permissions(&temp_path, perms).with_context(|| { + format!("chmod 0o644 on {}", temp_path.display()) + })?; + } + // UPSERT the row first; only after a successful row write + // do we publish the file via rename. A second + // `put_asset_with_bytes` for the same asset_id overwrites + // in place. + { + let conn = self.lock_conn(); + upsert_asset_row( + &conn, + asset, + "copied", + &dest.to_string_lossy(), + )?; + } + std::fs::rename(&temp_path, &dest).with_context(|| { + format!( + "atomic rename {} -> {}", + temp_path.display(), + dest.display() + ) })?; + Ok(()) + }; + match write_and_upsert() { + Ok(()) => Ok(()), + Err(e) => { + // Best-effort cleanup; ignore errors so the original + // failure (likely the more useful one) propagates. + let _ = std::fs::remove_file(&temp_path); + Err(e) + } } - ("copied", dest.to_string_lossy().into_owned()) } else { // Reference: caller's source path is recorded verbatim. We // accept either a `File(path)` or `Kb(uri)` SourceUri; the - // latter stores the raw `kb://...` string. - let path = match &asset.source_uri { + // latter stores the raw `kb://...` string. No file I/O ⇒ no + // orphan risk; just UPSERT the row. + let storage_path = match &asset.source_uri { kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(), kb_core::SourceUri::Kb(u) => u.clone(), }; - ("reference", path) - }; - - // 3. UPSERT the assets row. A second `put_asset_with_bytes` for - // the same asset_id (e.g. re-ingest) overwrites in place — the - // row is uniquely keyed by asset_id and re-derived from the - // RawAsset every time. - let conn = self.conn.lock().expect("sqlite mutex poisoned"); - upsert_asset_row(&conn, asset, storage_kind, &storage_path)?; - Ok(()) + let conn = self.lock_conn(); + upsert_asset_row(&conn, asset, "reference", &storage_path)?; + Ok(()) + } } /// Compute the `data_dir/assets//` path for an asset. /// `` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`. + /// + /// Callers that build paths from caller-controlled IDs MUST first + /// invoke [`validate_asset_id`] (already enforced at every store + /// entry that takes a `RawAsset`). The `id.len() >= ASSET_SHARD_LEN` + /// guard below is a defense-in-depth fallback only. pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf { let id = &asset_id.0; - // Defensive: kb-core enforces 32 hex chars on AssetId construction - // (`FromStr` validates). If a future code path bypasses that, we - // fall back to the full id as the shard so we never panic on - // slicing. let shard = if id.len() >= ASSET_SHARD_LEN { &id[..ASSET_SHARD_LEN] } else { @@ -173,6 +250,38 @@ impl SqliteStore { } } +/// Reject an `AssetId` whose shape would let a malicious caller escape +/// the `data_dir/assets//` shard tree. `kb_core::AssetId(pub String)` +/// permits hand-construction, so any function that turns an `AssetId` +/// into a filesystem path must call this first. +pub(crate) fn validate_asset_id(asset_id: &kb_core::AssetId) -> Result<()> { + if asset_id.0.len() != ASSET_ID_HEX_LEN + || !asset_id.0.bytes().all(|b| b.is_ascii_hexdigit()) + { + anyhow::bail!( + "invalid AssetId shape (expected {} ASCII hex chars): {:?}", + ASSET_ID_HEX_LEN, + asset_id.0 + ); + } + Ok(()) +} + +/// Compute a per-call temp-file path next to `dest` that is unlikely to +/// collide with any other in-flight writer (process pid + monotonic +/// counter). The temp file lives in the same parent directory so the +/// final `rename` is an atomic same-filesystem rename on POSIX. +fn temp_path_for(dest: &Path) -> PathBuf { + let pid = std::process::id(); + let n = TEMP_SUFFIX_COUNTER.fetch_add(1, Ordering::Relaxed); + let parent = dest.parent().unwrap_or_else(|| Path::new(".")); + let file_name = dest + .file_name() + .map(|s| s.to_string_lossy().into_owned()) + .unwrap_or_else(|| "asset".to_string()); + parent.join(format!("{file_name}.tmp.{pid}.{n}")) +} + /// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes` /// path (which has bytes + computed `storage_kind/path`) and the /// `DocumentStore::put_asset` path (which only has the `RawAsset` and diff --git a/crates/kb-store-sqlite/tests/asset_writer.rs b/crates/kb-store-sqlite/tests/asset_writer.rs index 7aa5bfa..43112a7 100644 --- a/crates/kb-store-sqlite/tests/asset_writer.rs +++ b/crates/kb-store-sqlite/tests/asset_writer.rs @@ -99,6 +99,120 @@ fn reference_mode_does_not_write_file_but_records_path() { assert_eq!(storage_path, "/path/to/original.md"); } +#[test] +fn put_asset_with_bytes_orphan_cleanup_on_upsert_failure() { + // Goal: prove that if the row UPSERT fails AFTER the bytes have been + // staged on disk, no `/` file is left behind. + // + // Lever: the `assets` table has a UNIQUE INDEX on `workspace_path` + // (V001), but the UPSERT is `ON CONFLICT(asset_id)`. So if some other + // row already owns this `workspace_path`, the INSERT half of the + // UPSERT trips a UNIQUE constraint that the ON CONFLICT clause does + // NOT handle — UPSERT errors. The new asset's bytes were already + // staged; we assert they are NOT visible at the final destination. + let env = common::TestEnv::with_threshold(100); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // Pre-populate a row that owns `notes/foo.md` (the workspace_path our + // fixture asset will also claim) under a *different* asset_id. + env.with_conn(|c| { + c.execute( + "INSERT INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + rusqlite::params![ + "b".repeat(32), + "file:///elsewhere/foo.md", + "notes/foo.md", + "\"markdown\"", + 7_i64, + "0".repeat(64), + "reference", + "/elsewhere/foo.md", + "2024-01-01T00:00:00Z", + ], + ) + }); + + let bytes = b"hello, sqlite"; + let cs = b3_full_hex(bytes); + let asset = fixed_asset(bytes, bytes.len() as u64, &cs); + + let err = store + .put_asset_with_bytes(&asset, bytes) + .expect_err("UPSERT must fail on workspace_path UNIQUE violation"); + let msg = format!("{err:#}"); + assert!( + msg.to_lowercase().contains("unique") || msg.to_lowercase().contains("constraint"), + "expected UNIQUE constraint failure, got: {msg}" + ); + + // Final destination must NOT exist (no orphan). + let aa = &asset.asset_id.0[..2]; + let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0); + assert!( + !dest.exists(), + "asset bytes were left orphan at {} after UPSERT failure", + dest.display() + ); + // No `*.tmp.*` either — temp file must be cleaned up too. + let shard_dir = env.data_dir().join("assets").join(aa); + if let Ok(entries) = std::fs::read_dir(&shard_dir) { + for entry in entries.flatten() { + let name = entry.file_name(); + let s = name.to_string_lossy(); + assert!( + !s.contains(".tmp."), + "temp file leaked at {}", + entry.path().display() + ); + } + } +} + +#[test] +fn put_asset_with_bytes_rejects_invalid_asset_id() { + // `kb_core::AssetId(pub String)` lets a hand-construction bypass the + // 32-hex `FromStr` invariant. The store boundary must reject any ID + // whose shape would let path construction escape `data_dir/assets/`. + let env = common::TestEnv::with_threshold(100); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // 32 chars but contains a `/` — would let `assets_path_for` stitch + // together a path outside the shard tree. + let evil_id = "../etc/passwd_padded_to_xx_xxxxx".to_string(); + assert_eq!(evil_id.len(), 32, "test fixture must be 32 chars to exercise length-only checks"); + let mut asset = fixed_asset(b"x", 1, &b3_full_hex(b"x")); + asset.asset_id = AssetId(evil_id.clone()); + + let err = store + .put_asset_with_bytes(&asset, b"x") + .expect_err("must reject non-hex AssetId"); + let msg = format!("{err:#}"); + assert!( + msg.contains("invalid AssetId shape"), + "expected AssetId-shape rejection, got: {msg}" + ); + + // And the bytes must NOT have been staged anywhere under the assets + // tree (no I/O should have happened before validation). + let assets_dir = env.data_dir().join("assets"); + if assets_dir.exists() { + for entry in std::fs::read_dir(&assets_dir).unwrap().flatten() { + // Recurse one level into shard dirs and assert empty. + if let Some(sub) = std::fs::read_dir(entry.path()).unwrap().flatten().next() { + panic!( + "invalid AssetId still produced filesystem artifact at {}", + sub.path().display() + ); + } + } + } +} + #[test] fn checksum_mismatch_returns_conflict() { let env = common::TestEnv::new(); From 15b4d80efc582eb847a376cab2739beb617c4e54 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:33:50 +0000 Subject: [PATCH 8/9] p1-6: rename StoreError::Sqlx -> Sqlite, drop dead assets_root helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M1: `Sqlx` is a misleading leftover — this crate uses `rusqlite`, not sqlx. Rename the variant (and the doc reference to it) to `Sqlite`. No external pattern matches; the variant is reached only via `#[from]`. M11: `assets_root` was an `#[allow(dead_code)]` helper introduced for a test that never landed. Delete it so the dead-code allow goes with it. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kb-store-sqlite/src/error.rs | 4 ++-- crates/kb-store-sqlite/src/store.rs | 7 ------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/crates/kb-store-sqlite/src/error.rs b/crates/kb-store-sqlite/src/error.rs index fd6e42c..a7f9f3d 100644 --- a/crates/kb-store-sqlite/src/error.rs +++ b/crates/kb-store-sqlite/src/error.rs @@ -2,7 +2,7 @@ //! //! Boundary code (`kb-app`, `kb-cli`) flattens these into `anyhow::Error`, //! so the trait impls return `anyhow::Result` directly. Internally we -//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlx` / +//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlite` / //! `Migration` so callers that downcast can route refusal-style flows. use thiserror::Error; @@ -10,7 +10,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum StoreError { #[error("sqlite error: {0}")] - Sqlx(#[from] rusqlite::Error), + Sqlite(#[from] rusqlite::Error), #[error("migration error: {0}")] Migration(String), diff --git a/crates/kb-store-sqlite/src/store.rs b/crates/kb-store-sqlite/src/store.rs index 92d9837..83d0182 100644 --- a/crates/kb-store-sqlite/src/store.rs +++ b/crates/kb-store-sqlite/src/store.rs @@ -397,10 +397,3 @@ fn expand_data_dir(raw: &str) -> PathBuf { fn dirs_home_fallback() -> Option { None } - -/// Returns the root of the assets shard tree (`data_dir/assets/`). Used -/// by tests; kept crate-private otherwise. -#[allow(dead_code)] -pub(crate) fn assets_root(data_dir: &Path) -> PathBuf { - data_dir.join(ASSETS_SUBDIR) -} From b7367dedfe7effc2351d371c4ed64de6770243c6 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 30 Apr 2026 17:34:17 +0000 Subject: [PATCH 9/9] p1-6: doc-only TODO markers (section_label, doc_version invariant) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M9: add a `TODO(P2/P3)` comment near the NULL persistence at documents.rs (put_chunks). The `section_label` column exists in the §5.5 DDL but neither the in-memory Chunk struct nor the §2.6 wire schema carries the field, so NULL is the correct canonical value today — flag the future-bump intent in-line rather than leaving it implicit. M10: add a one-line invariant comment near the i64 -> u32 narrowing for `doc_version` in `get_document`. The invariant is documented at the write site (UPSERT bumps by 1 per re-ingest) — restate it at the read site so the cast is not silently load-bearing. No behaviour change. No tests touched. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kb-store-sqlite/src/documents.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/kb-store-sqlite/src/documents.rs b/crates/kb-store-sqlite/src/documents.rs index ed30d79..59db371 100644 --- a/crates/kb-store-sqlite/src/documents.rs +++ b/crates/kb-store-sqlite/src/documents.rs @@ -124,6 +124,9 @@ impl kb_core::DocumentStore for SqliteStore { // §5.5 has a `section_label` column but the in-memory Chunk // struct does not carry it (nor does the wire schema §2.6). // Persist NULL until a future bump introduces the field. + // TODO(P2/P3): populate `section_label` once Chunk and the + // wire schema gain the field; until then NULL is the correct + // canonical value. stmt.execute(params![ chunk.chunk_id.0, chunk.doc_id.0, @@ -203,6 +206,12 @@ impl kb_core::DocumentStore for SqliteStore { metadata, provenance, parser_version: kb_core::ParserVersion(row.parser_version), + // INVARIANT: `doc_version` is bumped by 1 on every re-ingest + // (see `upsert_document`). The column is INTEGER (i64) but + // CanonicalDocument carries u32; an overflow would require + // 2^32 re-ingests of the same document, which is well beyond + // any realistic ingest frequency. Truncating cast is safe + // under that invariant. schema_version: row.schema_version as u32, doc_version: row.doc_version as u32, }))