p1-6: scaffold kb-store-sqlite crate + V001 full §5 DDL

New workspace member crate `kb-store-sqlite` (allowed deps only:
kb-core, kb-config, rusqlite[bundled], refinery, serde, serde_json,
time, blake3, tracing, anyhow, thiserror; dev-deps add kb-parse-md /
kb-normalize / kb-chunk for the contract round-trip test).

Migration V001 replaces the P0-1 stub with the full §5 DDL (assets,
documents, document_tags, blocks, chunks with policy_hash,
embedding_records, jobs, ingest_runs, answers, eval_runs,
eval_query_results) plus the §5 indexes. FTS5 virtual table + triggers
remain deferred to V002 (P2-1).

Public surface per task spec:
  SqliteStore::open / run_migrations / put_asset_with_bytes
  impl DocumentStore for SqliteStore (7 trait methods)
  impl JobRepo for SqliteStore (4 trait methods)
  StoreError { Sqlx, Migration, Conflict }

Behavior:
- Pragmas at open: foreign_keys=ON, journal_mode=WAL,
  synchronous=NORMAL, temp_store=MEMORY.
- Asset writer: byte_len ≤ copy_threshold_mb * 1MiB → copy to
  data_dir/assets/<aa>/<asset_id> (mode 0o644 on Unix), else
  reference. blake3(bytes) verified against asset.checksum; mismatch →
  Conflict.
- Idempotency: put_document UPSERTs and bumps doc_version + 1 on
  conflict; put_blocks / put_chunks DELETE-then-INSERT; document_tags
  re-derived per put_document.
- get_document rehydrates blocks via payload_json ordered by stream
  ordinal.
- list_documents builds dynamic WHERE from DocFilter (lang / trust_min
  / path_glob via GLOB / tags_any via document_tags subquery).
- JobRepo: jobs.kind/status are stored as lowercase enum tags; create
  mints a 32-hex JobId via blake3(kind || payload || nanos).

Tests follow in subsequent commits.
This commit is contained in:
2026-04-30 17:08:36 +00:00
parent 207a0ff61e
commit a3390d5171
10 changed files with 1906 additions and 4 deletions

452
Cargo.lock generated
View File

@@ -2,6 +2,18 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -79,6 +91,17 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -286,6 +309,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "either"
version = "1.15.0"
@@ -308,6 +342,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.4.1"
@@ -329,6 +375,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf"
dependencies = [
"percent-encoding",
]
[[package]]
name = "fst"
version = "0.4.7"
@@ -415,6 +470,9 @@ name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
@@ -431,18 +489,129 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "icu_collections"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
dependencies = [
"displaydoc",
"potential_utf",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_locale_core"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_normalizer"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
dependencies = [
"icu_collections",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"smallvec",
"zerovec",
]
[[package]]
name = "icu_normalizer_data"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
[[package]]
name = "icu_properties"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec"
dependencies = [
"icu_collections",
"icu_locale_core",
"icu_properties_data",
"icu_provider",
"zerotrie",
"zerovec",
]
[[package]]
name = "icu_properties_data"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af"
[[package]]
name = "icu_provider"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
dependencies = [
"displaydoc",
"icu_locale_core",
"writeable",
"yoke",
"zerofrom",
"zerotrie",
"zerovec",
]
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]]
name = "idna"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
dependencies = [
"idna_adapter",
"smallvec",
"utf8_iter",
]
[[package]]
name = "idna_adapter"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
dependencies = [
"icu_normalizer",
"icu_properties",
]
[[package]]
name = "ignore"
version = "0.4.25"
@@ -649,6 +818,27 @@ dependencies = [
"walkdir",
]
[[package]]
name = "kb-store-sqlite"
version = "0.1.0"
dependencies = [
"anyhow",
"blake3",
"kb-chunk",
"kb-config",
"kb-core",
"kb-normalize",
"kb-parse-md",
"refinery",
"rusqlite",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"time",
"tracing",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -676,6 +866,17 @@ dependencies = [
"libc",
]
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "lingua"
version = "1.8.0"
@@ -744,6 +945,12 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "litemap"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0"
[[package]]
name = "lock_api"
version = "0.4.14"
@@ -835,12 +1042,33 @@ dependencies = [
"windows-link",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pin-project-lite"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "potential_utf"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
dependencies = [
"zerovec",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -938,6 +1166,50 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "refinery"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ba5d693abf62492c37268512ff35b77655d2e957ca53dab85bf993fe9172d15"
dependencies = [
"refinery-core",
"refinery-macros",
]
[[package]]
name = "refinery-core"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a83581f18c1a4c3a6ebd7a174bdc665f17f618d79f7edccb6a0ac67e660b319"
dependencies = [
"async-trait",
"cfg-if",
"log",
"regex",
"rusqlite",
"serde",
"siphasher",
"thiserror 1.0.69",
"time",
"toml",
"url",
"walkdir",
]
[[package]]
name = "refinery-macros"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c225407d8e52ef8cf094393781ecda9a99d6544ec28d90a6915751de259264"
dependencies = [
"heck",
"proc-macro2",
"quote",
"refinery-core",
"regex",
"syn",
]
[[package]]
name = "regex"
version = "1.12.3"
@@ -967,6 +1239,20 @@ version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "rusqlite"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustix"
version = "1.1.4"
@@ -1121,6 +1407,12 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "siphasher"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "slab"
version = "0.4.12"
@@ -1133,6 +1425,12 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -1174,6 +1472,17 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tempfile"
version = "3.27.0"
@@ -1267,6 +1576,16 @@ dependencies = [
"time-core",
]
[[package]]
name = "tinystr"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d"
dependencies = [
"displaydoc",
"zerovec",
]
[[package]]
name = "tinyvec"
version = "1.11.0"
@@ -1443,6 +1762,24 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "url"
version = "2.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]
name = "utf8_iter"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
@@ -1455,6 +1792,18 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
@@ -1761,6 +2110,109 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "writeable"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4"
[[package]]
name = "yoke"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca"
dependencies = [
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerofrom"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerotrie"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf"
dependencies = [
"displaydoc",
"yoke",
"zerofrom",
]
[[package]]
name = "zerovec"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239"
dependencies = [
"yoke",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zmij"
version = "1.0.21"

View File

@@ -8,6 +8,7 @@ members = [
"crates/kb-parse-md",
"crates/kb-normalize",
"crates/kb-chunk",
"crates/kb-store-sqlite",
"crates/kb-app",
"crates/kb-cli",
]

View File

@@ -0,0 +1,35 @@
[package]
name = "kb-store-sqlite"
version = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
description = "SQLite-backed DocumentStore + JobRepo for kb (§5 DDL, §7.2 traits)"
[dependencies]
kb-core = { path = "../kb-core" }
kb-config = { path = "../kb-config" }
# `bundled` ships SQLite source + builds in-tree (no system libsqlite3).
# Explicitly NOT `bundled-sqlcipher` per task allowed-deps list.
rusqlite = { version = "0.32", features = ["bundled"] }
refinery = { version = "0.8", features = ["rusqlite"] }
serde = { workspace = true }
serde_json = { workspace = true }
time = { workspace = true }
blake3 = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = "3"
serde_json = { workspace = true }
# kb-parse-md / kb-normalize / kb-chunk are dev-only — used to build a
# CanonicalDocument + Vec<Chunk> from a fixture in the contract round-trip
# test. Forbidden as regular deps per design §8 (store consumes domain
# types from kb-core only); `cargo tree -p kb-store-sqlite --depth 1`
# (default scope, excludes dev-deps) confirms this.
kb-parse-md = { path = "../kb-parse-md" }
kb-normalize = { path = "../kb-normalize" }
kb-chunk = { path = "../kb-chunk" }

View File

@@ -0,0 +1,641 @@
//! `DocumentStore` impl: assets, documents, document_tags, blocks, chunks.
//!
//! Transactions: per design §5.8, one ingest of one document is one
//! transaction. We expose the raw trait methods at fine granularity (so
//! `kb-app` can compose), and each one wraps its own short transaction.
//! A higher-level `ingest_document` helper that wraps put_document +
//! put_blocks + put_chunks in a single tx is intentionally NOT shipped in
//! P1-6 — `kb-app` (P1's caller layer) is the right place to compose.
//!
//! Idempotency: re-ingesting `(workspace_path, asset_id, parser_version)`
//! UPSERTs the documents row, bumps `doc_version`, and replaces all
//! blocks / chunks / document_tags. No row duplication.
use anyhow::{Context, Result};
use rusqlite::params;
use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::{SqliteStore, upsert_asset_row};
impl kb_core::DocumentStore for SqliteStore {
fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> {
// No bytes here — read storage_kind/storage_path from the
// RawAsset's `stored` field per its convention (§3.3). Callers
// that have raw bytes go through `put_asset_with_bytes` instead;
// this branch is for the case where bytes were already persisted
// (or referenced) and we just want to record the row.
let (storage_kind, storage_path) = match &asset.stored {
kb_core::AssetStorage::Copied { path } => {
("copied", path.to_string_lossy().into_owned())
}
kb_core::AssetStorage::Reference { path, .. } => {
("reference", path.to_string_lossy().into_owned())
}
};
let conn = self.conn.lock().expect("sqlite mutex poisoned");
upsert_asset_row(&conn, asset, storage_kind, &storage_path)
}
fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let tx = conn.transaction().map_err(StoreError::from)?;
upsert_document(&tx, doc)?;
replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?;
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn put_blocks(
&self,
doc: &kb_core::DocumentId,
blocks: &[kb_core::Block],
) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let tx = conn.transaction().map_err(StoreError::from)?;
// DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal)
// so we cannot rely on UPSERT to surface block_id collisions. The
// simplest correct path is to wipe and re-insert; the §5.8
// per-document transaction wraps both halves so a partial state
// never lands.
tx.execute("DELETE FROM blocks WHERE doc_id = ?", params![doc.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO blocks (
block_id, doc_id, kind, heading_path_json,
ordinal, source_span_json, payload_json
) VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.map_err(StoreError::from)?;
// Ordinal here is the position of the block in the document's
// overall block stream — used for sort-on-load, not the §4.3
// (heading_path, kind)-scoped ordinal that fed `block_id`.
for (i, block) in blocks.iter().enumerate() {
let row = block_to_row(doc, block, i as i64)?;
stmt.execute(params![
row.block_id,
row.doc_id,
row.kind,
row.heading_path_json,
row.ordinal,
row.source_span_json,
row.payload_json,
])
.map_err(StoreError::from)?;
}
drop(stmt);
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn put_chunks(
&self,
doc: &kb_core::DocumentId,
chunks: &[kb_core::Chunk],
) -> Result<()> {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format chunk created_at")?;
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let tx = conn.transaction().map_err(StoreError::from)?;
tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO chunks (
chunk_id, doc_id, text, heading_path_json,
section_label, source_spans_json, token_estimate,
chunker_version, policy_hash, block_ids_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.map_err(StoreError::from)?;
for chunk in chunks {
let heading_path = serde_json::to_string(&chunk.heading_path)
.context("serialize chunk.heading_path")?;
let source_spans = serde_json::to_string(&chunk.source_spans)
.context("serialize chunk.source_spans")?;
let block_ids = serde_json::to_string(&chunk.block_ids)
.context("serialize chunk.block_ids")?;
// §5.5 has a `section_label` column but the in-memory Chunk
// struct does not carry it (nor does the wire schema §2.6).
// Persist NULL until a future bump introduces the field.
stmt.execute(params![
chunk.chunk_id.0,
chunk.doc_id.0,
chunk.text,
heading_path,
Option::<String>::None,
source_spans,
chunk.token_estimate as i64,
chunk.chunker_version.0,
chunk.policy_hash,
block_ids,
now,
])
.map_err(StoreError::from)?;
}
drop(stmt);
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn get_document(
&self,
id: &kb_core::DocumentId,
) -> Result<Option<kb_core::CanonicalDocument>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let row: Option<DocumentRow> = conn
.query_row(
"SELECT
doc_id, asset_id, workspace_path, title, lang,
source_type, trust_level, parser_version,
doc_version, schema_version, metadata_json,
provenance_json, created_at, updated_at
FROM documents WHERE doc_id = ?",
params![id.0],
document_row_from_sql,
)
.map(Some)
.or_else(rows_optional)
.map_err(StoreError::from)?;
let Some(row) = row else { return Ok(None) };
// Rehydrate blocks. Sort by stream-ordinal so the returned
// CanonicalDocument matches the order originally persisted.
let mut blocks_stmt = conn
.prepare(
"SELECT payload_json FROM blocks
WHERE doc_id = ? ORDER BY ordinal ASC",
)
.map_err(StoreError::from)?;
let block_rows = blocks_stmt
.query_map(params![id.0], |r| {
let payload_json: String = r.get(0)?;
Ok(payload_json)
})
.map_err(StoreError::from)?;
let mut blocks: Vec<kb_core::Block> = Vec::new();
for row in block_rows {
let payload_json = row.map_err(StoreError::from)?;
let block: kb_core::Block = serde_json::from_str(&payload_json)
.context("deserialize block payload_json")?;
blocks.push(block);
}
let metadata: kb_core::Metadata = serde_json::from_str(&row.metadata_json)
.context("deserialize metadata_json")?;
let provenance: kb_core::Provenance =
serde_json::from_str(&row.provenance_json)
.context("deserialize provenance_json")?;
Ok(Some(kb_core::CanonicalDocument {
doc_id: kb_core::DocumentId(row.doc_id),
source_asset_id: kb_core::AssetId(row.asset_id),
workspace_path: kb_core::WorkspacePath(row.workspace_path),
title: row.title.unwrap_or_default(),
lang: kb_core::Lang(row.lang.unwrap_or_default()),
blocks,
metadata,
provenance,
parser_version: kb_core::ParserVersion(row.parser_version),
schema_version: row.schema_version as u32,
doc_version: row.doc_version as u32,
}))
}
fn get_chunk(&self, id: &kb_core::ChunkId) -> Result<Option<kb_core::Chunk>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let row = conn
.query_row(
"SELECT
chunk_id, doc_id, text, heading_path_json,
source_spans_json, token_estimate, chunker_version,
policy_hash, block_ids_json
FROM chunks WHERE chunk_id = ?",
params![id.0],
chunk_row_from_sql,
)
.map(Some)
.or_else(rows_optional)
.map_err(StoreError::from)?;
let Some(row) = row else { return Ok(None) };
let heading_path: Vec<String> = serde_json::from_str(&row.heading_path_json)
.context("deserialize chunk.heading_path_json")?;
let source_spans: Vec<kb_core::SourceSpan> =
serde_json::from_str(&row.source_spans_json)
.context("deserialize chunk.source_spans_json")?;
let block_ids: Vec<kb_core::BlockId> =
serde_json::from_str(&row.block_ids_json)
.context("deserialize chunk.block_ids_json")?;
Ok(Some(kb_core::Chunk {
chunk_id: kb_core::ChunkId(row.chunk_id),
doc_id: kb_core::DocumentId(row.doc_id),
block_ids,
text: row.text,
heading_path,
source_spans,
token_estimate: row.token_estimate as usize,
chunker_version: kb_core::ChunkerVersion(row.chunker_version),
policy_hash: row.policy_hash,
}))
}
fn list_documents(
&self,
filter: &kb_core::DocFilter,
) -> Result<Vec<kb_core::DocSummary>> {
// Build a dynamic WHERE clause from the filter. Each condition
// appends one positional `?` placeholder and one `Box<dyn
// ToSql>` to `params` so order stays in sync.
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut sql = String::from(
"SELECT d.doc_id, d.workspace_path, d.title, d.lang,
d.source_type, d.trust_level, d.parser_version,
d.created_at, d.updated_at,
a.byte_len,
(SELECT COUNT(*) FROM chunks c WHERE c.doc_id = d.doc_id) AS chunk_count,
-- chunker_version: assume one chunker per doc; pick
-- any row's value. NULL if no chunks yet.
(SELECT chunker_version FROM chunks c2
WHERE c2.doc_id = d.doc_id LIMIT 1) AS chunker_version
FROM documents d
JOIN assets a ON a.asset_id = d.asset_id
WHERE 1=1",
);
let mut params_dyn: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(lang) = &filter.lang {
sql.push_str(" AND d.lang = ?");
params_dyn.push(Box::new(lang.0.clone()));
}
if let Some(trust_min) = &filter.trust_min {
// Map the enum to its rank: Generated < Secondary < Primary.
// (Higher trust strictly contains lower trust.)
sql.push_str(" AND CASE d.trust_level
WHEN 'primary' THEN 3
WHEN 'secondary' THEN 2
WHEN 'generated' THEN 1
ELSE 0
END >= ?");
let rank: i64 = match trust_min {
kb_core::TrustLevel::Primary => 3,
kb_core::TrustLevel::Secondary => 2,
kb_core::TrustLevel::Generated => 1,
};
params_dyn.push(Box::new(rank));
}
if let Some(glob) = &filter.path_glob {
sql.push_str(" AND d.workspace_path GLOB ?");
params_dyn.push(Box::new(glob.clone()));
}
if !filter.tags_any.is_empty() {
// INTERSECT-style filter: doc must own at least one of the
// requested tags. Use IN with a placeholder list.
sql.push_str(" AND d.doc_id IN (SELECT doc_id FROM document_tags WHERE tag IN (");
for (i, tag) in filter.tags_any.iter().enumerate() {
if i > 0 {
sql.push(',');
}
sql.push('?');
params_dyn.push(Box::new(tag.clone()));
}
sql.push_str("))");
}
sql.push_str(" ORDER BY d.workspace_path");
let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?;
let rows = stmt
.query_map(
rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())),
doc_summary_from_sql,
)
.map_err(StoreError::from)?;
let mut out = Vec::new();
for r in rows {
let summary = r.map_err(StoreError::from)?;
// tags filter at row-load time: pull the tag list per doc.
let mut tag_stmt = conn
.prepare("SELECT tag FROM document_tags WHERE doc_id = ? ORDER BY tag")
.map_err(StoreError::from)?;
let tag_iter = tag_stmt
.query_map(params![summary.doc_id.0], |r| r.get::<_, String>(0))
.map_err(StoreError::from)?;
let tags: Vec<String> = tag_iter
.collect::<rusqlite::Result<Vec<_>>>()
.map_err(StoreError::from)?;
out.push(kb_core::DocSummary { tags, ..summary });
}
Ok(out)
}
}
// ── Internal row + (de)serialization helpers ─────────────────────────────
struct DocumentRow {
doc_id: String,
asset_id: String,
workspace_path: String,
title: Option<String>,
lang: Option<String>,
parser_version: String,
doc_version: i64,
schema_version: i64,
metadata_json: String,
provenance_json: String,
// source_type / trust_level are loaded back via metadata_json round-trip,
// so we do not need separate fields here for `get_document`.
}
fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
Ok(DocumentRow {
doc_id: row.get(0)?,
asset_id: row.get(1)?,
workspace_path: row.get(2)?,
title: row.get(3)?,
lang: row.get(4)?,
// 5: source_type, 6: trust_level — read but unused here (metadata_json
// is authoritative). Keeping them in the SELECT makes the column
// ordering match the INSERT and allows future fields without
// shifting indexes.
parser_version: row.get(7)?,
doc_version: row.get(8)?,
schema_version: row.get(9)?,
metadata_json: row.get(10)?,
provenance_json: row.get(11)?,
})
}
struct ChunkRow {
chunk_id: String,
doc_id: String,
text: String,
heading_path_json: String,
source_spans_json: String,
token_estimate: i64,
chunker_version: String,
policy_hash: String,
block_ids_json: String,
}
fn chunk_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
Ok(ChunkRow {
chunk_id: row.get(0)?,
doc_id: row.get(1)?,
text: row.get(2)?,
heading_path_json: row.get(3)?,
source_spans_json: row.get(4)?,
token_estimate: row.get(5)?,
chunker_version: row.get(6)?,
policy_hash: row.get(7)?,
block_ids_json: row.get(8)?,
})
}
fn doc_summary_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<kb_core::DocSummary> {
let doc_id: String = row.get(0)?;
let workspace_path: String = row.get(1)?;
let title: Option<String> = row.get(2)?;
let lang: Option<String> = row.get(3)?;
let source_type_raw: String = row.get(4)?;
let trust_level_raw: String = row.get(5)?;
let parser_version: String = row.get(6)?;
let created_at_raw: String = row.get(7)?;
let updated_at_raw: String = row.get(8)?;
let byte_len: i64 = row.get(9)?;
let chunk_count: i64 = row.get(10)?;
let chunker_version: Option<String> = row.get(11)?;
// De-serialize the lowercase string forms that match
// `#[serde(rename_all = "lowercase")]` on each enum.
let source_type: kb_core::SourceType =
serde_json::from_value(serde_json::Value::String(source_type_raw))
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
let trust_level: kb_core::TrustLevel =
serde_json::from_value(serde_json::Value::String(trust_level_raw))
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?;
let created_at = OffsetDateTime::parse(
&created_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e)))?;
let updated_at = OffsetDateTime::parse(
&updated_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(e)))?;
Ok(kb_core::DocSummary {
doc_id: kb_core::DocumentId(doc_id),
doc_path: kb_core::WorkspacePath(workspace_path),
title: title.unwrap_or_default(),
lang: kb_core::Lang(lang.unwrap_or_default()),
// tags filled in by caller after a per-doc fetch.
tags: Vec::new(),
trust_level,
source_type,
byte_len: byte_len as u64,
chunk_count: chunk_count as u32,
created_at,
updated_at,
parser_version: kb_core::ParserVersion(parser_version),
// chunker_version may be NULL when the doc has no chunks yet.
// Empty string is the cleanest fallback consistent with the wire
// schema's required `chunker_version` field on DocSummary v1.
chunker_version: kb_core::ChunkerVersion(chunker_version.unwrap_or_default()),
})
}
/// Map a `QueryReturnedNoRows` into `Ok(None)` so the trait returns
/// `Option<T>` rather than an error for the common "missing" case.
fn rows_optional<T>(err: rusqlite::Error) -> rusqlite::Result<Option<T>> {
match err {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
e => Err(e),
}
}
/// UPSERT the documents row and bump `doc_version` on conflict.
fn upsert_document(
tx: &rusqlite::Transaction<'_>,
doc: &kb_core::CanonicalDocument,
) -> Result<()> {
let metadata_json = serde_json::to_string(&doc.metadata)
.context("serialize metadata")?;
let provenance_json = serde_json::to_string(&doc.provenance)
.context("serialize provenance")?;
// String form of the lowercase serde representation. We avoid
// embedding `serde_json::to_string` quotes (`"markdown"` → just
// `markdown` for the column).
let source_type = source_type_label(&doc.metadata.source_type);
let trust_level = trust_level_label(&doc.metadata.trust_level);
let created_at = doc
.metadata
.created_at
.format(&time::format_description::well_known::Rfc3339)
.context("format created_at")?;
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format updated_at")?;
tx.execute(
"INSERT INTO documents (
doc_id, asset_id, workspace_path, title, lang,
source_type, trust_level, parser_version,
doc_version, schema_version, metadata_json,
provenance_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(doc_id) DO UPDATE SET
asset_id = excluded.asset_id,
workspace_path = excluded.workspace_path,
title = excluded.title,
lang = excluded.lang,
source_type = excluded.source_type,
trust_level = excluded.trust_level,
parser_version = excluded.parser_version,
-- doc_version: bump on update. excluded.doc_version is the
-- caller's submitted value; we ignore it and add 1 to the
-- existing column so each re-ingest cleanly increments.
doc_version = documents.doc_version + 1,
schema_version = excluded.schema_version,
metadata_json = excluded.metadata_json,
provenance_json = excluded.provenance_json,
updated_at = excluded.updated_at",
params![
doc.doc_id.0,
doc.source_asset_id.0,
doc.workspace_path.0,
doc.title,
doc.lang.0,
source_type,
trust_level,
doc.parser_version.0,
doc.doc_version as i64,
doc.schema_version as i64,
metadata_json,
provenance_json,
created_at,
now,
],
)
.map_err(StoreError::from)?;
Ok(())
}
fn source_type_label(s: &kb_core::SourceType) -> &'static str {
match s {
kb_core::SourceType::Markdown => "markdown",
kb_core::SourceType::Note => "note",
kb_core::SourceType::Paper => "paper",
kb_core::SourceType::Reference => "reference",
kb_core::SourceType::Inbox => "inbox",
}
}
fn trust_level_label(s: &kb_core::TrustLevel) -> &'static str {
match s {
kb_core::TrustLevel::Primary => "primary",
kb_core::TrustLevel::Secondary => "secondary",
kb_core::TrustLevel::Generated => "generated",
}
}
fn replace_document_tags(
tx: &rusqlite::Transaction<'_>,
doc_id: &kb_core::DocumentId,
tags: &[String],
) -> Result<()> {
tx.execute("DELETE FROM document_tags WHERE doc_id = ?", params![doc_id.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO document_tags (doc_id, tag) VALUES (?, ?)
ON CONFLICT(doc_id, tag) DO NOTHING",
)
.map_err(StoreError::from)?;
for tag in tags {
stmt.execute(params![doc_id.0, tag])
.map_err(StoreError::from)?;
}
Ok(())
}
struct BlockRow {
block_id: String,
doc_id: String,
kind: &'static str,
heading_path_json: String,
ordinal: i64,
source_span_json: String,
/// The full Block JSON — round-trip path for `get_document`. Also
/// future-proofs new variants without schema churn.
payload_json: String,
}
fn block_to_row(
doc: &kb_core::DocumentId,
block: &kb_core::Block,
stream_ordinal: i64,
) -> Result<BlockRow> {
let (block_id, kind, heading_path_json, source_span_json) = match block {
kb_core::Block::Heading(b) => (
b.common.block_id.0.clone(),
"heading",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Paragraph(b) | kb_core::Block::Quote(b) => (
b.common.block_id.0.clone(),
// Discriminate Paragraph vs Quote on the enum tag: payload
// round-trip carries the variant, but the column needs a
// stable label for filtering.
if matches!(block, kb_core::Block::Paragraph(_)) {
"paragraph"
} else {
"quote"
},
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::List(b) => (
b.common.block_id.0.clone(),
"list",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Code(b) => (
b.common.block_id.0.clone(),
"code",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Table(b) => (
b.common.block_id.0.clone(),
"table",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::ImageRef(b) => (
b.common.block_id.0.clone(),
"imageref",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::AudioRef(b) => (
b.common.block_id.0.clone(),
"audioref",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
};
let payload_json = serde_json::to_string(block).context("serialize block")?;
Ok(BlockRow {
block_id,
doc_id: doc.0.clone(),
kind,
heading_path_json,
ordinal: stream_ordinal,
source_span_json,
payload_json,
})
}

View File

@@ -0,0 +1,20 @@
//! Crate-local error type per design §10.
//!
//! Boundary code (`kb-app`, `kb-cli`) flattens these into `anyhow::Error`,
//! so the trait impls return `anyhow::Result` directly. Internally we
//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlx` /
//! `Migration` so callers that downcast can route refusal-style flows.
use thiserror::Error;
#[derive(Debug, Error)]
pub enum StoreError {
#[error("sqlite error: {0}")]
Sqlx(#[from] rusqlite::Error),
#[error("migration error: {0}")]
Migration(String),
#[error("conflict: {0}")]
Conflict(String),
}

View File

@@ -0,0 +1,252 @@
//! `JobRepo` impl per design §7.2.
//!
//! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over
//! `(now, kind, payload)` so two `create` calls in the same millisecond
//! still distinguish.
use anyhow::{Context, Result};
use rusqlite::params;
use serde_json::Value;
use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::SqliteStore;
impl kb_core::JobRepo for SqliteStore {
fn create(
&self,
kind: kb_core::JobKind,
payload: Value,
) -> Result<kb_core::JobId> {
let now_dt = OffsetDateTime::now_utc();
let now = now_dt
.format(&time::format_description::well_known::Rfc3339)
.context("format job created_at")?;
// JobId recipe: stable hex over (kind, payload_canonical, ns).
// The nanosecond timestamp is included so two `create` calls with
// identical `(kind, payload)` still get distinct IDs.
let job_id = mint_job_id(&kind, &payload, now_dt);
let kind_label = job_kind_label(&kind);
let payload_json = serde_json::to_string(&payload)
.context("serialize job payload")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
conn.execute(
"INSERT INTO jobs (
job_id, kind, status, payload_json, progress_json,
error_json, created_at, updated_at, finished_at
) VALUES (?, ?, 'pending', ?, NULL, NULL, ?, ?, NULL)",
params![job_id.0, kind_label, payload_json, now, now],
)
.map_err(StoreError::from)?;
Ok(job_id)
}
fn update_progress(
&self,
id: &kb_core::JobId,
progress: Value,
) -> Result<()> {
let progress_json = serde_json::to_string(&progress)
.context("serialize job progress")?;
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format job updated_at")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
// status='pending' → 'running' on first progress update; later
// progress calls keep status='running' until finish().
conn.execute(
"UPDATE jobs SET
progress_json = ?,
status = CASE status WHEN 'pending' THEN 'running' ELSE status END,
updated_at = ?
WHERE job_id = ?",
params![progress_json, now, id.0],
)
.map_err(StoreError::from)?;
Ok(())
}
fn finish(
&self,
id: &kb_core::JobId,
status: kb_core::JobStatus,
error: Option<&str>,
) -> Result<()> {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format job finished_at")?;
let status_label = job_status_label(&status);
let error_json = error
.map(|e| serde_json::to_string(&serde_json::json!({ "message": e })))
.transpose()
.context("serialize job error")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
conn.execute(
"UPDATE jobs SET
status = ?,
error_json = ?,
updated_at = ?,
finished_at = ?
WHERE job_id = ?",
params![status_label, error_json, now, now, id.0],
)
.map_err(StoreError::from)?;
Ok(())
}
fn list(
&self,
filter: &kb_core::JobFilter,
) -> Result<Vec<kb_core::JobRow>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut sql = String::from(
"SELECT job_id, kind, status, payload_json, progress_json,
error_json, created_at, updated_at, finished_at
FROM jobs WHERE 1=1",
);
let mut params_dyn: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(status) = &filter.status {
sql.push_str(" AND status = ?");
params_dyn.push(Box::new(job_status_label(status).to_string()));
}
if let Some(kind) = &filter.kind {
sql.push_str(" AND kind = ?");
params_dyn.push(Box::new(job_kind_label(kind).to_string()));
}
sql.push_str(" ORDER BY created_at ASC");
let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?;
let rows = stmt
.query_map(
rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())),
job_row_from_sql,
)
.map_err(StoreError::from)?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(StoreError::from)?);
}
Ok(out)
}
}
/// Mint a JobId over (kind, canonical(payload), nanos). The 32-hex
/// invariant on `kb_core::JobId` is honored by taking the first 32 chars
/// of the blake3 hex.
fn mint_job_id(
kind: &kb_core::JobKind,
payload: &Value,
at: OffsetDateTime,
) -> kb_core::JobId {
// Plain serde_json::to_vec is enough — JobIds are not part of the
// §4.2 ID family and don't need canonical-JSON parity with other IDs.
// The nanosecond suffix is what guarantees uniqueness, not stable
// hashing.
let mut hasher = blake3::Hasher::new();
hasher.update(job_kind_label(kind).as_bytes());
if let Ok(bytes) = serde_json::to_vec(payload) {
hasher.update(&bytes);
}
hasher.update(&at.unix_timestamp_nanos().to_be_bytes());
let hex = hasher.finalize().to_hex().to_string();
kb_core::JobId(hex[..32].to_string())
}
fn job_kind_label(k: &kb_core::JobKind) -> &'static str {
match k {
kb_core::JobKind::Ingest => "ingest",
kb_core::JobKind::Chunk => "chunk",
kb_core::JobKind::Embed => "embed",
kb_core::JobKind::Ocr => "ocr",
kb_core::JobKind::Transcribe => "transcribe",
kb_core::JobKind::Reindex => "reindex",
kb_core::JobKind::Doctor => "doctor",
}
}
fn job_status_label(s: &kb_core::JobStatus) -> &'static str {
match s {
kb_core::JobStatus::Pending => "pending",
kb_core::JobStatus::Running => "running",
kb_core::JobStatus::Succeeded => "succeeded",
kb_core::JobStatus::Failed => "failed",
kb_core::JobStatus::Canceled => "canceled",
}
}
fn job_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<kb_core::JobRow> {
let job_id: String = row.get(0)?;
let kind_raw: String = row.get(1)?;
let status_raw: String = row.get(2)?;
let payload_json: String = row.get(3)?;
let progress_json: Option<String> = row.get(4)?;
let error_json: Option<String> = row.get(5)?;
let created_at_raw: String = row.get(6)?;
let updated_at_raw: String = row.get(7)?;
let finished_at_raw: Option<String> = row.get(8)?;
let kind: kb_core::JobKind =
serde_json::from_value(serde_json::Value::String(kind_raw))
.map_err(conv_err(1))?;
let status: kb_core::JobStatus =
serde_json::from_value(serde_json::Value::String(status_raw))
.map_err(conv_err(2))?;
let payload: Value = serde_json::from_str(&payload_json).map_err(conv_err(3))?;
let progress: Option<Value> = match progress_json {
Some(s) => Some(serde_json::from_str(&s).map_err(conv_err(4))?),
None => None,
};
// Surface the stored error message back as a plain string per the
// JobRow schema (§7.2). We stored `{"message": "..."}` for forward
// compatibility — pull `message` back out, or fall back to the raw
// JSON if the shape ever drifts.
let error: Option<String> = match error_json {
Some(s) => match serde_json::from_str::<Value>(&s) {
Ok(v) => v
.get("message")
.and_then(Value::as_str)
.map(str::to_owned)
.or(Some(s)),
Err(_) => Some(s),
},
None => None,
};
let created_at = OffsetDateTime::parse(
&created_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(conv_err(6))?;
let updated_at = OffsetDateTime::parse(
&updated_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(conv_err(7))?;
let finished_at = match finished_at_raw {
Some(s) => Some(
OffsetDateTime::parse(&s, &time::format_description::well_known::Rfc3339)
.map_err(conv_err(8))?,
),
None => None,
};
Ok(kb_core::JobRow {
job_id: kb_core::JobId(job_id),
kind,
status,
payload,
progress,
error,
created_at,
updated_at,
finished_at,
})
}
fn conv_err<E: std::error::Error + Send + Sync + 'static>(
col: usize,
) -> impl FnOnce(E) -> rusqlite::Error {
move |e| {
rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
}
}

View File

@@ -0,0 +1,23 @@
//! `kb-store-sqlite` — SQLite-backed implementations of
//! [`kb_core::DocumentStore`] and [`kb_core::JobRepo`] (§7.2), plus the
//! asset writer that copies (or references) raw bytes per design §5.2.
//!
//! Schema is owned by `migrations/V001__init.sql` (workspace root), which
//! ships the full §5 DDL minus the FTS5 virtual table + triggers (those
//! land in P2-1's `V002`).
//!
//! Allowed deps per task spec: `kb-core`, `kb-config`, `rusqlite`,
//! `refinery`, `serde_json`, `time`, `blake3`, `tracing`, `anyhow`,
//! `thiserror`. NOT allowed: `kb-parse-*`, `kb-normalize`, `kb-chunk`,
//! `kb-store-vector`, `kb-source-fs`, etc. (`kb-parse-md`, `kb-normalize`,
//! `kb-chunk` may appear as **dev-deps** — see `Cargo.toml` — to drive
//! the contract round-trip test off a real Markdown fixture.)
mod error;
mod schema;
mod store;
mod documents;
mod jobs;
pub use error::StoreError;
pub use store::SqliteStore;

View File

@@ -0,0 +1,14 @@
//! Refinery migration bundle. The migrations live at the workspace
//! `migrations/` directory; refinery's `embed_migrations!` macro inlines
//! them at compile time so the binary needs no runtime SQL files.
// `embed_migrations!` looks under the path relative to the package root
// (the crate's `Cargo.toml`). The workspace migrations dir is two levels
// up: `crates/kb-store-sqlite/Cargo.toml` → `../../migrations`.
refinery::embed_migrations!("../../migrations");
/// Re-export the runner under a stable name. Calling
/// `runner().run(&mut conn)?` applies all pending migrations.
pub fn runner() -> refinery::Runner {
migrations::runner()
}

View File

@@ -0,0 +1,297 @@
//! `SqliteStore` — open + run_migrations + asset writer.
//!
//! The store wraps a single `rusqlite::Connection` behind a
//! `std::sync::Mutex` so the public trait impls (which take `&self`) can
//! still issue mutating SQL. Concurrency is intentionally coarse for P1;
//! later phases can swap to a connection pool if measurement shows the
//! mutex on the hot path.
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use anyhow::{Context, Result};
use rusqlite::Connection;
use crate::error::StoreError;
use crate::schema;
/// Default file name under `config.storage.data_dir`. Kept private — the
/// path layout is a §6.3 design decision, not part of the store's public
/// surface.
const SQLITE_FILE: &str = "kb.sqlite";
/// Subdirectory under `data_dir` holding shard-prefixed asset bytes
/// (`<aa>/<asset_id>`). Mirrors design §6.3.
const ASSETS_SUBDIR: &str = "assets";
/// Length of the shard prefix: 2 hex chars → 256 buckets, plenty to keep
/// directory cardinality reasonable on workspaces with thousands of
/// assets without descending into hash-trees.
const ASSET_SHARD_LEN: usize = 2;
/// Bytes-per-MiB conversion. Used by the asset writer to compare
/// `byte_len` against `storage.copy_threshold_mb`.
const BYTES_PER_MIB: u64 = 1024 * 1024;
/// SQLite-backed kb store.
///
/// Construct via [`SqliteStore::open`], then call
/// [`SqliteStore::run_migrations`] to apply the bundled `V001__init.sql`
/// before any read/write call.
pub struct SqliteStore {
/// Resolved absolute path to `data_dir`. Used by the asset writer.
pub(crate) data_dir: PathBuf,
/// Maximum asset size eligible for in-store copy; assets bigger than
/// this are recorded as `reference` and read from their source path.
pub(crate) copy_threshold_bytes: u64,
/// Single mutexed connection — see module docs for rationale.
pub(crate) conn: Mutex<Connection>,
}
impl SqliteStore {
/// Open (or create) the SQLite file under `config.storage.data_dir`,
/// apply pragmas (foreign_keys / WAL / synchronous=NORMAL /
/// temp_store=MEMORY), and create parent directories as needed.
/// **Does not run migrations** — call [`Self::run_migrations`] next.
pub fn open(config: &kb_config::Config) -> Result<Self> {
let data_dir = expand_data_dir(&config.storage.data_dir);
std::fs::create_dir_all(&data_dir)
.with_context(|| format!("create data_dir {}", data_dir.display()))?;
let db_path = data_dir.join(SQLITE_FILE);
let conn = Connection::open(&db_path)
.with_context(|| format!("open sqlite at {}", db_path.display()))?;
apply_pragmas(&conn)?;
tracing::debug!(
target: "kb-store-sqlite",
data_dir = %data_dir.display(),
db = %db_path.display(),
"opened sqlite store"
);
Ok(Self {
data_dir,
copy_threshold_bytes: config.storage.copy_threshold_mb * BYTES_PER_MIB,
conn: Mutex::new(conn),
})
}
/// Apply all pending migrations bundled at compile time
/// (`migrations/V001__init.sql` and any later additions).
pub fn run_migrations(&self) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
schema::runner()
.run(&mut *conn)
.map_err(|e| StoreError::Migration(e.to_string()))?;
tracing::debug!(target: "kb-store-sqlite", "migrations applied");
Ok(())
}
/// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`,
/// bytes go to `data_dir/assets/<aa>/<asset_id>` if `byte_len ≤
/// copy_threshold_mb`, otherwise the row records the source URI's
/// path and no copy is performed.
///
/// In either branch, `blake3(bytes)` is recomputed and compared to
/// `asset.checksum.0`. A mismatch returns
/// `StoreError::Conflict` wrapped in `anyhow::Error`.
pub fn put_asset_with_bytes(
&self,
asset: &kb_core::RawAsset,
bytes: &[u8],
) -> Result<()> {
// 1. Verify the caller's checksum matches what's actually on the
// wire. A drift here means the bytes the parser saw and the bytes
// we're about to durably store disagree — refuse persistence.
let computed = blake3::hash(bytes).to_hex().to_string();
if computed != asset.checksum.0 {
return Err(StoreError::Conflict(format!(
"checksum mismatch: asset {} declares {} but bytes hash to {}",
asset.asset_id.0, asset.checksum.0, computed
))
.into());
}
// 2. Decide copy vs. reference. The threshold compares the
// declared `byte_len` (caller-vouched) rather than `bytes.len()`
// because some sources stream and `byte_len` is authoritative.
let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes {
let dest = self.assets_path_for(&asset.asset_id);
if let Some(parent) = dest.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("create asset shard dir {}", parent.display())
})?;
}
std::fs::write(&dest, bytes)
.with_context(|| format!("write asset bytes to {}", dest.display()))?;
// Mirror §6.6: files 0o644.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&dest)?.permissions();
perms.set_mode(0o644);
std::fs::set_permissions(&dest, perms).with_context(|| {
format!("chmod 0o644 on {}", dest.display())
})?;
}
("copied", dest.to_string_lossy().into_owned())
} else {
// Reference: caller's source path is recorded verbatim. We
// accept either a `File(path)` or `Kb(uri)` SourceUri; the
// latter stores the raw `kb://...` string.
let path = match &asset.source_uri {
kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(),
kb_core::SourceUri::Kb(u) => u.clone(),
};
("reference", path)
};
// 3. UPSERT the assets row. A second `put_asset_with_bytes` for
// the same asset_id (e.g. re-ingest) overwrites in place — the
// row is uniquely keyed by asset_id and re-derived from the
// RawAsset every time.
let conn = self.conn.lock().expect("sqlite mutex poisoned");
upsert_asset_row(&conn, asset, storage_kind, &storage_path)?;
Ok(())
}
/// Compute the `data_dir/assets/<aa>/<asset_id>` path for an asset.
/// `<aa>` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`.
pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf {
let id = &asset_id.0;
// Defensive: kb-core enforces 32 hex chars on AssetId construction
// (`FromStr` validates). If a future code path bypasses that, we
// fall back to the full id as the shard so we never panic on
// slicing.
let shard = if id.len() >= ASSET_SHARD_LEN {
&id[..ASSET_SHARD_LEN]
} else {
id.as_str()
};
self.data_dir.join(ASSETS_SUBDIR).join(shard).join(id)
}
}
/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes`
/// path (which has bytes + computed `storage_kind/path`) and the
/// `DocumentStore::put_asset` path (which only has the `RawAsset` and
/// reads `storage_kind/path` from `asset.stored`).
pub(crate) fn upsert_asset_row(
conn: &Connection,
asset: &kb_core::RawAsset,
storage_kind: &str,
storage_path: &str,
) -> Result<()> {
let source_uri = match &asset.source_uri {
kb_core::SourceUri::File(p) => format!("file://{}", p.to_string_lossy()),
kb_core::SourceUri::Kb(u) => u.clone(),
};
let media_type = serde_json::to_string(&asset.media_type)
.context("serialize media_type")?;
let discovered_at = asset
.discovered_at
.format(&time::format_description::well_known::Rfc3339)
.context("format discovered_at")?;
conn.execute(
"INSERT INTO assets (
asset_id, source_uri, workspace_path, media_type, byte_len,
checksum, storage_kind, storage_path, discovered_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(asset_id) DO UPDATE SET
source_uri = excluded.source_uri,
workspace_path = excluded.workspace_path,
media_type = excluded.media_type,
byte_len = excluded.byte_len,
checksum = excluded.checksum,
storage_kind = excluded.storage_kind,
storage_path = excluded.storage_path,
discovered_at = excluded.discovered_at",
rusqlite::params![
asset.asset_id.0,
source_uri,
asset.workspace_path.0,
media_type,
asset.byte_len as i64,
asset.checksum.0,
storage_kind,
storage_path,
discovered_at,
],
)
.map_err(StoreError::from)?;
Ok(())
}
/// Apply the design §5 / task-spec pragmas. Called once per connection.
/// Note: WAL is persistent (the journal-mode setting is sticky in the DB
/// header) but `foreign_keys`, `synchronous`, and `temp_store` are
/// per-connection — they MUST be re-applied on every open.
fn apply_pragmas(conn: &Connection) -> Result<()> {
conn.pragma_update(None, "foreign_keys", "ON")?;
// `journal_mode = WAL` returns the current mode as a row; use
// `pragma_query_value` semantics via `query_row` to allow that.
conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "temp_store", "MEMORY")?;
Ok(())
}
/// Expand the placeholders / `~` / env-vars used by `Config::storage.data_dir`.
///
/// Supported substitutions, in order:
/// - `${XDG_DATA_HOME:-~/.local/share}` (and the bare `${XDG_DATA_HOME}`)
/// - leading `~` → `$HOME`
///
/// If neither produces an absolute path, the input is returned as-is
/// (relative paths are kept relative to the caller's CWD).
fn expand_data_dir(raw: &str) -> PathBuf {
let mut s = raw.to_string();
// ${XDG_DATA_HOME:-~/.local/share}: respect the env override, else
// fall back to the suffix after `:-`.
if let Some(start) = s.find("${XDG_DATA_HOME") {
if let Some(rel_end) = s[start..].find('}') {
let end = start + rel_end + 1; // include trailing '}'
let inner = &s[start + 2..end - 1]; // strip ${ and }
let replacement = match std::env::var("XDG_DATA_HOME") {
Ok(v) if !v.is_empty() => v,
_ => {
// inner is e.g. `XDG_DATA_HOME:-~/.local/share`.
if let Some((_, default)) = inner.split_once(":-") {
default.to_string()
} else {
// No default supplied; mimic Bash and yield "".
String::new()
}
}
};
s.replace_range(start..end, &replacement);
}
}
// ~ at the front → $HOME (or `dirs::home_dir`).
if let Some(rest) = s.strip_prefix('~') {
if let Some(home) = std::env::var_os("HOME").map(PathBuf::from).or_else(dirs_home_fallback)
{
return home.join(rest.trim_start_matches('/'));
}
}
PathBuf::from(s)
}
/// Tiny shim to avoid pulling in the `dirs` crate as a direct dep — we
/// only fall back when `$HOME` is unset, which is exotic on the platforms
/// we target. Returns `None` so the caller keeps the literal `~`.
fn dirs_home_fallback() -> Option<PathBuf> {
None
}
/// Returns the root of the assets shard tree (`data_dir/assets/`). Used
/// by tests; kept crate-private otherwise.
#[allow(dead_code)]
pub(crate) fn assets_root(data_dir: &Path) -> PathBuf {
data_dir.join(ASSETS_SUBDIR)
}

View File

@@ -1,7 +1,10 @@
-- V001__init.sql — schema bootstrap.
-- Per design §5.1 + §5.9. Only the meta + migrations tables land here;
-- data tables (assets, documents, blocks, chunks, fts5, …) ship in later
-- phase-specific migrations (P1-6 / P2-1 / P3-3).
-- V001__init.sql — full P1 schema bootstrap.
-- Per design §5.1 (meta), §5.2 (assets), §5.3 (documents/document_tags),
-- §5.4 (blocks), §5.5 (chunks — FTS5 virtual table + triggers DEFERRED to
-- V002 in P2-1), §5.6 (embedding_records), §5.7 (jobs / ingest_runs /
-- answers / eval_runs / eval_query_results).
-- §5.1 Migrations meta -------------------------------------------------------
CREATE TABLE schema_meta (
key TEXT PRIMARY KEY,
@@ -13,3 +16,167 @@ CREATE TABLE migrations (
applied_at TEXT NOT NULL,
description TEXT NOT NULL
);
-- §5.2 Assets ----------------------------------------------------------------
CREATE TABLE assets (
asset_id TEXT PRIMARY KEY,
source_uri TEXT NOT NULL,
workspace_path TEXT NOT NULL,
media_type TEXT NOT NULL,
byte_len INTEGER NOT NULL,
checksum TEXT NOT NULL,
storage_kind TEXT NOT NULL CHECK (storage_kind IN ('copied','reference')),
storage_path TEXT NOT NULL,
discovered_at TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_assets_workspace_path ON assets(workspace_path);
CREATE INDEX idx_assets_media_type ON assets(media_type);
-- §5.3 Documents -------------------------------------------------------------
CREATE TABLE documents (
doc_id TEXT PRIMARY KEY,
asset_id TEXT NOT NULL REFERENCES assets(asset_id) ON DELETE RESTRICT,
workspace_path TEXT NOT NULL,
title TEXT,
lang TEXT,
source_type TEXT NOT NULL,
trust_level TEXT NOT NULL,
parser_version TEXT NOT NULL,
doc_version INTEGER NOT NULL,
schema_version INTEGER NOT NULL,
metadata_json TEXT NOT NULL,
provenance_json TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_docs_workspace_path ON documents(workspace_path);
CREATE INDEX idx_docs_lang ON documents(lang);
CREATE INDEX idx_docs_source_type ON documents(source_type);
CREATE TABLE document_tags (
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
tag TEXT NOT NULL,
PRIMARY KEY (doc_id, tag)
);
CREATE INDEX idx_document_tags_tag ON document_tags(tag);
-- §5.4 Blocks ----------------------------------------------------------------
CREATE TABLE blocks (
block_id TEXT PRIMARY KEY,
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
kind TEXT NOT NULL,
heading_path_json TEXT NOT NULL,
ordinal INTEGER NOT NULL,
source_span_json TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX idx_blocks_doc_id ON blocks(doc_id);
-- §5.5 Chunks (FTS5 virtual table + triggers deferred to V002 / P2-1) -------
CREATE TABLE chunks (
chunk_id TEXT PRIMARY KEY,
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
text TEXT NOT NULL,
heading_path_json TEXT NOT NULL,
section_label TEXT,
source_spans_json TEXT NOT NULL,
token_estimate INTEGER NOT NULL,
chunker_version TEXT NOT NULL,
policy_hash TEXT NOT NULL,
block_ids_json TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX idx_chunks_doc_id ON chunks(doc_id);
CREATE INDEX idx_chunks_chunker_version ON chunks(chunker_version);
-- §5.6 Embedding records (P3 — table empty in P1, present for forward compat) -
CREATE TABLE embedding_records (
embedding_id TEXT PRIMARY KEY,
chunk_id TEXT NOT NULL REFERENCES chunks(chunk_id) ON DELETE CASCADE,
model_id TEXT NOT NULL,
model_version TEXT NOT NULL,
dimensions INTEGER NOT NULL,
lance_table TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(chunk_id, model_id, model_version, dimensions)
);
CREATE INDEX idx_embed_chunk ON embedding_records(chunk_id);
CREATE INDEX idx_embed_model ON embedding_records(model_id, model_version, dimensions);
-- §5.7 Jobs / IngestRuns / Answers / EvalRuns -------------------------------
CREATE TABLE jobs (
job_id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('pending','running','succeeded','failed','canceled')),
payload_json TEXT NOT NULL,
progress_json TEXT,
error_json TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
finished_at TEXT
);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_kind ON jobs(kind);
CREATE TABLE ingest_runs (
run_id TEXT PRIMARY KEY,
scope_json TEXT NOT NULL,
scanned INTEGER NOT NULL,
new_count INTEGER NOT NULL,
updated_count INTEGER NOT NULL,
skipped_count INTEGER NOT NULL,
error_count INTEGER NOT NULL,
duration_ms INTEGER NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT NOT NULL,
items_json TEXT
);
CREATE TABLE answers (
trace_id TEXT PRIMARY KEY,
query TEXT NOT NULL,
answer TEXT NOT NULL,
grounded INTEGER NOT NULL,
refusal_reason TEXT,
model_id TEXT NOT NULL,
model_provider TEXT NOT NULL,
embedding_model_id TEXT,
embedding_dimensions INTEGER,
prompt_template_version TEXT NOT NULL,
retrieval_mode TEXT NOT NULL,
retrieval_k INTEGER NOT NULL,
score_gate REAL NOT NULL,
top_score REAL NOT NULL,
chunks_returned INTEGER NOT NULL,
chunks_used INTEGER NOT NULL,
citations_json TEXT NOT NULL,
packed_chunks_json TEXT,
prompt_tokens INTEGER,
completion_tokens INTEGER,
latency_ms INTEGER,
created_at TEXT NOT NULL
);
CREATE INDEX idx_answers_created_at ON answers(created_at);
CREATE INDEX idx_answers_grounded ON answers(grounded);
CREATE TABLE eval_runs (
run_id TEXT PRIMARY KEY,
suite TEXT NOT NULL,
config_snapshot_json TEXT NOT NULL,
aggregate_json TEXT NOT NULL,
commit_hash TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE eval_query_results (
run_id TEXT NOT NULL REFERENCES eval_runs(run_id) ON DELETE CASCADE,
query_id TEXT NOT NULL,
result_json TEXT NOT NULL,
PRIMARY KEY (run_id, query_id)
);