Merge pull request 'feat(p1-6): kb-store-sqlite — P1 terminal task' (#11) from feat/p1-6-store-sqlite into main

Reviewed-on: altair823-org/kb#11
This commit was merged in pull request #11.
This commit is contained in:
2026-04-30 17:47:45 +00:00
22 changed files with 3155 additions and 4 deletions

451
Cargo.lock generated
View File

@@ -2,6 +2,18 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -79,6 +91,17 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -286,6 +309,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "either"
version = "1.15.0"
@@ -308,6 +342,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.4.1"
@@ -329,6 +375,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf"
dependencies = [
"percent-encoding",
]
[[package]]
name = "fst"
version = "0.4.7"
@@ -415,6 +470,9 @@ name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
@@ -431,18 +489,129 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "icu_collections"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
dependencies = [
"displaydoc",
"potential_utf",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_locale_core"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_normalizer"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
dependencies = [
"icu_collections",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"smallvec",
"zerovec",
]
[[package]]
name = "icu_normalizer_data"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
[[package]]
name = "icu_properties"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec"
dependencies = [
"icu_collections",
"icu_locale_core",
"icu_properties_data",
"icu_provider",
"zerotrie",
"zerovec",
]
[[package]]
name = "icu_properties_data"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af"
[[package]]
name = "icu_provider"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
dependencies = [
"displaydoc",
"icu_locale_core",
"writeable",
"yoke",
"zerofrom",
"zerotrie",
"zerovec",
]
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]]
name = "idna"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
dependencies = [
"idna_adapter",
"smallvec",
"utf8_iter",
]
[[package]]
name = "idna_adapter"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
dependencies = [
"icu_normalizer",
"icu_properties",
]
[[package]]
name = "ignore"
version = "0.4.25"
@@ -649,6 +818,26 @@ dependencies = [
"walkdir",
]
[[package]]
name = "kb-store-sqlite"
version = "0.1.0"
dependencies = [
"anyhow",
"blake3",
"kb-chunk",
"kb-config",
"kb-core",
"kb-normalize",
"kb-parse-md",
"refinery",
"rusqlite",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"time",
"tracing",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -676,6 +865,17 @@ dependencies = [
"libc",
]
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "lingua"
version = "1.8.0"
@@ -744,6 +944,12 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "litemap"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0"
[[package]]
name = "lock_api"
version = "0.4.14"
@@ -835,12 +1041,33 @@ dependencies = [
"windows-link",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pin-project-lite"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "potential_utf"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
dependencies = [
"zerovec",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -938,6 +1165,50 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "refinery"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ba5d693abf62492c37268512ff35b77655d2e957ca53dab85bf993fe9172d15"
dependencies = [
"refinery-core",
"refinery-macros",
]
[[package]]
name = "refinery-core"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a83581f18c1a4c3a6ebd7a174bdc665f17f618d79f7edccb6a0ac67e660b319"
dependencies = [
"async-trait",
"cfg-if",
"log",
"regex",
"rusqlite",
"serde",
"siphasher",
"thiserror 1.0.69",
"time",
"toml",
"url",
"walkdir",
]
[[package]]
name = "refinery-macros"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c225407d8e52ef8cf094393781ecda9a99d6544ec28d90a6915751de259264"
dependencies = [
"heck",
"proc-macro2",
"quote",
"refinery-core",
"regex",
"syn",
]
[[package]]
name = "regex"
version = "1.12.3"
@@ -967,6 +1238,20 @@ version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "rusqlite"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustix"
version = "1.1.4"
@@ -1121,6 +1406,12 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "siphasher"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "slab"
version = "0.4.12"
@@ -1133,6 +1424,12 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -1174,6 +1471,17 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tempfile"
version = "3.27.0"
@@ -1267,6 +1575,16 @@ dependencies = [
"time-core",
]
[[package]]
name = "tinystr"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d"
dependencies = [
"displaydoc",
"zerovec",
]
[[package]]
name = "tinyvec"
version = "1.11.0"
@@ -1443,6 +1761,24 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "url"
version = "2.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]
name = "utf8_iter"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
@@ -1455,6 +1791,18 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
@@ -1761,6 +2109,109 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "writeable"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4"
[[package]]
name = "yoke"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca"
dependencies = [
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerofrom"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerotrie"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf"
dependencies = [
"displaydoc",
"yoke",
"zerofrom",
]
[[package]]
name = "zerovec"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239"
dependencies = [
"yoke",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zmij"
version = "1.0.21"

View File

@@ -8,6 +8,7 @@ members = [
"crates/kb-parse-md",
"crates/kb-normalize",
"crates/kb-chunk",
"crates/kb-store-sqlite",
"crates/kb-app",
"crates/kb-cli",
]

View File

@@ -355,6 +355,7 @@ fn build_chunk(
source_spans,
token_estimate,
chunker_version: chunker_version.clone(),
policy_hash: policy_hash.to_string(),
}
}

View File

@@ -6,6 +6,12 @@ use crate::document::SourceSpan;
use crate::ids::{BlockId, ChunkId, DocumentId};
use crate::versions::ChunkerVersion;
/// A unit of retrievable text per design §3.5 + §5.5.
///
/// `policy_hash` is the chunker's hex digest of the active `ChunkPolicy`
/// (e.g. `target_tokens`, `overlap_tokens`). It mirrors the §5.5 SQLite
/// schema column so persistence is a straight copy, and feeds the
/// `chunk_id` recipe (§4.2) so policy edits invalidate downstream IDs.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Chunk {
pub chunk_id: ChunkId,
@@ -16,4 +22,5 @@ pub struct Chunk {
pub source_spans: Vec<SourceSpan>,
pub token_estimate: usize,
pub chunker_version: ChunkerVersion,
pub policy_hash: String,
}

View File

@@ -0,0 +1,34 @@
[package]
name = "kb-store-sqlite"
version = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
description = "SQLite-backed DocumentStore + JobRepo for kb (§5 DDL, §7.2 traits)"
[dependencies]
kb-core = { path = "../kb-core" }
kb-config = { path = "../kb-config" }
# `bundled` ships SQLite source + builds in-tree (no system libsqlite3).
# Explicitly NOT `bundled-sqlcipher` per task allowed-deps list.
rusqlite = { version = "0.32", features = ["bundled"] }
refinery = { version = "0.8", features = ["rusqlite"] }
serde_json = { workspace = true }
time = { workspace = true }
blake3 = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = "3"
serde_json = { workspace = true }
# kb-parse-md / kb-normalize / kb-chunk are dev-only — used to build a
# CanonicalDocument + Vec<Chunk> from a fixture in the contract round-trip
# test. Forbidden as regular deps per design §8 (store consumes domain
# types from kb-core only); `cargo tree -p kb-store-sqlite --depth 1`
# (default scope, excludes dev-deps) confirms this.
kb-parse-md = { path = "../kb-parse-md" }
kb-normalize = { path = "../kb-normalize" }
kb-chunk = { path = "../kb-chunk" }

View File

@@ -0,0 +1,47 @@
{
"duration_ms": 187,
"errors": 0,
"items": [
{
"asset_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"block_count": 7,
"byte_len": 1234,
"chunk_count": 3,
"chunker_version": "md-heading-v1",
"doc_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"doc_path": "notes/alpha.md",
"error": null,
"kind": "new",
"parser_version": "pulldown-cmark-0.x",
"warnings": []
},
{
"asset_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"block_count": 12,
"byte_len": 2048,
"chunk_count": 5,
"chunker_version": "md-heading-v1",
"doc_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"doc_path": "notes/beta.md",
"error": null,
"kind": "updated",
"parser_version": "pulldown-cmark-0.x",
"warnings": [
"malformed frontmatter"
]
}
],
"new": 2,
"scanned": 3,
"scope": {
"exclude": [
".git/**"
],
"include": [
"**/*.md"
],
"root": "/home/u/KB"
},
"skipped": 0,
"updated": 1
}

View File

@@ -0,0 +1,654 @@
//! `DocumentStore` impl: assets, documents, document_tags, blocks, chunks.
//!
//! Transactions: per design §5.8, one ingest of one document is one
//! transaction. We expose the raw trait methods at fine granularity (so
//! `kb-app` can compose), and each one wraps its own short transaction.
//! A higher-level `ingest_document` helper that wraps put_document +
//! put_blocks + put_chunks in a single tx is intentionally NOT shipped in
//! P1-6 — `kb-app` (P1's caller layer) is the right place to compose.
//!
//! Idempotency: re-ingesting `(workspace_path, asset_id, parser_version)`
//! UPSERTs the documents row, bumps `doc_version`, and replaces all
//! blocks / chunks / document_tags. No row duplication.
use anyhow::{Context, Result};
use rusqlite::params;
use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::{SqliteStore, upsert_asset_row, validate_asset_id};
impl kb_core::DocumentStore for SqliteStore {
fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> {
// Validate the AssetId shape before any row work — defense in
// depth against hand-constructed `kb_core::AssetId` values that
// bypass `FromStr`. See `validate_asset_id` for rationale.
validate_asset_id(&asset.asset_id)?;
// No bytes here — read storage_kind/storage_path from the
// RawAsset's `stored` field per its convention (§3.3). Callers
// that have raw bytes go through `put_asset_with_bytes` instead;
// this branch is for the case where bytes were already persisted
// (or referenced) and we just want to record the row.
let (storage_kind, storage_path) = match &asset.stored {
kb_core::AssetStorage::Copied { path } => {
("copied", path.to_string_lossy().into_owned())
}
kb_core::AssetStorage::Reference { path, .. } => {
("reference", path.to_string_lossy().into_owned())
}
};
let conn = self.lock_conn();
upsert_asset_row(&conn, asset, storage_kind, &storage_path)
}
fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> {
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
upsert_document(&tx, doc)?;
replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?;
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn put_blocks(
&self,
doc: &kb_core::DocumentId,
blocks: &[kb_core::Block],
) -> Result<()> {
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
// DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal)
// so we cannot rely on UPSERT to surface block_id collisions. The
// simplest correct path is to wipe and re-insert; the §5.8
// per-document transaction wraps both halves so a partial state
// never lands.
tx.execute("DELETE FROM blocks WHERE doc_id = ?", params![doc.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO blocks (
block_id, doc_id, kind, heading_path_json,
ordinal, source_span_json, payload_json
) VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.map_err(StoreError::from)?;
// Ordinal here is the position of the block in the document's
// overall block stream — used for sort-on-load, not the §4.3
// (heading_path, kind)-scoped ordinal that fed `block_id`.
for (i, block) in blocks.iter().enumerate() {
let row = block_to_row(doc, block, i as i64)?;
stmt.execute(params![
row.block_id,
row.doc_id,
row.kind,
row.heading_path_json,
row.ordinal,
row.source_span_json,
row.payload_json,
])
.map_err(StoreError::from)?;
}
drop(stmt);
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn put_chunks(
&self,
doc: &kb_core::DocumentId,
chunks: &[kb_core::Chunk],
) -> Result<()> {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format chunk created_at")?;
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO chunks (
chunk_id, doc_id, text, heading_path_json,
section_label, source_spans_json, token_estimate,
chunker_version, policy_hash, block_ids_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.map_err(StoreError::from)?;
for chunk in chunks {
let heading_path = serde_json::to_string(&chunk.heading_path)
.context("serialize chunk.heading_path")?;
let source_spans = serde_json::to_string(&chunk.source_spans)
.context("serialize chunk.source_spans")?;
let block_ids = serde_json::to_string(&chunk.block_ids)
.context("serialize chunk.block_ids")?;
// §5.5 has a `section_label` column but the in-memory Chunk
// struct does not carry it (nor does the wire schema §2.6).
// Persist NULL until a future bump introduces the field.
// TODO(P2/P3): populate `section_label` once Chunk and the
// wire schema gain the field; until then NULL is the correct
// canonical value.
stmt.execute(params![
chunk.chunk_id.0,
chunk.doc_id.0,
chunk.text,
heading_path,
Option::<String>::None,
source_spans,
chunk.token_estimate as i64,
chunk.chunker_version.0,
chunk.policy_hash,
block_ids,
now,
])
.map_err(StoreError::from)?;
}
drop(stmt);
tx.commit().map_err(StoreError::from)?;
Ok(())
}
fn get_document(
&self,
id: &kb_core::DocumentId,
) -> Result<Option<kb_core::CanonicalDocument>> {
let conn = self.lock_conn();
let row: Option<DocumentRow> = conn
.query_row(
"SELECT
doc_id, asset_id, workspace_path, title, lang,
source_type, trust_level, parser_version,
doc_version, schema_version, metadata_json,
provenance_json, created_at, updated_at
FROM documents WHERE doc_id = ?",
params![id.0],
document_row_from_sql,
)
.map(Some)
.or_else(rows_optional)
.map_err(StoreError::from)?;
let Some(row) = row else { return Ok(None) };
// Rehydrate blocks. Sort by stream-ordinal so the returned
// CanonicalDocument matches the order originally persisted.
let mut blocks_stmt = conn
.prepare(
"SELECT payload_json FROM blocks
WHERE doc_id = ? ORDER BY ordinal ASC",
)
.map_err(StoreError::from)?;
let block_rows = blocks_stmt
.query_map(params![id.0], |r| {
let payload_json: String = r.get(0)?;
Ok(payload_json)
})
.map_err(StoreError::from)?;
let mut blocks: Vec<kb_core::Block> = Vec::new();
for row in block_rows {
let payload_json = row.map_err(StoreError::from)?;
let block: kb_core::Block = serde_json::from_str(&payload_json)
.context("deserialize block payload_json")?;
blocks.push(block);
}
let metadata: kb_core::Metadata = serde_json::from_str(&row.metadata_json)
.context("deserialize metadata_json")?;
let provenance: kb_core::Provenance =
serde_json::from_str(&row.provenance_json)
.context("deserialize provenance_json")?;
Ok(Some(kb_core::CanonicalDocument {
doc_id: kb_core::DocumentId(row.doc_id),
source_asset_id: kb_core::AssetId(row.asset_id),
workspace_path: kb_core::WorkspacePath(row.workspace_path),
title: row.title.unwrap_or_default(),
lang: kb_core::Lang(row.lang.unwrap_or_default()),
blocks,
metadata,
provenance,
parser_version: kb_core::ParserVersion(row.parser_version),
// INVARIANT: `doc_version` is bumped by 1 on every re-ingest
// (see `upsert_document`). The column is INTEGER (i64) but
// CanonicalDocument carries u32; an overflow would require
// 2^32 re-ingests of the same document, which is well beyond
// any realistic ingest frequency. Truncating cast is safe
// under that invariant.
schema_version: row.schema_version as u32,
doc_version: row.doc_version as u32,
}))
}
fn get_chunk(&self, id: &kb_core::ChunkId) -> Result<Option<kb_core::Chunk>> {
let conn = self.lock_conn();
let row = conn
.query_row(
"SELECT
chunk_id, doc_id, text, heading_path_json,
source_spans_json, token_estimate, chunker_version,
policy_hash, block_ids_json
FROM chunks WHERE chunk_id = ?",
params![id.0],
chunk_row_from_sql,
)
.map(Some)
.or_else(rows_optional)
.map_err(StoreError::from)?;
let Some(row) = row else { return Ok(None) };
let heading_path: Vec<String> = serde_json::from_str(&row.heading_path_json)
.context("deserialize chunk.heading_path_json")?;
let source_spans: Vec<kb_core::SourceSpan> =
serde_json::from_str(&row.source_spans_json)
.context("deserialize chunk.source_spans_json")?;
let block_ids: Vec<kb_core::BlockId> =
serde_json::from_str(&row.block_ids_json)
.context("deserialize chunk.block_ids_json")?;
Ok(Some(kb_core::Chunk {
chunk_id: kb_core::ChunkId(row.chunk_id),
doc_id: kb_core::DocumentId(row.doc_id),
block_ids,
text: row.text,
heading_path,
source_spans,
token_estimate: row.token_estimate as usize,
chunker_version: kb_core::ChunkerVersion(row.chunker_version),
policy_hash: row.policy_hash,
}))
}
fn list_documents(
&self,
filter: &kb_core::DocFilter,
) -> Result<Vec<kb_core::DocSummary>> {
// Build a dynamic WHERE clause from the filter. Each condition
// appends one positional `?` placeholder and one `Box<dyn
// ToSql>` to `params` so order stays in sync.
let conn = self.lock_conn();
let mut sql = String::from(
"SELECT d.doc_id, d.workspace_path, d.title, d.lang,
d.source_type, d.trust_level, d.parser_version,
d.created_at, d.updated_at,
a.byte_len,
(SELECT COUNT(*) FROM chunks c WHERE c.doc_id = d.doc_id) AS chunk_count,
-- chunker_version: assume one chunker per doc; pick
-- any row's value. NULL if no chunks yet.
(SELECT chunker_version FROM chunks c2
WHERE c2.doc_id = d.doc_id LIMIT 1) AS chunker_version
FROM documents d
JOIN assets a ON a.asset_id = d.asset_id
WHERE 1=1",
);
let mut params_dyn: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(lang) = &filter.lang {
sql.push_str(" AND d.lang = ?");
params_dyn.push(Box::new(lang.0.clone()));
}
if let Some(trust_min) = &filter.trust_min {
// Map the enum to its rank: Generated < Secondary < Primary.
// (Higher trust strictly contains lower trust.)
sql.push_str(" AND CASE d.trust_level
WHEN 'primary' THEN 3
WHEN 'secondary' THEN 2
WHEN 'generated' THEN 1
ELSE 0
END >= ?");
let rank: i64 = match trust_min {
kb_core::TrustLevel::Primary => 3,
kb_core::TrustLevel::Secondary => 2,
kb_core::TrustLevel::Generated => 1,
};
params_dyn.push(Box::new(rank));
}
if let Some(glob) = &filter.path_glob {
sql.push_str(" AND d.workspace_path GLOB ?");
params_dyn.push(Box::new(glob.clone()));
}
if !filter.tags_any.is_empty() {
// INTERSECT-style filter: doc must own at least one of the
// requested tags. Use IN with a placeholder list.
sql.push_str(" AND d.doc_id IN (SELECT doc_id FROM document_tags WHERE tag IN (");
for (i, tag) in filter.tags_any.iter().enumerate() {
if i > 0 {
sql.push(',');
}
sql.push('?');
params_dyn.push(Box::new(tag.clone()));
}
sql.push_str("))");
}
sql.push_str(" ORDER BY d.workspace_path");
let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?;
let rows = stmt
.query_map(
rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())),
doc_summary_from_sql,
)
.map_err(StoreError::from)?;
let mut out = Vec::new();
for r in rows {
let summary = r.map_err(StoreError::from)?;
// tags filter at row-load time: pull the tag list per doc.
let mut tag_stmt = conn
.prepare("SELECT tag FROM document_tags WHERE doc_id = ? ORDER BY tag")
.map_err(StoreError::from)?;
let tag_iter = tag_stmt
.query_map(params![summary.doc_id.0], |r| r.get::<_, String>(0))
.map_err(StoreError::from)?;
let tags: Vec<String> = tag_iter
.collect::<rusqlite::Result<Vec<_>>>()
.map_err(StoreError::from)?;
out.push(kb_core::DocSummary { tags, ..summary });
}
Ok(out)
}
}
// ── Internal row + (de)serialization helpers ─────────────────────────────
struct DocumentRow {
doc_id: String,
asset_id: String,
workspace_path: String,
title: Option<String>,
lang: Option<String>,
parser_version: String,
doc_version: i64,
schema_version: i64,
metadata_json: String,
provenance_json: String,
// source_type / trust_level are loaded back via metadata_json round-trip,
// so we do not need separate fields here for `get_document`.
}
fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
Ok(DocumentRow {
doc_id: row.get(0)?,
asset_id: row.get(1)?,
workspace_path: row.get(2)?,
title: row.get(3)?,
lang: row.get(4)?,
// 5: source_type, 6: trust_level — read but unused here (metadata_json
// is authoritative). Keeping them in the SELECT makes the column
// ordering match the INSERT and allows future fields without
// shifting indexes.
parser_version: row.get(7)?,
doc_version: row.get(8)?,
schema_version: row.get(9)?,
metadata_json: row.get(10)?,
provenance_json: row.get(11)?,
})
}
struct ChunkRow {
chunk_id: String,
doc_id: String,
text: String,
heading_path_json: String,
source_spans_json: String,
token_estimate: i64,
chunker_version: String,
policy_hash: String,
block_ids_json: String,
}
fn chunk_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
Ok(ChunkRow {
chunk_id: row.get(0)?,
doc_id: row.get(1)?,
text: row.get(2)?,
heading_path_json: row.get(3)?,
source_spans_json: row.get(4)?,
token_estimate: row.get(5)?,
chunker_version: row.get(6)?,
policy_hash: row.get(7)?,
block_ids_json: row.get(8)?,
})
}
fn doc_summary_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<kb_core::DocSummary> {
let doc_id: String = row.get(0)?;
let workspace_path: String = row.get(1)?;
let title: Option<String> = row.get(2)?;
let lang: Option<String> = row.get(3)?;
let source_type_raw: String = row.get(4)?;
let trust_level_raw: String = row.get(5)?;
let parser_version: String = row.get(6)?;
let created_at_raw: String = row.get(7)?;
let updated_at_raw: String = row.get(8)?;
let byte_len: i64 = row.get(9)?;
let chunk_count: i64 = row.get(10)?;
let chunker_version: Option<String> = row.get(11)?;
// De-serialize the lowercase string forms that match
// `#[serde(rename_all = "lowercase")]` on each enum.
let source_type: kb_core::SourceType =
serde_json::from_value(serde_json::Value::String(source_type_raw))
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
let trust_level: kb_core::TrustLevel =
serde_json::from_value(serde_json::Value::String(trust_level_raw))
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?;
let created_at = OffsetDateTime::parse(
&created_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e)))?;
let updated_at = OffsetDateTime::parse(
&updated_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(e)))?;
Ok(kb_core::DocSummary {
doc_id: kb_core::DocumentId(doc_id),
doc_path: kb_core::WorkspacePath(workspace_path),
title: title.unwrap_or_default(),
lang: kb_core::Lang(lang.unwrap_or_default()),
// tags filled in by caller after a per-doc fetch.
tags: Vec::new(),
trust_level,
source_type,
byte_len: byte_len as u64,
chunk_count: chunk_count as u32,
created_at,
updated_at,
parser_version: kb_core::ParserVersion(parser_version),
// chunker_version may be NULL when the doc has no chunks yet.
// Empty string is the cleanest fallback consistent with the wire
// schema's required `chunker_version` field on DocSummary v1.
chunker_version: kb_core::ChunkerVersion(chunker_version.unwrap_or_default()),
})
}
/// Map a `QueryReturnedNoRows` into `Ok(None)` so the trait returns
/// `Option<T>` rather than an error for the common "missing" case.
fn rows_optional<T>(err: rusqlite::Error) -> rusqlite::Result<Option<T>> {
match err {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
e => Err(e),
}
}
/// UPSERT the documents row and bump `doc_version` on conflict.
fn upsert_document(
tx: &rusqlite::Transaction<'_>,
doc: &kb_core::CanonicalDocument,
) -> Result<()> {
let metadata_json = serde_json::to_string(&doc.metadata)
.context("serialize metadata")?;
let provenance_json = serde_json::to_string(&doc.provenance)
.context("serialize provenance")?;
// String form of the lowercase serde representation. We avoid
// embedding `serde_json::to_string` quotes (`"markdown"` → just
// `markdown` for the column).
let source_type = source_type_label(&doc.metadata.source_type);
let trust_level = trust_level_label(&doc.metadata.trust_level);
let created_at = doc
.metadata
.created_at
.format(&time::format_description::well_known::Rfc3339)
.context("format created_at")?;
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format updated_at")?;
tx.execute(
"INSERT INTO documents (
doc_id, asset_id, workspace_path, title, lang,
source_type, trust_level, parser_version,
doc_version, schema_version, metadata_json,
provenance_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(doc_id) DO UPDATE SET
asset_id = excluded.asset_id,
workspace_path = excluded.workspace_path,
title = excluded.title,
lang = excluded.lang,
source_type = excluded.source_type,
trust_level = excluded.trust_level,
parser_version = excluded.parser_version,
-- doc_version: bump on update. excluded.doc_version is the
-- caller's submitted value; we ignore it and add 1 to the
-- existing column so each re-ingest cleanly increments.
doc_version = documents.doc_version + 1,
schema_version = excluded.schema_version,
metadata_json = excluded.metadata_json,
provenance_json = excluded.provenance_json,
updated_at = excluded.updated_at",
params![
doc.doc_id.0,
doc.source_asset_id.0,
doc.workspace_path.0,
doc.title,
doc.lang.0,
source_type,
trust_level,
doc.parser_version.0,
doc.doc_version as i64,
doc.schema_version as i64,
metadata_json,
provenance_json,
created_at,
now,
],
)
.map_err(StoreError::from)?;
Ok(())
}
fn source_type_label(s: &kb_core::SourceType) -> &'static str {
match s {
kb_core::SourceType::Markdown => "markdown",
kb_core::SourceType::Note => "note",
kb_core::SourceType::Paper => "paper",
kb_core::SourceType::Reference => "reference",
kb_core::SourceType::Inbox => "inbox",
}
}
fn trust_level_label(s: &kb_core::TrustLevel) -> &'static str {
match s {
kb_core::TrustLevel::Primary => "primary",
kb_core::TrustLevel::Secondary => "secondary",
kb_core::TrustLevel::Generated => "generated",
}
}
fn replace_document_tags(
tx: &rusqlite::Transaction<'_>,
doc_id: &kb_core::DocumentId,
tags: &[String],
) -> Result<()> {
tx.execute("DELETE FROM document_tags WHERE doc_id = ?", params![doc_id.0])
.map_err(StoreError::from)?;
let mut stmt = tx
.prepare(
"INSERT INTO document_tags (doc_id, tag) VALUES (?, ?)
ON CONFLICT(doc_id, tag) DO NOTHING",
)
.map_err(StoreError::from)?;
for tag in tags {
stmt.execute(params![doc_id.0, tag])
.map_err(StoreError::from)?;
}
Ok(())
}
struct BlockRow {
block_id: String,
doc_id: String,
kind: &'static str,
heading_path_json: String,
ordinal: i64,
source_span_json: String,
/// The full Block JSON — round-trip path for `get_document`. Also
/// future-proofs new variants without schema churn.
payload_json: String,
}
fn block_to_row(
doc: &kb_core::DocumentId,
block: &kb_core::Block,
stream_ordinal: i64,
) -> Result<BlockRow> {
let (block_id, kind, heading_path_json, source_span_json) = match block {
kb_core::Block::Heading(b) => (
b.common.block_id.0.clone(),
"heading",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Paragraph(b) | kb_core::Block::Quote(b) => (
b.common.block_id.0.clone(),
// Discriminate Paragraph vs Quote on the enum tag: payload
// round-trip carries the variant, but the column needs a
// stable label for filtering.
if matches!(block, kb_core::Block::Paragraph(_)) {
"paragraph"
} else {
"quote"
},
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::List(b) => (
b.common.block_id.0.clone(),
"list",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Code(b) => (
b.common.block_id.0.clone(),
"code",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::Table(b) => (
b.common.block_id.0.clone(),
"table",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::ImageRef(b) => (
b.common.block_id.0.clone(),
"imageref",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
kb_core::Block::AudioRef(b) => (
b.common.block_id.0.clone(),
"audioref",
serde_json::to_string(&b.common.heading_path)?,
serde_json::to_string(&b.common.source_span)?,
),
};
let payload_json = serde_json::to_string(block).context("serialize block")?;
Ok(BlockRow {
block_id,
doc_id: doc.0.clone(),
kind,
heading_path_json,
ordinal: stream_ordinal,
source_span_json,
payload_json,
})
}

View File

@@ -0,0 +1,20 @@
//! Crate-local error type per design §10.
//!
//! Boundary code (`kb-app`, `kb-cli`) flattens these into `anyhow::Error`,
//! so the trait impls return `anyhow::Result` directly. Internally we
//! still distinguish `Conflict` (e.g. checksum mismatch) from `Sqlite` /
//! `Migration` so callers that downcast can route refusal-style flows.
use thiserror::Error;
#[derive(Debug, Error)]
pub enum StoreError {
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("migration error: {0}")]
Migration(String),
#[error("conflict: {0}")]
Conflict(String),
}

View File

@@ -0,0 +1,252 @@
//! `JobRepo` impl per design §7.2.
//!
//! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over
//! `(now, kind, payload)` so two `create` calls in the same millisecond
//! still distinguish.
use anyhow::{Context, Result};
use rusqlite::params;
use serde_json::Value;
use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::SqliteStore;
impl kb_core::JobRepo for SqliteStore {
fn create(
&self,
kind: kb_core::JobKind,
payload: Value,
) -> Result<kb_core::JobId> {
let now_dt = OffsetDateTime::now_utc();
let now = now_dt
.format(&time::format_description::well_known::Rfc3339)
.context("format job created_at")?;
// JobId recipe: stable hex over (kind, payload_canonical, ns).
// The nanosecond timestamp is included so two `create` calls with
// identical `(kind, payload)` still get distinct IDs.
let job_id = mint_job_id(&kind, &payload, now_dt);
let kind_label = job_kind_label(&kind);
let payload_json = serde_json::to_string(&payload)
.context("serialize job payload")?;
let conn = self.lock_conn();
conn.execute(
"INSERT INTO jobs (
job_id, kind, status, payload_json, progress_json,
error_json, created_at, updated_at, finished_at
) VALUES (?, ?, 'pending', ?, NULL, NULL, ?, ?, NULL)",
params![job_id.0, kind_label, payload_json, now, now],
)
.map_err(StoreError::from)?;
Ok(job_id)
}
fn update_progress(
&self,
id: &kb_core::JobId,
progress: Value,
) -> Result<()> {
let progress_json = serde_json::to_string(&progress)
.context("serialize job progress")?;
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format job updated_at")?;
let conn = self.lock_conn();
// status='pending' → 'running' on first progress update; later
// progress calls keep status='running' until finish().
conn.execute(
"UPDATE jobs SET
progress_json = ?,
status = CASE status WHEN 'pending' THEN 'running' ELSE status END,
updated_at = ?
WHERE job_id = ?",
params![progress_json, now, id.0],
)
.map_err(StoreError::from)?;
Ok(())
}
fn finish(
&self,
id: &kb_core::JobId,
status: kb_core::JobStatus,
error: Option<&str>,
) -> Result<()> {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format job finished_at")?;
let status_label = job_status_label(&status);
let error_json = error
.map(|e| serde_json::to_string(&serde_json::json!({ "message": e })))
.transpose()
.context("serialize job error")?;
let conn = self.lock_conn();
conn.execute(
"UPDATE jobs SET
status = ?,
error_json = ?,
updated_at = ?,
finished_at = ?
WHERE job_id = ?",
params![status_label, error_json, now, now, id.0],
)
.map_err(StoreError::from)?;
Ok(())
}
fn list(
&self,
filter: &kb_core::JobFilter,
) -> Result<Vec<kb_core::JobRow>> {
let conn = self.lock_conn();
let mut sql = String::from(
"SELECT job_id, kind, status, payload_json, progress_json,
error_json, created_at, updated_at, finished_at
FROM jobs WHERE 1=1",
);
let mut params_dyn: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(status) = &filter.status {
sql.push_str(" AND status = ?");
params_dyn.push(Box::new(job_status_label(status).to_string()));
}
if let Some(kind) = &filter.kind {
sql.push_str(" AND kind = ?");
params_dyn.push(Box::new(job_kind_label(kind).to_string()));
}
sql.push_str(" ORDER BY created_at ASC");
let mut stmt = conn.prepare(&sql).map_err(StoreError::from)?;
let rows = stmt
.query_map(
rusqlite::params_from_iter(params_dyn.iter().map(|b| b.as_ref())),
job_row_from_sql,
)
.map_err(StoreError::from)?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(StoreError::from)?);
}
Ok(out)
}
}
/// Mint a JobId over (kind, canonical(payload), nanos). The 32-hex
/// invariant on `kb_core::JobId` is honored by taking the first 32 chars
/// of the blake3 hex.
fn mint_job_id(
kind: &kb_core::JobKind,
payload: &Value,
at: OffsetDateTime,
) -> kb_core::JobId {
// Plain serde_json::to_vec is enough — JobIds are not part of the
// §4.2 ID family and don't need canonical-JSON parity with other IDs.
// The nanosecond suffix is what guarantees uniqueness, not stable
// hashing.
let mut hasher = blake3::Hasher::new();
hasher.update(job_kind_label(kind).as_bytes());
if let Ok(bytes) = serde_json::to_vec(payload) {
hasher.update(&bytes);
}
hasher.update(&at.unix_timestamp_nanos().to_be_bytes());
let hex = hasher.finalize().to_hex().to_string();
kb_core::JobId(hex[..32].to_string())
}
fn job_kind_label(k: &kb_core::JobKind) -> &'static str {
match k {
kb_core::JobKind::Ingest => "ingest",
kb_core::JobKind::Chunk => "chunk",
kb_core::JobKind::Embed => "embed",
kb_core::JobKind::Ocr => "ocr",
kb_core::JobKind::Transcribe => "transcribe",
kb_core::JobKind::Reindex => "reindex",
kb_core::JobKind::Doctor => "doctor",
}
}
fn job_status_label(s: &kb_core::JobStatus) -> &'static str {
match s {
kb_core::JobStatus::Pending => "pending",
kb_core::JobStatus::Running => "running",
kb_core::JobStatus::Succeeded => "succeeded",
kb_core::JobStatus::Failed => "failed",
kb_core::JobStatus::Canceled => "canceled",
}
}
fn job_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<kb_core::JobRow> {
let job_id: String = row.get(0)?;
let kind_raw: String = row.get(1)?;
let status_raw: String = row.get(2)?;
let payload_json: String = row.get(3)?;
let progress_json: Option<String> = row.get(4)?;
let error_json: Option<String> = row.get(5)?;
let created_at_raw: String = row.get(6)?;
let updated_at_raw: String = row.get(7)?;
let finished_at_raw: Option<String> = row.get(8)?;
let kind: kb_core::JobKind =
serde_json::from_value(serde_json::Value::String(kind_raw))
.map_err(conv_err(1))?;
let status: kb_core::JobStatus =
serde_json::from_value(serde_json::Value::String(status_raw))
.map_err(conv_err(2))?;
let payload: Value = serde_json::from_str(&payload_json).map_err(conv_err(3))?;
let progress: Option<Value> = match progress_json {
Some(s) => Some(serde_json::from_str(&s).map_err(conv_err(4))?),
None => None,
};
// Surface the stored error message back as a plain string per the
// JobRow schema (§7.2). We stored `{"message": "..."}` for forward
// compatibility — pull `message` back out, or fall back to the raw
// JSON if the shape ever drifts.
let error: Option<String> = match error_json {
Some(s) => match serde_json::from_str::<Value>(&s) {
Ok(v) => v
.get("message")
.and_then(Value::as_str)
.map(str::to_owned)
.or(Some(s)),
Err(_) => Some(s),
},
None => None,
};
let created_at = OffsetDateTime::parse(
&created_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(conv_err(6))?;
let updated_at = OffsetDateTime::parse(
&updated_at_raw,
&time::format_description::well_known::Rfc3339,
)
.map_err(conv_err(7))?;
let finished_at = match finished_at_raw {
Some(s) => Some(
OffsetDateTime::parse(&s, &time::format_description::well_known::Rfc3339)
.map_err(conv_err(8))?,
),
None => None,
};
Ok(kb_core::JobRow {
job_id: kb_core::JobId(job_id),
kind,
status,
payload,
progress,
error,
created_at,
updated_at,
finished_at,
})
}
fn conv_err<E: std::error::Error + Send + Sync + 'static>(
col: usize,
) -> impl FnOnce(E) -> rusqlite::Error {
move |e| {
rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
}
}

View File

@@ -0,0 +1,23 @@
//! `kb-store-sqlite` — SQLite-backed implementations of
//! [`kb_core::DocumentStore`] and [`kb_core::JobRepo`] (§7.2), plus the
//! asset writer that copies (or references) raw bytes per design §5.2.
//!
//! Schema is owned by `migrations/V001__init.sql` (workspace root), which
//! ships the full §5 DDL minus the FTS5 virtual table + triggers (those
//! land in P2-1's `V002`).
//!
//! Allowed deps per task spec: `kb-core`, `kb-config`, `rusqlite`,
//! `refinery`, `serde_json`, `time`, `blake3`, `tracing`, `anyhow`,
//! `thiserror`. NOT allowed: `kb-parse-*`, `kb-normalize`, `kb-chunk`,
//! `kb-store-vector`, `kb-source-fs`, etc. (`kb-parse-md`, `kb-normalize`,
//! `kb-chunk` may appear as **dev-deps** — see `Cargo.toml` — to drive
//! the contract round-trip test off a real Markdown fixture.)
mod error;
mod schema;
mod store;
mod documents;
mod jobs;
pub use error::StoreError;
pub use store::SqliteStore;

View File

@@ -0,0 +1,14 @@
//! Refinery migration bundle. The migrations live at the workspace
//! `migrations/` directory; refinery's `embed_migrations!` macro inlines
//! them at compile time so the binary needs no runtime SQL files.
// `embed_migrations!` looks under the path relative to the package root
// (the crate's `Cargo.toml`). The workspace migrations dir is two levels
// up: `crates/kb-store-sqlite/Cargo.toml` → `../../migrations`.
refinery::embed_migrations!("../../migrations");
/// Re-export the runner under a stable name. Calling
/// `runner().run(&mut conn)?` applies all pending migrations.
pub fn runner() -> refinery::Runner {
migrations::runner()
}

View File

@@ -0,0 +1,399 @@
//! `SqliteStore` — open + run_migrations + asset writer.
//!
//! The store wraps a single `rusqlite::Connection` behind a
//! `std::sync::Mutex` so the public trait impls (which take `&self`) can
//! still issue mutating SQL. Concurrency is intentionally coarse for P1;
//! later phases can swap to a connection pool if measurement shows the
//! mutex on the hot path.
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
use anyhow::{Context, Result};
use rusqlite::Connection;
use crate::error::StoreError;
use crate::schema;
/// Monotonic counter used to namespace per-process temp file names so
/// concurrent `put_asset_with_bytes` calls in the same millisecond cannot
/// collide on `<final>.tmp.<pid>.<n>`.
static TEMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Length, in hex chars, of a valid `kb_core::AssetId`. blake3 first-half
/// truncated, mirrored from `kb-core`'s newtype invariant.
const ASSET_ID_HEX_LEN: usize = 32;
/// Default file name under `config.storage.data_dir`. Kept private — the
/// path layout is a §6.3 design decision, not part of the store's public
/// surface.
const SQLITE_FILE: &str = "kb.sqlite";
/// Subdirectory under `data_dir` holding shard-prefixed asset bytes
/// (`<aa>/<asset_id>`). Mirrors design §6.3.
const ASSETS_SUBDIR: &str = "assets";
/// Length of the shard prefix: 2 hex chars → 256 buckets, plenty to keep
/// directory cardinality reasonable on workspaces with thousands of
/// assets without descending into hash-trees.
const ASSET_SHARD_LEN: usize = 2;
/// Bytes-per-MiB conversion. Used by the asset writer to compare
/// `byte_len` against `storage.copy_threshold_mb`.
const BYTES_PER_MIB: u64 = 1024 * 1024;
/// SQLite-backed kb store.
///
/// Construct via [`SqliteStore::open`], then call
/// [`SqliteStore::run_migrations`] to apply the bundled `V001__init.sql`
/// before any read/write call.
pub struct SqliteStore {
/// Resolved absolute path to `data_dir`. Used by the asset writer.
pub(crate) data_dir: PathBuf,
/// Maximum asset size eligible for in-store copy; assets bigger than
/// this are recorded as `reference` and read from their source path.
pub(crate) copy_threshold_bytes: u64,
/// Single mutexed connection — see module docs for rationale.
pub(crate) conn: Mutex<Connection>,
}
impl SqliteStore {
/// Open (or create) the SQLite file under `config.storage.data_dir`,
/// apply pragmas (foreign_keys / WAL / synchronous=NORMAL /
/// temp_store=MEMORY), and create parent directories as needed.
/// **Does not run migrations** — call [`Self::run_migrations`] next.
pub fn open(config: &kb_config::Config) -> Result<Self> {
let data_dir = expand_data_dir(&config.storage.data_dir);
std::fs::create_dir_all(&data_dir)
.with_context(|| format!("create data_dir {}", data_dir.display()))?;
let db_path = data_dir.join(SQLITE_FILE);
let conn = Connection::open(&db_path)
.with_context(|| format!("open sqlite at {}", db_path.display()))?;
apply_pragmas(&conn)?;
tracing::debug!(
target: "kb-store-sqlite",
data_dir = %data_dir.display(),
db = %db_path.display(),
"opened sqlite store"
);
Ok(Self {
data_dir,
copy_threshold_bytes: config.storage.copy_threshold_mb * BYTES_PER_MIB,
conn: Mutex::new(conn),
})
}
/// Apply all pending migrations bundled at compile time
/// (`migrations/V001__init.sql` and any later additions).
pub fn run_migrations(&self) -> Result<()> {
let mut conn = self.lock_conn();
schema::runner()
.run(&mut *conn)
.map_err(|e| StoreError::Migration(e.to_string()))?;
tracing::debug!(target: "kb-store-sqlite", "migrations applied");
Ok(())
}
/// Acquire the connection mutex, recovering from poison.
///
/// Poisoning here means a previous holder panicked while holding the
/// guard. The active rusqlite transaction (if any) was rolled back by
/// the `Transaction` `Drop` impl, so the connection state is still
/// safe to reuse — we simply unwrap the inner guard rather than
/// propagate the panic to every subsequent call.
pub(crate) fn lock_conn(&self) -> MutexGuard<'_, Connection> {
self.conn.lock().unwrap_or_else(|p| p.into_inner())
}
/// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`,
/// bytes go to `data_dir/assets/<aa>/<asset_id>` if `byte_len ≤
/// copy_threshold_mb`, otherwise the row records the source URI's
/// path and no copy is performed.
///
/// In either branch, `blake3(bytes)` is recomputed and compared to
/// `asset.checksum.0`. A mismatch returns
/// `StoreError::Conflict` wrapped in `anyhow::Error`.
pub fn put_asset_with_bytes(
&self,
asset: &kb_core::RawAsset,
bytes: &[u8],
) -> Result<()> {
// 0. Validate the AssetId shape before any I/O. `kb_core::AssetId`
// is a `pub String` newtype: `FromStr` enforces the 32-hex-char
// invariant, but a hand-constructed `AssetId("../etc/passwd…")`
// can bypass that and reach `assets_path_for`. Refuse such IDs at
// the store boundary to keep shard-dir slicing safe.
validate_asset_id(&asset.asset_id)?;
// 1. Verify the caller's checksum matches what's actually on the
// wire. A drift here means the bytes the parser saw and the bytes
// we're about to durably store disagree — refuse persistence.
let computed = blake3::hash(bytes).to_hex().to_string();
if computed != asset.checksum.0 {
return Err(StoreError::Conflict(format!(
"checksum mismatch: asset {} declares {} but bytes hash to {}",
asset.asset_id.0, asset.checksum.0, computed
))
.into());
}
// 2. Decide copy vs. reference. The threshold compares the
// declared `byte_len` (caller-vouched) rather than `bytes.len()`
// because some sources stream and `byte_len` is authoritative.
if asset.byte_len <= self.copy_threshold_bytes {
// Copy mode. To prevent file orphans on UPSERT failure we use
// the temp-file + atomic-rename pattern:
// (a) write bytes to `<final>.tmp.<pid>.<counter>`
// (b) fsync the temp file
// (c) UPSERT the row
// (d) on UPSERT success: rename temp → final (atomic on
// same fs)
// (e) on any failure between (a) and (d): best-effort delete
// of the temp file so we never leak bytes on disk.
let dest = self.assets_path_for(&asset.asset_id);
if let Some(parent) = dest.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("create asset shard dir {}", parent.display())
})?;
}
let temp_path = temp_path_for(&dest);
// Inline closure so any `?` in (a)/(b) cleans up the temp
// file before bubbling out.
let write_and_upsert = || -> Result<()> {
{
let mut f = std::fs::File::create(&temp_path).with_context(|| {
format!("create temp asset file {}", temp_path.display())
})?;
use std::io::Write;
f.write_all(bytes).with_context(|| {
format!("write asset bytes to {}", temp_path.display())
})?;
f.sync_all().with_context(|| {
format!("fsync temp asset file {}", temp_path.display())
})?;
}
// Mirror §6.6: files 0o644.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&temp_path)?.permissions();
perms.set_mode(0o644);
std::fs::set_permissions(&temp_path, perms).with_context(|| {
format!("chmod 0o644 on {}", temp_path.display())
})?;
}
// UPSERT the row first; only after a successful row write
// do we publish the file via rename. A second
// `put_asset_with_bytes` for the same asset_id overwrites
// in place.
{
let conn = self.lock_conn();
upsert_asset_row(
&conn,
asset,
"copied",
&dest.to_string_lossy(),
)?;
}
std::fs::rename(&temp_path, &dest).with_context(|| {
format!(
"atomic rename {} -> {}",
temp_path.display(),
dest.display()
)
})?;
Ok(())
};
match write_and_upsert() {
Ok(()) => Ok(()),
Err(e) => {
// Best-effort cleanup; ignore errors so the original
// failure (likely the more useful one) propagates.
let _ = std::fs::remove_file(&temp_path);
Err(e)
}
}
} else {
// Reference: caller's source path is recorded verbatim. We
// accept either a `File(path)` or `Kb(uri)` SourceUri; the
// latter stores the raw `kb://...` string. No file I/O ⇒ no
// orphan risk; just UPSERT the row.
let storage_path = match &asset.source_uri {
kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(),
kb_core::SourceUri::Kb(u) => u.clone(),
};
let conn = self.lock_conn();
upsert_asset_row(&conn, asset, "reference", &storage_path)?;
Ok(())
}
}
/// Compute the `data_dir/assets/<aa>/<asset_id>` path for an asset.
/// `<aa>` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`.
///
/// Callers that build paths from caller-controlled IDs MUST first
/// invoke [`validate_asset_id`] (already enforced at every store
/// entry that takes a `RawAsset`). The `id.len() >= ASSET_SHARD_LEN`
/// guard below is a defense-in-depth fallback only.
pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf {
let id = &asset_id.0;
let shard = if id.len() >= ASSET_SHARD_LEN {
&id[..ASSET_SHARD_LEN]
} else {
id.as_str()
};
self.data_dir.join(ASSETS_SUBDIR).join(shard).join(id)
}
}
/// Reject an `AssetId` whose shape would let a malicious caller escape
/// the `data_dir/assets/<aa>/` shard tree. `kb_core::AssetId(pub String)`
/// permits hand-construction, so any function that turns an `AssetId`
/// into a filesystem path must call this first.
pub(crate) fn validate_asset_id(asset_id: &kb_core::AssetId) -> Result<()> {
if asset_id.0.len() != ASSET_ID_HEX_LEN
|| !asset_id.0.bytes().all(|b| b.is_ascii_hexdigit())
{
anyhow::bail!(
"invalid AssetId shape (expected {} ASCII hex chars): {:?}",
ASSET_ID_HEX_LEN,
asset_id.0
);
}
Ok(())
}
/// Compute a per-call temp-file path next to `dest` that is unlikely to
/// collide with any other in-flight writer (process pid + monotonic
/// counter). The temp file lives in the same parent directory so the
/// final `rename` is an atomic same-filesystem rename on POSIX.
fn temp_path_for(dest: &Path) -> PathBuf {
let pid = std::process::id();
let n = TEMP_SUFFIX_COUNTER.fetch_add(1, Ordering::Relaxed);
let parent = dest.parent().unwrap_or_else(|| Path::new("."));
let file_name = dest
.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_else(|| "asset".to_string());
parent.join(format!("{file_name}.tmp.{pid}.{n}"))
}
/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes`
/// path (which has bytes + computed `storage_kind/path`) and the
/// `DocumentStore::put_asset` path (which only has the `RawAsset` and
/// reads `storage_kind/path` from `asset.stored`).
pub(crate) fn upsert_asset_row(
conn: &Connection,
asset: &kb_core::RawAsset,
storage_kind: &str,
storage_path: &str,
) -> Result<()> {
let source_uri = match &asset.source_uri {
kb_core::SourceUri::File(p) => format!("file://{}", p.to_string_lossy()),
kb_core::SourceUri::Kb(u) => u.clone(),
};
let media_type = serde_json::to_string(&asset.media_type)
.context("serialize media_type")?;
let discovered_at = asset
.discovered_at
.format(&time::format_description::well_known::Rfc3339)
.context("format discovered_at")?;
conn.execute(
"INSERT INTO assets (
asset_id, source_uri, workspace_path, media_type, byte_len,
checksum, storage_kind, storage_path, discovered_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(asset_id) DO UPDATE SET
source_uri = excluded.source_uri,
workspace_path = excluded.workspace_path,
media_type = excluded.media_type,
byte_len = excluded.byte_len,
checksum = excluded.checksum,
storage_kind = excluded.storage_kind,
storage_path = excluded.storage_path,
discovered_at = excluded.discovered_at",
rusqlite::params![
asset.asset_id.0,
source_uri,
asset.workspace_path.0,
media_type,
asset.byte_len as i64,
asset.checksum.0,
storage_kind,
storage_path,
discovered_at,
],
)
.map_err(StoreError::from)?;
Ok(())
}
/// Apply the design §5 / task-spec pragmas. Called once per connection.
/// Note: WAL is persistent (the journal-mode setting is sticky in the DB
/// header) but `foreign_keys`, `synchronous`, and `temp_store` are
/// per-connection — they MUST be re-applied on every open.
fn apply_pragmas(conn: &Connection) -> Result<()> {
conn.pragma_update(None, "foreign_keys", "ON")?;
// `journal_mode = WAL` returns the current mode as a row; use
// `pragma_query_value` semantics via `query_row` to allow that.
conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "temp_store", "MEMORY")?;
Ok(())
}
/// Expand the placeholders / `~` / env-vars used by `Config::storage.data_dir`.
///
/// Supported substitutions, in order:
/// - `${XDG_DATA_HOME:-~/.local/share}` (and the bare `${XDG_DATA_HOME}`)
/// - leading `~` → `$HOME`
///
/// If neither produces an absolute path, the input is returned as-is
/// (relative paths are kept relative to the caller's CWD).
fn expand_data_dir(raw: &str) -> PathBuf {
let mut s = raw.to_string();
// ${XDG_DATA_HOME:-~/.local/share}: respect the env override, else
// fall back to the suffix after `:-`.
if let Some(start) = s.find("${XDG_DATA_HOME") {
if let Some(rel_end) = s[start..].find('}') {
let end = start + rel_end + 1; // include trailing '}'
let inner = &s[start + 2..end - 1]; // strip ${ and }
let replacement = match std::env::var("XDG_DATA_HOME") {
Ok(v) if !v.is_empty() => v,
_ => {
// inner is e.g. `XDG_DATA_HOME:-~/.local/share`.
if let Some((_, default)) = inner.split_once(":-") {
default.to_string()
} else {
// No default supplied; mimic Bash and yield "".
String::new()
}
}
};
s.replace_range(start..end, &replacement);
}
}
// ~ at the front → $HOME (or `dirs::home_dir`).
if let Some(rest) = s.strip_prefix('~') {
if let Some(home) = std::env::var_os("HOME").map(PathBuf::from).or_else(dirs_home_fallback)
{
return home.join(rest.trim_start_matches('/'));
}
}
PathBuf::from(s)
}
/// Tiny shim to avoid pulling in the `dirs` crate as a direct dep — we
/// only fall back when `$HOME` is unset, which is exotic on the platforms
/// we target. Returns `None` so the caller keeps the literal `~`.
fn dirs_home_fallback() -> Option<PathBuf> {
None
}

View File

@@ -0,0 +1,235 @@
//! Asset writer tests: copy mode (file written 0o644), reference mode
//! (no copy, row records source), and checksum mismatch (Conflict).
use std::path::PathBuf;
use kb_core::{AssetId, AssetStorage, Checksum, MediaType, RawAsset, SourceUri, WorkspacePath};
use kb_store_sqlite::SqliteStore;
use time::OffsetDateTime;
mod common;
fn fixed_asset(_bytes: &[u8], byte_len: u64, declared_checksum: &str) -> RawAsset {
RawAsset {
// 32-hex AssetId per kb-core newtype invariant.
asset_id: AssetId("a".repeat(32)),
source_uri: SourceUri::File(PathBuf::from("/some/source.md")),
workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(),
media_type: MediaType::Markdown,
byte_len,
checksum: Checksum(declared_checksum.into()),
discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
stored: AssetStorage::Reference {
path: PathBuf::from("/some/source.md"),
sha: Checksum("0".repeat(64)),
},
}
}
fn b3_full_hex(bytes: &[u8]) -> String {
blake3::hash(bytes).to_hex().to_string()
}
#[test]
fn copy_mode_writes_file_with_0o644_and_correct_bytes() {
let env = common::TestEnv::with_threshold(100);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let bytes = b"hello, sqlite";
let cs = b3_full_hex(bytes);
let asset = fixed_asset(bytes, bytes.len() as u64, &cs);
store.put_asset_with_bytes(&asset, bytes).expect("write");
// Path: data_dir/assets/aa/aaaaaa…aa
let aa = &asset.asset_id.0[..2];
let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0);
assert!(dest.exists(), "asset file not written at {}", dest.display());
let on_disk = std::fs::read(&dest).unwrap();
assert_eq!(on_disk, bytes);
// Mode 0o644 on Unix.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::metadata(&dest).unwrap().permissions().mode() & 0o777;
assert_eq!(mode, 0o644, "expected 0o644, got 0o{mode:o}");
}
// Row recorded copied.
let storage_kind: String = env.with_conn(|c| {
c.query_row(
"SELECT storage_kind FROM assets WHERE asset_id = ?",
[&asset.asset_id.0],
|r| r.get(0),
)
});
assert_eq!(storage_kind, "copied");
}
#[test]
fn reference_mode_does_not_write_file_but_records_path() {
// copy_threshold_mb=0 → every byte lands on the reference branch.
let env = common::TestEnv::with_threshold(0);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let bytes = b"big-pretend-bytes";
let cs = b3_full_hex(bytes);
// byte_len declared > 0 so the threshold check picks reference. With
// copy_threshold_bytes=0 even byte_len=1 trips the else branch.
let mut asset = fixed_asset(bytes, 1, &cs);
asset.source_uri = SourceUri::File(PathBuf::from("/path/to/original.md"));
store.put_asset_with_bytes(&asset, bytes).expect("ref write");
let aa = &asset.asset_id.0[..2];
let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0);
assert!(!dest.exists(), "reference mode must not copy bytes");
let (storage_kind, storage_path): (String, String) = env.with_conn(|c| {
c.query_row(
"SELECT storage_kind, storage_path FROM assets WHERE asset_id = ?",
[&asset.asset_id.0],
|r| Ok((r.get(0)?, r.get(1)?)),
)
});
assert_eq!(storage_kind, "reference");
assert_eq!(storage_path, "/path/to/original.md");
}
#[test]
fn put_asset_with_bytes_orphan_cleanup_on_upsert_failure() {
// Goal: prove that if the row UPSERT fails AFTER the bytes have been
// staged on disk, no `<aa>/<asset_id>` file is left behind.
//
// Lever: the `assets` table has a UNIQUE INDEX on `workspace_path`
// (V001), but the UPSERT is `ON CONFLICT(asset_id)`. So if some other
// row already owns this `workspace_path`, the INSERT half of the
// UPSERT trips a UNIQUE constraint that the ON CONFLICT clause does
// NOT handle — UPSERT errors. The new asset's bytes were already
// staged; we assert they are NOT visible at the final destination.
let env = common::TestEnv::with_threshold(100);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// Pre-populate a row that owns `notes/foo.md` (the workspace_path our
// fixture asset will also claim) under a *different* asset_id.
env.with_conn(|c| {
c.execute(
"INSERT INTO assets (
asset_id, source_uri, workspace_path, media_type, byte_len,
checksum, storage_kind, storage_path, discovered_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
rusqlite::params![
"b".repeat(32),
"file:///elsewhere/foo.md",
"notes/foo.md",
"\"markdown\"",
7_i64,
"0".repeat(64),
"reference",
"/elsewhere/foo.md",
"2024-01-01T00:00:00Z",
],
)
});
let bytes = b"hello, sqlite";
let cs = b3_full_hex(bytes);
let asset = fixed_asset(bytes, bytes.len() as u64, &cs);
let err = store
.put_asset_with_bytes(&asset, bytes)
.expect_err("UPSERT must fail on workspace_path UNIQUE violation");
let msg = format!("{err:#}");
assert!(
msg.to_lowercase().contains("unique") || msg.to_lowercase().contains("constraint"),
"expected UNIQUE constraint failure, got: {msg}"
);
// Final destination must NOT exist (no orphan).
let aa = &asset.asset_id.0[..2];
let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0);
assert!(
!dest.exists(),
"asset bytes were left orphan at {} after UPSERT failure",
dest.display()
);
// No `*.tmp.*` either — temp file must be cleaned up too.
let shard_dir = env.data_dir().join("assets").join(aa);
if let Ok(entries) = std::fs::read_dir(&shard_dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let s = name.to_string_lossy();
assert!(
!s.contains(".tmp."),
"temp file leaked at {}",
entry.path().display()
);
}
}
}
#[test]
fn put_asset_with_bytes_rejects_invalid_asset_id() {
// `kb_core::AssetId(pub String)` lets a hand-construction bypass the
// 32-hex `FromStr` invariant. The store boundary must reject any ID
// whose shape would let path construction escape `data_dir/assets/`.
let env = common::TestEnv::with_threshold(100);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// 32 chars but contains a `/` — would let `assets_path_for` stitch
// together a path outside the shard tree.
let evil_id = "../etc/passwd_padded_to_xx_xxxxx".to_string();
assert_eq!(evil_id.len(), 32, "test fixture must be 32 chars to exercise length-only checks");
let mut asset = fixed_asset(b"x", 1, &b3_full_hex(b"x"));
asset.asset_id = AssetId(evil_id.clone());
let err = store
.put_asset_with_bytes(&asset, b"x")
.expect_err("must reject non-hex AssetId");
let msg = format!("{err:#}");
assert!(
msg.contains("invalid AssetId shape"),
"expected AssetId-shape rejection, got: {msg}"
);
// And the bytes must NOT have been staged anywhere under the assets
// tree (no I/O should have happened before validation).
let assets_dir = env.data_dir().join("assets");
if assets_dir.exists() {
for entry in std::fs::read_dir(&assets_dir).unwrap().flatten() {
// Recurse one level into shard dirs and assert empty.
if let Some(sub) = std::fs::read_dir(entry.path()).unwrap().flatten().next() {
panic!(
"invalid AssetId still produced filesystem artifact at {}",
sub.path().display()
);
}
}
}
}
#[test]
fn checksum_mismatch_returns_conflict() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let bytes = b"the real bytes";
// Tampered checksum: hash a different payload.
let wrong_cs = b3_full_hex(b"different bytes");
let asset = fixed_asset(bytes, bytes.len() as u64, &wrong_cs);
let err = store
.put_asset_with_bytes(&asset, bytes)
.expect_err("must reject checksum mismatch");
let msg = format!("{err:#}");
assert!(
msg.contains("checksum mismatch") || msg.contains("conflict"),
"expected Conflict-flavoured error, got: {msg}"
);
}

View File

@@ -0,0 +1,50 @@
//! Shared test scaffolding: temp data_dir + freshly opened SqliteStore.
#![allow(dead_code)]
use std::path::PathBuf;
use kb_config::Config;
use rusqlite::Connection;
use tempfile::TempDir;
pub struct TestEnv {
pub temp: TempDir,
pub config: Config,
}
impl TestEnv {
pub fn new() -> Self {
Self::with_threshold(100)
}
/// Override the copy-threshold (useful for the reference-mode test
/// where we want a small file to land on the reference branch).
pub fn with_threshold(copy_threshold_mb: u64) -> Self {
let temp = tempfile::tempdir().expect("tempdir");
let mut config = Config::defaults();
config.storage.data_dir = temp.path().to_string_lossy().into_owned();
config.storage.copy_threshold_mb = copy_threshold_mb;
Self { temp, config }
}
pub fn config(&self) -> Config {
self.config.clone()
}
pub fn data_dir(&self) -> PathBuf {
self.temp.path().to_path_buf()
}
pub fn db_path(&self) -> PathBuf {
self.temp.path().join("kb.sqlite")
}
/// Open a side-channel rusqlite connection for direct SQL inspection.
/// The store-owned connection is held inside a Mutex; opening a fresh
/// one is the simplest way for tests to peek at row counts / pragmas.
pub fn with_conn<T>(&self, f: impl FnOnce(&Connection) -> rusqlite::Result<T>) -> T {
let conn = Connection::open(self.db_path()).expect("open side conn");
f(&conn).expect("with_conn closure")
}
}

View File

@@ -0,0 +1,135 @@
//! Contract: drive the full pipeline (`kb-parse-md` → `kb-normalize` →
//! `kb-chunk`) on a real fixture and prove `DocumentStore` round-trips
//! the resulting `CanonicalDocument` + `Vec<Chunk>` losslessly.
//!
//! `kb-parse-md`, `kb-normalize`, `kb-chunk` are dev-deps only — see the
//! crate's `Cargo.toml`. The store crate's production tree (visible via
//! `cargo tree -p kb-store-sqlite --depth 1`) does NOT include them.
use std::path::PathBuf;
use kb_chunk::MdHeadingV1Chunker;
use kb_core::{
AssetId, AssetStorage, Checksum, ChunkPolicy, ChunkerVersion, Chunker, DocumentStore,
MediaType, ParserVersion, RawAsset, SourceUri, WorkspacePath,
};
use kb_normalize::build_canonical_document;
use kb_parse_md::{BodyHints, parse_blocks, parse_frontmatter};
use kb_store_sqlite::SqliteStore;
use time::OffsetDateTime;
mod common;
fn fixtures_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..")
.join("fixtures")
.join("markdown")
}
#[test]
fn document_and_chunks_round_trip_through_sqlite() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// ── Build inputs from the fixture ───────────────────────────────
let dir = fixtures_dir();
let bytes = std::fs::read(dir.join("code-and-table.md")).expect("read fixture");
let cs = blake3::hash(&bytes).to_hex().to_string();
let asset = RawAsset {
asset_id: AssetId("a".repeat(32)),
source_uri: SourceUri::File(dir.join("code-and-table.md")),
workspace_path: WorkspacePath::new("notes/code-and-table.md".into()).unwrap(),
media_type: MediaType::Markdown,
byte_len: bytes.len() as u64,
checksum: Checksum(cs.clone()),
discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
stored: AssetStorage::Reference {
path: dir.join("code-and-table.md"),
sha: Checksum(cs.clone()),
},
};
let hints = BodyHints {
first_h1: Some("Code And Table".into()),
fs_ctime: asset.discovered_at,
fs_mtime: asset.discovered_at,
fallback_lang: Some("en".into()),
};
let (mut metadata, _fm_span, _fm_warns) =
parse_frontmatter(&bytes, &hints).unwrap();
let (parsed_blocks, parse_warns) = parse_blocks(&bytes, 1).unwrap();
metadata.aliases.sort();
metadata.tags.sort();
let parser_version = ParserVersion("kb-store-sqlite-roundtrip".into());
let doc = build_canonical_document(
&asset,
metadata,
parsed_blocks,
&parser_version,
parse_warns,
)
.unwrap();
let policy = ChunkPolicy {
target_tokens: 200,
overlap_tokens: 40,
respect_markdown_headings: true,
chunker_version: ChunkerVersion("md-heading-v1".into()),
};
let chunks = MdHeadingV1Chunker.chunk(&doc, &policy).unwrap();
assert!(!chunks.is_empty(), "fixture must produce ≥1 chunk");
// ── Persist via the store ────────────────────────────────────────
store
.put_asset_with_bytes(&asset, &bytes)
.expect("put_asset_with_bytes");
store.put_document(&doc).expect("put_document");
store
.put_blocks(&doc.doc_id, &doc.blocks)
.expect("put_blocks");
store
.put_chunks(&doc.doc_id, &chunks)
.expect("put_chunks");
// ── Read back ────────────────────────────────────────────────────
let loaded = store
.get_document(&doc.doc_id)
.expect("get_document err")
.expect("get_document Some");
// Document-level fields must match. doc_version is bumped by the
// UPSERT path even on first put (the trigger runs on conflict
// only; first-insert lands the caller-supplied 1). updated_at is
// re-stamped server-side and is NOT round-tripped (the loaded
// CanonicalDocument carries `metadata.updated_at` from the
// metadata_json blob, which is the input value). So we compare
// the field-by-field copies that ARE deterministic:
assert_eq!(loaded.doc_id, doc.doc_id);
assert_eq!(loaded.workspace_path, doc.workspace_path);
assert_eq!(loaded.title, doc.title);
assert_eq!(loaded.lang, doc.lang);
assert_eq!(loaded.parser_version, doc.parser_version);
assert_eq!(loaded.schema_version, doc.schema_version);
assert_eq!(loaded.metadata, doc.metadata, "metadata round-trip");
assert_eq!(loaded.provenance, doc.provenance, "provenance round-trip");
assert_eq!(
loaded.blocks.len(),
doc.blocks.len(),
"block count round-trip"
);
assert_eq!(loaded.blocks, doc.blocks, "block stream round-trip");
// Chunks: get_chunk for each id.
for c in &chunks {
let back = store
.get_chunk(&c.chunk_id)
.expect("get_chunk err")
.expect("get_chunk Some");
assert_eq!(&back, c, "chunk round-trip mismatch for {}", c.chunk_id.0);
}
}

View File

@@ -0,0 +1,241 @@
//! Idempotency: re-ingesting the same `(workspace_path, asset_id,
//! parser_version)` keeps documents at one row but bumps `doc_version`
//! and replaces blocks/chunks rather than duplicating them.
use std::path::PathBuf;
use kb_core::{
AssetId, AssetStorage, Block, CanonicalDocument, Checksum, Chunk, ChunkerVersion,
CommonBlock, DocumentId, DocumentStore, HeadingBlock, Lang, MediaType, Metadata,
ParserVersion, Provenance, RawAsset, SourceSpan, SourceType, SourceUri, TextBlock,
TrustLevel, WorkspacePath,
};
use kb_store_sqlite::SqliteStore;
use time::OffsetDateTime;
mod common;
fn make_asset() -> RawAsset {
let bytes = b"dummy";
RawAsset {
asset_id: AssetId("a".repeat(32)),
source_uri: SourceUri::File(PathBuf::from("/tmp/foo.md")),
workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(),
media_type: MediaType::Markdown,
byte_len: bytes.len() as u64,
checksum: Checksum(blake3::hash(bytes).to_hex().to_string()),
discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
stored: AssetStorage::Reference {
path: PathBuf::from("/tmp/foo.md"),
sha: Checksum(blake3::hash(bytes).to_hex().to_string()),
},
}
}
fn make_metadata() -> Metadata {
Metadata {
aliases: vec![],
tags: vec!["one".into(), "two".into()],
created_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
updated_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
source_type: SourceType::Markdown,
trust_level: TrustLevel::Primary,
user_id_alias: None,
user: Default::default(),
}
}
fn make_doc() -> CanonicalDocument {
let doc_id = DocumentId("d".repeat(32));
let span = SourceSpan::Line { start: 1, end: 1 };
let block = Block::Heading(HeadingBlock {
common: CommonBlock {
block_id: kb_core::BlockId("b".repeat(32)),
heading_path: vec![],
source_span: span.clone(),
},
level: 1,
text: "Title".into(),
});
let para = Block::Paragraph(TextBlock {
common: CommonBlock {
block_id: kb_core::BlockId("c".repeat(32)),
heading_path: vec!["Title".into()],
source_span: span,
},
text: "body".into(),
inlines: vec![],
});
CanonicalDocument {
doc_id,
source_asset_id: AssetId("a".repeat(32)),
workspace_path: WorkspacePath::new("notes/foo.md".into()).unwrap(),
title: "Title".into(),
lang: Lang("en".into()),
blocks: vec![block, para],
metadata: make_metadata(),
provenance: Provenance { events: vec![] },
parser_version: ParserVersion("test-parser".into()),
schema_version: 1,
doc_version: 1,
}
}
fn make_chunks(doc_id: &DocumentId) -> Vec<Chunk> {
vec![Chunk {
chunk_id: kb_core::ChunkId("e".repeat(32)),
doc_id: doc_id.clone(),
block_ids: vec![kb_core::BlockId("b".repeat(32))],
text: "Title\n\nbody".into(),
heading_path: vec!["Title".into()],
source_spans: vec![SourceSpan::Line { start: 1, end: 1 }],
token_estimate: 5,
chunker_version: ChunkerVersion("md-heading-v1".into()),
policy_hash: "deadbeefdeadbeef".into(),
}]
}
#[test]
fn put_document_idempotent_bumps_doc_version() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let asset = make_asset();
store.put_asset(&asset).expect("put_asset 1");
let doc = make_doc();
store.put_document(&doc).expect("put_document 1");
// First ingest → exactly one row, doc_version=1.
let (count, dv1): (i64, i64) = env.with_conn(|c| {
c.query_row(
"SELECT COUNT(*), MAX(doc_version) FROM documents WHERE doc_id = ?",
[&doc.doc_id.0],
|r| Ok((r.get(0)?, r.get(1)?)),
)
});
assert_eq!(count, 1);
assert_eq!(dv1, 1);
// Re-ingest the same doc → still one row, doc_version=2.
store.put_document(&doc).expect("put_document 2");
let (count2, dv2): (i64, i64) = env.with_conn(|c| {
c.query_row(
"SELECT COUNT(*), MAX(doc_version) FROM documents WHERE doc_id = ?",
[&doc.doc_id.0],
|r| Ok((r.get(0)?, r.get(1)?)),
)
});
assert_eq!(count2, 1, "second put must not duplicate the row");
assert_eq!(dv2, 2, "doc_version must increment on re-ingest");
// Tags were re-derived: still exactly the two original tags.
let tags: Vec<String> = env.with_conn(|c| {
let mut stmt =
c.prepare("SELECT tag FROM document_tags WHERE doc_id = ? ORDER BY tag")?;
let rows = stmt.query_map([&doc.doc_id.0], |r| r.get::<_, String>(0))?;
rows.collect::<rusqlite::Result<Vec<String>>>()
});
assert_eq!(tags, vec!["one".to_string(), "two".to_string()]);
}
#[test]
fn put_blocks_and_put_chunks_replace_not_duplicate() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let asset = make_asset();
store.put_asset(&asset).unwrap();
let doc = make_doc();
store.put_document(&doc).unwrap();
store.put_blocks(&doc.doc_id, &doc.blocks).unwrap();
store.put_chunks(&doc.doc_id, &make_chunks(&doc.doc_id)).unwrap();
let (b1, ch1): (i64, i64) = env.with_conn(|c| {
Ok((
c.query_row(
"SELECT COUNT(*) FROM blocks WHERE doc_id = ?",
[&doc.doc_id.0],
|r| r.get(0),
)?,
c.query_row(
"SELECT COUNT(*) FROM chunks WHERE doc_id = ?",
[&doc.doc_id.0],
|r| r.get(0),
)?,
))
});
assert_eq!(b1, 2);
assert_eq!(ch1, 1);
// Re-put same data → counts unchanged (DELETE-then-INSERT).
store.put_blocks(&doc.doc_id, &doc.blocks).unwrap();
store.put_chunks(&doc.doc_id, &make_chunks(&doc.doc_id)).unwrap();
let (b2, ch2): (i64, i64) = env.with_conn(|c| {
Ok((
c.query_row(
"SELECT COUNT(*) FROM blocks WHERE doc_id = ?",
[&doc.doc_id.0],
|r| r.get(0),
)?,
c.query_row(
"SELECT COUNT(*) FROM chunks WHERE doc_id = ?",
[&doc.doc_id.0],
|r| r.get(0),
)?,
))
});
assert_eq!(b2, 2, "blocks must not double on re-put");
assert_eq!(ch2, 1, "chunks must not double on re-put");
}
/// `put_blocks` runs in a transaction. If we feed it a block whose
/// `doc_id` references a document that does not exist, the FK
/// constraint (`blocks.doc_id REFERENCES documents(doc_id)`) trips,
/// the transaction rolls back, and the table count is unchanged.
#[test]
fn put_blocks_transactional_rollback_on_fk_violation() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let asset = make_asset();
store.put_asset(&asset).unwrap();
let doc = make_doc();
store.put_document(&doc).unwrap();
// Establish a baseline row in `blocks`.
store.put_blocks(&doc.doc_id, &doc.blocks).unwrap();
let baseline: i64 = env.with_conn(|c| {
c.query_row("SELECT COUNT(*) FROM blocks", [], |r| r.get(0))
});
assert_eq!(baseline, 2);
// Now ask put_blocks to write to a doc_id that does NOT exist.
// The implementation issues `DELETE FROM blocks WHERE doc_id = ?`
// (no-op for the missing doc) followed by INSERTs that violate the
// FK constraint. The whole tx must roll back, so `blocks` count
// stays at `baseline`.
let phantom = DocumentId("0".repeat(32));
let phantom_blocks = vec![Block::Heading(HeadingBlock {
common: CommonBlock {
block_id: kb_core::BlockId("9".repeat(32)),
heading_path: vec![],
source_span: SourceSpan::Line { start: 1, end: 1 },
},
level: 1,
text: "phantom".into(),
})];
let res = store.put_blocks(&phantom, &phantom_blocks);
assert!(res.is_err(), "FK violation must surface as Err");
let after: i64 = env.with_conn(|c| {
c.query_row("SELECT COUNT(*) FROM blocks", [], |r| r.get(0))
});
assert_eq!(
after, baseline,
"transaction must roll back; blocks count must be unchanged"
);
}

View File

@@ -0,0 +1,99 @@
//! Snapshot test pinning the JSON wire form of `kb_core::IngestReport`
//! for an inline fixture run. The store crate doesn't (yet) write
//! IngestReports — that's `kb-app`'s job — but the wire schema lives in
//! `kb-core`, and we want a determinism pin that fails loudly if the
//! shape drifts.
//!
//! Set `UPDATE_SNAPSHOTS=1` to re-bake the baseline.
use std::path::PathBuf;
use kb_core::{
AssetId, ChunkerVersion, DocumentId, IngestItem, IngestItemKind, IngestReport,
ParserVersion, SourceScope, WorkspacePath,
};
use serde_json::Value;
fn baseline_path() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("snapshots")
.join("ingest_report.snapshot.json")
}
fn fixture_report() -> IngestReport {
IngestReport {
scope: SourceScope {
root: PathBuf::from("/home/u/KB"),
include: vec!["**/*.md".into()],
exclude: vec![".git/**".into()],
},
scanned: 3,
new: 2,
updated: 1,
skipped: 0,
errors: 0,
duration_ms: 187,
items: Some(vec![
IngestItem {
kind: IngestItemKind::New,
doc_id: Some(DocumentId("a".repeat(32))),
doc_path: WorkspacePath::new("notes/alpha.md".into()).unwrap(),
asset_id: Some(AssetId("a".repeat(32))),
byte_len: Some(1234),
block_count: Some(7),
chunk_count: Some(3),
parser_version: Some(ParserVersion("pulldown-cmark-0.x".into())),
chunker_version: Some(ChunkerVersion("md-heading-v1".into())),
warnings: vec![],
error: None,
},
IngestItem {
kind: IngestItemKind::Updated,
doc_id: Some(DocumentId("b".repeat(32))),
doc_path: WorkspacePath::new("notes/beta.md".into()).unwrap(),
asset_id: Some(AssetId("b".repeat(32))),
byte_len: Some(2048),
block_count: Some(12),
chunk_count: Some(5),
parser_version: Some(ParserVersion("pulldown-cmark-0.x".into())),
chunker_version: Some(ChunkerVersion("md-heading-v1".into())),
warnings: vec!["malformed frontmatter".into()],
error: None,
},
]),
}
}
#[test]
fn ingest_report_wire_form_is_stable() {
let report = fixture_report();
let actual = serde_json::to_value(&report).unwrap();
let baseline = match std::fs::read_to_string(baseline_path()) {
Ok(s) => s,
Err(_) if std::env::var("UPDATE_SNAPSHOTS").is_ok() => {
std::fs::create_dir_all(baseline_path().parent().unwrap()).unwrap();
let pretty = serde_json::to_string_pretty(&actual).unwrap();
std::fs::write(baseline_path(), format!("{pretty}\n")).unwrap();
return;
}
Err(e) => panic!(
"missing baseline {}; run with UPDATE_SNAPSHOTS=1: {e}",
baseline_path().display()
),
};
let expected: Value = serde_json::from_str(&baseline).unwrap();
if actual != expected {
if std::env::var("UPDATE_SNAPSHOTS").is_ok() {
let pretty = serde_json::to_string_pretty(&actual).unwrap();
std::fs::write(baseline_path(), format!("{pretty}\n")).unwrap();
return;
}
let pretty = serde_json::to_string_pretty(&actual).unwrap();
panic!(
"ingest_report snapshot drift\n\
--- expected ({}) ---\n{baseline}\n\
--- actual ---\n{pretty}",
baseline_path().display()
);
}
}

View File

@@ -0,0 +1,90 @@
//! `JobRepo` smoke tests: create → progress → finish, list filters.
use kb_core::{JobFilter, JobKind, JobRepo, JobStatus};
use kb_store_sqlite::SqliteStore;
use serde_json::json;
mod common;
#[test]
fn create_then_progress_then_finish() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let id = store
.create(JobKind::Ingest, json!({"path": "notes/x.md"}))
.unwrap();
// Status starts pending.
let row = store.list(&JobFilter::default()).unwrap();
assert_eq!(row.len(), 1);
assert_eq!(row[0].status, JobStatus::Pending);
// First progress flips pending → running.
store
.update_progress(&id, json!({"processed": 1, "total": 10}))
.unwrap();
let row = store.list(&JobFilter::default()).unwrap();
assert_eq!(row[0].status, JobStatus::Running);
assert_eq!(row[0].progress.as_ref().unwrap()["total"], json!(10));
// Finish with success.
store
.finish(&id, JobStatus::Succeeded, None)
.unwrap();
let row = store.list(&JobFilter::default()).unwrap();
assert_eq!(row[0].status, JobStatus::Succeeded);
assert!(row[0].finished_at.is_some());
assert!(row[0].error.is_none());
}
#[test]
fn finish_with_error_message_is_round_trippable() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
let id = store.create(JobKind::Embed, json!({})).unwrap();
store
.finish(&id, JobStatus::Failed, Some("boom: model not pulled"))
.unwrap();
let row = store.list(&JobFilter::default()).unwrap();
assert_eq!(row[0].status, JobStatus::Failed);
assert_eq!(
row[0].error.as_deref(),
Some("boom: model not pulled"),
"error message must round-trip"
);
}
#[test]
fn list_filters_status_and_kind() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// Two ingest jobs (one finished succeeded, one pending) + one embed.
let a = store.create(JobKind::Ingest, json!({"a": 1})).unwrap();
let _b = store.create(JobKind::Ingest, json!({"b": 1})).unwrap();
let _c = store.create(JobKind::Embed, json!({"c": 1})).unwrap();
store.finish(&a, JobStatus::Succeeded, None).unwrap();
let by_status_succeeded = store
.list(&JobFilter {
status: Some(JobStatus::Succeeded),
kind: None,
})
.unwrap();
assert_eq!(by_status_succeeded.len(), 1);
assert_eq!(by_status_succeeded[0].kind, JobKind::Ingest);
let by_kind_embed = store
.list(&JobFilter {
status: None,
kind: Some(JobKind::Embed),
})
.unwrap();
assert_eq!(by_kind_embed.len(), 1);
assert_eq!(by_kind_embed[0].kind, JobKind::Embed);
}

View File

@@ -0,0 +1,140 @@
//! `DocumentStore::list_documents` filter coverage.
use std::path::PathBuf;
use kb_core::{
AssetId, AssetStorage, Block, CanonicalDocument, Checksum, CommonBlock, DocFilter,
DocumentId, DocumentStore, HeadingBlock, Lang, MediaType, Metadata, ParserVersion,
Provenance, RawAsset, SourceSpan, SourceType, SourceUri, TrustLevel, WorkspacePath,
};
use kb_store_sqlite::SqliteStore;
use time::OffsetDateTime;
mod common;
fn make_doc(
suffix: char,
workspace_path: &str,
lang: &str,
tags: Vec<&str>,
trust: TrustLevel,
) -> (RawAsset, CanonicalDocument) {
let bytes: Vec<u8> = vec![suffix as u8; 16];
let cs = blake3::hash(&bytes).to_hex().to_string();
let asset_id = AssetId(format!("{suffix}").repeat(32));
let asset = RawAsset {
asset_id: asset_id.clone(),
source_uri: SourceUri::File(PathBuf::from(format!("/tmp/{suffix}.md"))),
workspace_path: WorkspacePath::new(workspace_path.into()).unwrap(),
media_type: MediaType::Markdown,
byte_len: bytes.len() as u64,
checksum: Checksum(cs.clone()),
discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
stored: AssetStorage::Reference {
path: PathBuf::from(format!("/tmp/{suffix}.md")),
sha: Checksum(cs),
},
};
let doc_id = DocumentId(format!("d{suffix}").repeat(16));
let block = Block::Heading(HeadingBlock {
common: CommonBlock {
block_id: kb_core::BlockId(format!("b{suffix}").repeat(16)),
heading_path: vec![],
source_span: SourceSpan::Line { start: 1, end: 1 },
},
level: 1,
text: format!("Title {suffix}"),
});
let metadata = Metadata {
aliases: vec![],
tags: tags.into_iter().map(String::from).collect(),
created_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
updated_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
source_type: SourceType::Markdown,
trust_level: trust,
user_id_alias: None,
user: Default::default(),
};
let doc = CanonicalDocument {
doc_id,
source_asset_id: asset_id,
workspace_path: asset.workspace_path.clone(),
title: format!("Title {suffix}"),
lang: Lang(lang.into()),
blocks: vec![block],
metadata,
provenance: Provenance { events: vec![] },
parser_version: ParserVersion("test".into()),
schema_version: 1,
doc_version: 1,
};
(asset, doc)
}
#[test]
fn list_documents_filters_lang_and_tags() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
for (asset, doc) in [
make_doc('a', "notes/a.md", "en", vec!["rust", "kb"], TrustLevel::Primary),
make_doc('b', "notes/b.md", "ko", vec!["rust"], TrustLevel::Secondary),
make_doc('c', "papers/c.md", "en", vec!["bio"], TrustLevel::Generated),
] {
store.put_asset(&asset).unwrap();
store.put_document(&doc).unwrap();
}
// No filter → all three docs.
let all = store.list_documents(&DocFilter::default()).unwrap();
assert_eq!(all.len(), 3);
// lang filter.
let en = store
.list_documents(&DocFilter {
lang: Some(Lang("en".into())),
..Default::default()
})
.unwrap();
assert_eq!(en.len(), 2);
assert!(en.iter().all(|d| d.lang == Lang("en".into())));
// path glob.
let papers = store
.list_documents(&DocFilter {
path_glob: Some("papers/*.md".into()),
..Default::default()
})
.unwrap();
assert_eq!(papers.len(), 1);
assert_eq!(papers[0].doc_path.0, "papers/c.md");
// tags_any.
let rust = store
.list_documents(&DocFilter {
tags_any: vec!["rust".into()],
..Default::default()
})
.unwrap();
assert_eq!(rust.len(), 2);
// tags must be hydrated on the result.
for d in &rust {
assert!(
d.tags.iter().any(|t| t == "rust"),
"expected `rust` tag on {}: {:?}",
d.doc_path.0,
d.tags
);
}
// trust_min — Primary only.
let primary = store
.list_documents(&DocFilter {
trust_min: Some(TrustLevel::Primary),
..Default::default()
})
.unwrap();
assert_eq!(primary.len(), 1);
assert_eq!(primary[0].trust_level, TrustLevel::Primary);
}

View File

@@ -0,0 +1,83 @@
//! Migration test: a fresh DB, after `run_migrations`, exposes every
//! table and index P1 needs (per §5.1§5.7).
use kb_store_sqlite::SqliteStore;
mod common;
#[test]
fn fresh_db_has_all_p1_tables_and_indexes() {
let env = common::TestEnv::new();
let store = SqliteStore::open(&env.config()).expect("open");
store.run_migrations().expect("run migrations");
// Pull the list of user tables from sqlite_master.
let tables: Vec<String> = env.with_conn(|c| {
let mut stmt = c.prepare(
"SELECT name FROM sqlite_master
WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
ORDER BY name",
)?;
let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
let tables: rusqlite::Result<Vec<String>> = rows.collect();
tables
});
let required = [
"answers",
"assets",
"blocks",
"chunks",
"document_tags",
"documents",
"embedding_records",
"eval_query_results",
"eval_runs",
"ingest_runs",
"jobs",
// refinery's own bookkeeping table (`refinery_schema_history`)
// also lands here; we don't pin it but it's expected.
"migrations",
"schema_meta",
];
for t in required {
assert!(
tables.iter().any(|n| n == t),
"table `{t}` missing; got {tables:?}"
);
}
// Pin the documented indexes (subset that matters for hot paths).
let indexes: Vec<String> = env.with_conn(|c| {
let mut stmt = c.prepare(
"SELECT name FROM sqlite_master
WHERE type = 'index' AND name NOT LIKE 'sqlite_%'
ORDER BY name",
)?;
let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
let idx: rusqlite::Result<Vec<String>> = rows.collect();
idx
});
for i in [
"idx_assets_workspace_path",
"idx_assets_media_type",
"idx_docs_workspace_path",
"idx_docs_lang",
"idx_docs_source_type",
"idx_document_tags_tag",
"idx_blocks_doc_id",
"idx_chunks_doc_id",
"idx_chunks_chunker_version",
"idx_embed_chunk",
"idx_embed_model",
"idx_jobs_status",
"idx_jobs_kind",
"idx_answers_created_at",
"idx_answers_grounded",
] {
assert!(
indexes.iter().any(|n| n == i),
"index `{i}` missing; got {indexes:?}"
);
}
}

View File

@@ -11,6 +11,7 @@
"heading_path": [
"Alpha"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 1,
@@ -43,6 +44,7 @@
"Alpha",
"Alpha Sub"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 7,
@@ -69,6 +71,7 @@
"Alpha",
"Alpha Sub"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 53,
@@ -90,6 +93,7 @@
"heading_path": [
"Beta"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 55,
@@ -115,6 +119,7 @@
"heading_path": [
"Beta"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 64,
@@ -135,6 +140,7 @@
"heading_path": [
"Beta"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 66,
@@ -157,6 +163,7 @@
"heading_path": [
"Gamma"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 68,
@@ -188,6 +195,7 @@
"heading_path": [
"Gamma"
],
"policy_hash": "de6868f3b7949242",
"source_spans": [
{
"end": 72,

View File

@@ -1,7 +1,10 @@
-- V001__init.sql — schema bootstrap.
-- Per design §5.1 + §5.9. Only the meta + migrations tables land here;
-- data tables (assets, documents, blocks, chunks, fts5, …) ship in later
-- phase-specific migrations (P1-6 / P2-1 / P3-3).
-- V001__init.sql — full P1 schema bootstrap.
-- Per design §5.1 (meta), §5.2 (assets), §5.3 (documents/document_tags),
-- §5.4 (blocks), §5.5 (chunks — FTS5 virtual table + triggers DEFERRED to
-- V002 in P2-1), §5.6 (embedding_records), §5.7 (jobs / ingest_runs /
-- answers / eval_runs / eval_query_results).
-- §5.1 Migrations meta -------------------------------------------------------
CREATE TABLE schema_meta (
key TEXT PRIMARY KEY,
@@ -13,3 +16,167 @@ CREATE TABLE migrations (
applied_at TEXT NOT NULL,
description TEXT NOT NULL
);
-- §5.2 Assets ----------------------------------------------------------------
CREATE TABLE assets (
asset_id TEXT PRIMARY KEY,
source_uri TEXT NOT NULL,
workspace_path TEXT NOT NULL,
media_type TEXT NOT NULL,
byte_len INTEGER NOT NULL,
checksum TEXT NOT NULL,
storage_kind TEXT NOT NULL CHECK (storage_kind IN ('copied','reference')),
storage_path TEXT NOT NULL,
discovered_at TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_assets_workspace_path ON assets(workspace_path);
CREATE INDEX idx_assets_media_type ON assets(media_type);
-- §5.3 Documents -------------------------------------------------------------
CREATE TABLE documents (
doc_id TEXT PRIMARY KEY,
asset_id TEXT NOT NULL REFERENCES assets(asset_id) ON DELETE RESTRICT,
workspace_path TEXT NOT NULL,
title TEXT,
lang TEXT,
source_type TEXT NOT NULL,
trust_level TEXT NOT NULL,
parser_version TEXT NOT NULL,
doc_version INTEGER NOT NULL,
schema_version INTEGER NOT NULL,
metadata_json TEXT NOT NULL,
provenance_json TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_docs_workspace_path ON documents(workspace_path);
CREATE INDEX idx_docs_lang ON documents(lang);
CREATE INDEX idx_docs_source_type ON documents(source_type);
CREATE TABLE document_tags (
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
tag TEXT NOT NULL,
PRIMARY KEY (doc_id, tag)
);
CREATE INDEX idx_document_tags_tag ON document_tags(tag);
-- §5.4 Blocks ----------------------------------------------------------------
CREATE TABLE blocks (
block_id TEXT PRIMARY KEY,
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
kind TEXT NOT NULL,
heading_path_json TEXT NOT NULL,
ordinal INTEGER NOT NULL,
source_span_json TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX idx_blocks_doc_id ON blocks(doc_id);
-- §5.5 Chunks (FTS5 virtual table + triggers deferred to V002 / P2-1) -------
CREATE TABLE chunks (
chunk_id TEXT PRIMARY KEY,
doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
text TEXT NOT NULL,
heading_path_json TEXT NOT NULL,
section_label TEXT,
source_spans_json TEXT NOT NULL,
token_estimate INTEGER NOT NULL,
chunker_version TEXT NOT NULL,
policy_hash TEXT NOT NULL,
block_ids_json TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX idx_chunks_doc_id ON chunks(doc_id);
CREATE INDEX idx_chunks_chunker_version ON chunks(chunker_version);
-- §5.6 Embedding records (P3 — table empty in P1, present for forward compat) -
CREATE TABLE embedding_records (
embedding_id TEXT PRIMARY KEY,
chunk_id TEXT NOT NULL REFERENCES chunks(chunk_id) ON DELETE CASCADE,
model_id TEXT NOT NULL,
model_version TEXT NOT NULL,
dimensions INTEGER NOT NULL,
lance_table TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(chunk_id, model_id, model_version, dimensions)
);
CREATE INDEX idx_embed_chunk ON embedding_records(chunk_id);
CREATE INDEX idx_embed_model ON embedding_records(model_id, model_version, dimensions);
-- §5.7 Jobs / IngestRuns / Answers / EvalRuns -------------------------------
CREATE TABLE jobs (
job_id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('pending','running','succeeded','failed','canceled')),
payload_json TEXT NOT NULL,
progress_json TEXT,
error_json TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
finished_at TEXT
);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_kind ON jobs(kind);
CREATE TABLE ingest_runs (
run_id TEXT PRIMARY KEY,
scope_json TEXT NOT NULL,
scanned INTEGER NOT NULL,
new_count INTEGER NOT NULL,
updated_count INTEGER NOT NULL,
skipped_count INTEGER NOT NULL,
error_count INTEGER NOT NULL,
duration_ms INTEGER NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT NOT NULL,
items_json TEXT
);
CREATE TABLE answers (
trace_id TEXT PRIMARY KEY,
query TEXT NOT NULL,
answer TEXT NOT NULL,
grounded INTEGER NOT NULL,
refusal_reason TEXT,
model_id TEXT NOT NULL,
model_provider TEXT NOT NULL,
embedding_model_id TEXT,
embedding_dimensions INTEGER,
prompt_template_version TEXT NOT NULL,
retrieval_mode TEXT NOT NULL,
retrieval_k INTEGER NOT NULL,
score_gate REAL NOT NULL,
top_score REAL NOT NULL,
chunks_returned INTEGER NOT NULL,
chunks_used INTEGER NOT NULL,
citations_json TEXT NOT NULL,
packed_chunks_json TEXT,
prompt_tokens INTEGER,
completion_tokens INTEGER,
latency_ms INTEGER,
created_at TEXT NOT NULL
);
CREATE INDEX idx_answers_created_at ON answers(created_at);
CREATE INDEX idx_answers_grounded ON answers(grounded);
CREATE TABLE eval_runs (
run_id TEXT PRIMARY KEY,
suite TEXT NOT NULL,
config_snapshot_json TEXT NOT NULL,
aggregate_json TEXT NOT NULL,
commit_hash TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE eval_query_results (
run_id TEXT NOT NULL REFERENCES eval_runs(run_id) ON DELETE CASCADE,
query_id TEXT NOT NULL,
result_json TEXT NOT NULL,
PRIMARY KEY (run_id, query_id)
);