From 17d52461b221b4188661d3263da9bd591c8c5f8e Mon Sep 17 00:00:00 2001 From: altair823 Date: Fri, 1 May 2026 12:11:21 +0000 Subject: [PATCH] =?UTF-8?q?feat(p3-5):=20wire=20kb-app=20facade=20?= =?UTF-8?q?=E2=80=94=20ingest=20/=20search=20/=20list=20/=20inspect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the P0 `bail!("not yet wired")` stubs in kb-app with real bodies that compose the libraries shipped through P3-4. After this commit, `cargo run -p kb-cli -- index` actually walks the workspace and persists chunks (SQLite + optionally LanceDB), and `cargo run -p kb-cli -- search --mode {lexical,vector,hybrid}` returns real SearchHits with citations. `kb-app::ask` stays stubbed; P4-3 owns it. App lifecycle (crates/kb-app/src/app.rs): - Internal pub(crate) struct App holds the Config plus Arc<SqliteStore> eagerly, with embedder + LanceVectorStore behind OnceLock<Arc<…>> for memoization. First call pays the ~470MB fastembed init / Lance open; subsequent calls return the cached Arc::clone. OnceLock::set race losers fall back to get().cloned() so the lazy-init is concurrent-safe. - One-shot CLI invocations pay the cost once at most. The P9 TUI (which holds an App for the session) gets memoization for free. ingest pipeline (lib.rs): - FsSourceConnector::scan(&scope) → per asset: parse_frontmatter → parse_blocks → build_canonical_document → MdHeadingV1Chunker.chunk → put_asset_with_bytes → put_document → put_blocks → put_chunks. One transaction per document per design §5.8 (kb-store-sqlite's put_* methods own the transactions). - When provider != "none" and dimensions > 0: build embedder once, embed each doc's chunks as Document kind, ensure_table once at the top of the run, then upsert the VectorRecord batch. Lexical-only config (provider == "none") skips both — verified by ingest_provider_none_skips_lance test. - Per-asset parse failures recorded as IngestItemKind::Error with the warning attached; the run continues. 
Only structural failures (DB unreachable etc.) abort. - Aggregate counts (assets_scanned / new / updated / skipped / errors / chunks_indexed / embeddings_indexed / duration_ms) flow into both the JobRepo progress_json AND a dedicated ingest_runs row written via SqliteStore::record_ingest_run (new pub helper added to kb-store-sqlite — see below). summary_only=true writes items_json=NULL but still populates the count columns. search dispatch: - SearchMode::Lexical → LexicalRetriever directly. - SearchMode::Vector → VectorRetriever with embedder + LanceVectorStore. - SearchMode::Hybrid → HybridRetriever composing the two. - Vector / Hybrid with provider=none returns a clear error naming the config key to flip ("models.embedding.provider"). list_docs / inspect_doc / inspect_chunk delegate straight to DocumentStore trait methods. Returns Err with actionable message on not-found. Test seam: each public free function has a matching #[doc(hidden)] pub fn *_with_config(cfg, ...) companion that integration tests invoke directly (the public form internally calls load_config()). pub(crate) would not reach across the integration- tests crate boundary; #[doc(hidden)] keeps it out of rustdoc and the function comment flags it as test-only. kb-store-sqlite additions: - pub struct IngestRunRow + pub fn record_ingest_run on SqliteStore for the kb-app aggregate-counts persistence path. Helper writes the ingest_runs row directly with all aggregate columns; jobs table still gets a JobRepo create/update_progress/finish trio in parallel. Tests (11 default, 2 #[ignore] AVX-gated): - ingest_lexical: round-trip, idempotent, summary_only_drops_items, provider_none_skips_lance (asserts no .lance dir on disk), records_ingest_runs_row_with_aggregate_counts, tags_any filter, inspect_doc_not_found, inspect_chunk_not_found. - search_lexical: lexical hits with embedding_model=None, empty_query_returns_empty, vector_mode_with_provider_none returns clear error. 
- search_vector: hybrid mode end-to-end (#[ignore], AVX), Vector mode embedding_model assertion (#[ignore], AVX). Both run on the AVX VM in ~21s combined (first run pays the model download). - TestEnv pins workspace.root + storage.{data_dir,model_dir} to a TempDir so tests don't touch the user's $HOME/.local/share. - Fixture workspace at crates/kb-app/tests/fixtures/workspace/ has three small markdown files with varied frontmatter (rust+cargo+ python tags) so the tags_any filter test exercises a non-trivial predicate. Workspace 269 passed / 24 ignored / 0 failed (was 261/22). cargo clippy --workspace --all-targets -- -D warnings clean. CLI smoke verified manually: `cargo run -p kb-cli -- index` returns a real IngestReport JSON; `cargo run -p kb-cli -- search "..."` returns hits with citations; `cargo run -p kb-cli -- list docs` lists the indexed documents. Allowed deps respected: kb-source-fs, kb-parse-md, kb-parse-types, kb-normalize, kb-chunk, kb-store-sqlite, kb-search, kb-store-vector, kb-embed, kb-embed-local plus existing tracing / anyhow / serde / toml / dirs and now blake3 (run_id) + time. Forbidden (kb-llm*, kb-rag, kb-tui, kb-desktop, kb-parse-{pdf,image,audio}) absent from cargo tree -p kb-app. Out of scope per spec: ask body (P4-3), --rebuild-fts wiring, --resume checkpointing (P+), --watch (P+), TUI / desktop integration (P9 consumes this facade). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 14 + crates/kb-app/Cargo.toml | 16 + crates/kb-app/src/app.rs | 126 ++++ crates/kb-app/src/lib.rs | 714 +++++++++++++++++- crates/kb-app/tests/common/mod.rs | 104 +++ .../kb-app/tests/fixtures/workspace/intro.md | 24 + .../tests/fixtures/workspace/notes/cargo.md | 23 + .../tests/fixtures/workspace/notes/python.md | 23 + crates/kb-app/tests/ingest_lexical.rs | 220 ++++++ crates/kb-app/tests/search_lexical.rs | 69 ++ crates/kb-app/tests/search_vector.rs | 89 +++ crates/kb-store-sqlite/src/jobs.rs | 73 ++ crates/kb-store-sqlite/src/lib.rs | 1 + 13 files changed, 1480 insertions(+), 16 deletions(-) create mode 100644 crates/kb-app/src/app.rs create mode 100644 crates/kb-app/tests/common/mod.rs create mode 100644 crates/kb-app/tests/fixtures/workspace/intro.md create mode 100644 crates/kb-app/tests/fixtures/workspace/notes/cargo.md create mode 100644 crates/kb-app/tests/fixtures/workspace/notes/python.md create mode 100644 crates/kb-app/tests/ingest_lexical.rs create mode 100644 crates/kb-app/tests/search_lexical.rs create mode 100644 crates/kb-app/tests/search_vector.rs diff --git a/Cargo.lock b/Cargo.lock index 0fc9172..f9e1952 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3334,11 +3334,25 @@ name = "kb-app" version = "0.1.0" dependencies = [ "anyhow", + "blake3", "dirs 5.0.1", + "kb-chunk", "kb-config", "kb-core", + "kb-embed", + "kb-embed-local", + "kb-normalize", + "kb-parse-md", + "kb-parse-types", + "kb-search", + "kb-source-fs", + "kb-store-sqlite", + "kb-store-vector", + "rusqlite", "serde", "serde_json", + "tempfile", + "time", "toml", "tracing", "tracing-appender", diff --git a/crates/kb-app/Cargo.toml b/crates/kb-app/Cargo.toml index b224590..a00639e 100644 --- a/crates/kb-app/Cargo.toml +++ b/crates/kb-app/Cargo.toml @@ -10,11 +10,27 @@ description = "Facade — orchestrates components for kb-cli/tui/desktop" [dependencies] kb-core = { path = "../kb-core" } kb-config = { path = "../kb-config" } 
+kb-source-fs = { path = "../kb-source-fs" } +kb-parse-md = { path = "../kb-parse-md" } +kb-parse-types = { path = "../kb-parse-types" } +kb-normalize = { path = "../kb-normalize" } +kb-chunk = { path = "../kb-chunk" } +kb-store-sqlite = { path = "../kb-store-sqlite" } +kb-store-vector = { path = "../kb-store-vector" } +kb-search = { path = "../kb-search" } +kb-embed = { path = "../kb-embed" } +kb-embed-local = { path = "../kb-embed-local" } anyhow = { workspace = true } +blake3 = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +time = { workspace = true } tracing = { workspace = true } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] } tracing-appender = "0.2" toml = "0.8" dirs = "5" + +[dev-dependencies] +rusqlite = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/kb-app/src/app.rs b/crates/kb-app/src/app.rs new file mode 100644 index 0000000..01266d8 --- /dev/null +++ b/crates/kb-app/src/app.rs @@ -0,0 +1,126 @@ +//! `App` — internal lifecycle struct (§7). +//! +//! A single `App` represents one CLI invocation's worth of state: a +//! resolved `Config`, an open `SqliteStore`, and (when embeddings are +//! enabled) an `Embedder` + `LanceVectorStore`. Each public free +//! function on `kb-app` wraps `App::open(config)` once, runs the +//! requested op, and drops everything on return. +//! +//! The struct is `pub(crate)` because it is an internal seam: `kb-cli` +//! calls only the free functions on the crate root. `kb-tui` (P9) is +//! expected to hold one `App` for the session, at which point the +//! struct may need to be promoted to `pub`. Until then, keep it +//! private to insulate the wiring shape from downstream callers. +//! +//! ## Embedder + Vector store lifetime +//! +//! `App::open` builds the SQLite store unconditionally. The embedder +//! and vector store are *lazy + memoized* — built on first call to +//! 
[`App::embedder`] / [`App::vector`] and cached in `OnceLock`s — so
+//! a long-lived `App` (e.g., the P9 TUI session) pays the ~470 MB
+//! ONNX init plus Lance reopen cost exactly once.
+//!
+//! - `kb list` / `kb inspect` never need them.
+//! - `kb search --mode lexical` never needs them.
+//! - `kb search --mode {vector,hybrid}` always needs them; `kb ingest`
+//!   needs them unless embeddings are disabled (see below).
+//!
+//! Building eagerly would force every CLI invocation to load ~470 MB of
+//! ONNX weights, which is the dominant cold-start cost. The lazy
+//! pattern keeps the lexical-only paths instant; the memoization makes
+//! the TUI's repeated searches cheap after the first.
+//!
+//! Embeddings can also be **disabled** workspace-wide via
+//! `config.models.embedding.provider = "none"` (or `dimensions = 0`);
+//! in that mode [`App::embedder`] and [`App::vector`] return `None` and
+//! callers must fall back to lexical-only search.
+
+use std::sync::{Arc, OnceLock};
+
+use anyhow::{Context, Result};
+
+use kb_core::Embedder;
+use kb_embed_local::FastembedEmbedder;
+use kb_store_sqlite::SqliteStore;
+use kb_store_vector::LanceVectorStore;
+
+/// Internal facade state. See module docs for lifetime rules.
+pub(crate) struct App {
+    /// Resolved configuration this `App` was opened with.
+    pub(crate) config: kb_config::Config,
+    /// SQLite store, opened (and migrated) eagerly by [`App::open`].
+    pub(crate) sqlite: Arc,
+    /// Memoized embedder — built lazily on first `embedder()` call when
+    /// embeddings are enabled. `OnceLock` keeps the struct `Sync` and
+    /// the build path cold-only-once.
+    embedder: OnceLock>,
+    /// Memoized vector store — built lazily on first `vector()` call
+    /// when embeddings are enabled. Same rationale as `embedder`.
+    vector: OnceLock>,
+}
+
+impl App {
+    /// Open the SQLite store and run migrations. Does NOT load the
+    /// embedder or vector store — those are lazy via
+    /// [`Self::embedder`] / [`Self::vector`].
+    ///
+    /// **Caveat:** must be called from a synchronous context.
+    /// Downstream `LanceVectorStore::new` (called by [`Self::vector`])
+    /// internally drives a `tokio::Runtime::block_on`, which panics if
+    /// invoked from inside another tokio runtime.
+    ///
+    /// # Errors
+    ///
+    /// Fails when the store cannot be opened or its migrations fail.
+    pub(crate) fn open(config: kb_config::Config) -> Result {
+        let sqlite = SqliteStore::open(&config).context("kb-app: open SqliteStore")?;
+        sqlite
+            .run_migrations()
+            .context("kb-app: run SqliteStore migrations")?;
+        Ok(Self {
+            config,
+            sqlite: Arc::new(sqlite),
+            embedder: OnceLock::new(),
+            vector: OnceLock::new(),
+        })
+    }
+
+    /// Returns `true` when the workspace has embeddings turned off
+    /// (`provider = "none"` or `dimensions = 0`). Lexical-only mode.
+    pub(crate) fn embeddings_disabled(&self) -> bool {
+        let cfg = &self.config.models.embedding;
+        cfg.provider == "none" || cfg.dimensions == 0
+    }
+
+    /// Build (or reuse) the fastembed embedder. Returns `None` when the
+    /// workspace is in lexical-only mode (see
+    /// [`Self::embeddings_disabled`]). The first call pays the ~470 MB
+    /// ONNX load; subsequent calls are a single `OnceLock` read.
+    ///
+    /// Two callers racing on the very first call may each construct an
+    /// embedder; only the first `set` is kept and both callers return
+    /// that cached instance.
+    pub(crate) fn embedder(&self) -> Result>> {
+        if self.embeddings_disabled() {
+            return Ok(None);
+        }
+        // Fast path: already built.
+        if let Some(e) = self.embedder.get() {
+            return Ok(Some(e.clone()));
+        }
+        let emb: Arc = Arc::new(
+            FastembedEmbedder::new(&self.config)
+                .context("kb-app: load FastembedEmbedder")?,
+        );
+        // `set` returns Err if another thread won the race; in that case
+        // the loser still returns the (now-cached) winner via `get()`.
+        let _ = self.embedder.set(emb.clone());
+        Ok(Some(self.embedder.get().cloned().unwrap_or(emb)))
+    }
+
+    /// Build (or reuse) the LanceDB-backed vector store. Returns `None`
+    /// when embeddings are disabled. Memoized via `OnceLock` for the
+    /// same reasons as [`Self::embedder`], with the same behaviour when
+    /// two first callers race.
+    pub(crate) fn vector(&self) -> Result>> {
+        if self.embeddings_disabled() {
+            return Ok(None);
+        }
+        // Fast path: already opened.
+        if let Some(v) = self.vector.get() {
+            return Ok(Some(v.clone()));
+        }
+        let store = Arc::new(
+            LanceVectorStore::new(&self.config, self.sqlite.clone())
+                .context("kb-app: open LanceVectorStore")?,
+        );
+        // Ignore a lost race; `get()` below returns whichever store won.
+        let _ = self.vector.set(store.clone());
+        Ok(Some(self.vector.get().cloned().unwrap_or(store)))
+    }
+}
diff --git a/crates/kb-app/src/lib.rs b/crates/kb-app/src/lib.rs
index 8196bfd..dbad263 100644
--- a/crates/kb-app/src/lib.rs
+++ b/crates/kb-app/src/lib.rs
@@ -1,8 +1,11 @@
 //! `kb-app` — facade that downstream `kb-cli` / `kb-tui` / `kb-desktop`
 //! depend on (§7, §8).
 //!
-//! P0 implementations stub out — the signatures are frozen so that later
-//! phases swap in real bodies without breaking call sites.
+//! P3-5 swapped the `bail!("not yet wired")` stubs for real bodies that
+//! compose the libraries shipped through P3-4. After this task, `kb
+//! ingest` actually walks a workspace and persists chunks, and `kb
+//! search --mode {lexical,vector,hybrid}` returns real `SearchHit`s.
+//! `kb-app::ask` stays stubbed (P4-3 owns it).
 //!
 //! ## Wire-schema convention
 //!
@@ -16,20 +19,48 @@
 //! surface (no domain-side equivalent in `kb-core`). When adding a new
 //! facade function in a later phase, remember: keep the return type pure,
 //! and add a matching `wire_*` helper in `kb-cli/src/wire.rs`.
+//!
+//! ## Test seam
+//!
+//! Each public free function has a `#[doc(hidden)] pub fn *_with_config`
+//! companion that takes a fully-resolved `Config` directly (`pub(crate)`
+//! would not reach the integration-tests crate). Public callers use the
+//! top-level functions, which call `load_config()`; integration tests
+//! call `_with_config` with a `Config` pointing at a `TempDir` to avoid
+//! polluting the user's real `data_dir` / `model_dir`.
use std::path::PathBuf; +use std::sync::Arc; -use anyhow::bail; +use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; +use kb_chunk::MdHeadingV1Chunker; use kb_core::{ - Answer, CanonicalDocument, Chunk, ChunkId, DocFilter, DocSummary, DocumentId, - IngestReport, SearchHit, SearchMode, SearchQuery, SourceScope, + Answer, CanonicalDocument, Chunk, ChunkId, ChunkPolicy, ChunkerVersion, Chunker, + DocFilter, DocSummary, DocumentId, DocumentStore, Embedder, EmbeddingInput, + EmbeddingKind, IndexVersion, IngestReport, ParserVersion, RawAsset, Retriever, + SearchHit, SearchMode, SearchQuery, SourceConnector, SourceScope, SourceUri, + VectorRecord, VectorStore, }; +use kb_normalize::build_canonical_document; +use kb_parse_md::{BodyHints, parse_blocks, parse_frontmatter}; +use kb_search::{HybridRetriever, LexicalRetriever, VectorRetriever}; +use kb_source_fs::FsSourceConnector; +mod app; pub mod doctor_signal; pub mod logging; +use app::App; + +/// Parser-version label persisted in `documents.parser_version` for +/// every Markdown file ingested through the `kb-parse-md` pipeline. +/// Kept in lock-step with the literal used in the `kb-store-sqlite` +/// idempotency / round-trip tests so the version label written by the +/// app and the one used in cross-crate fixtures match. +const KB_PARSE_MD_VERSION: &str = "pulldown-cmark-0.x"; + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct AskOpts { pub k: usize, @@ -101,28 +132,679 @@ fn expand_tilde(s: &str) -> PathBuf { PathBuf::from(s) } -pub fn ingest(_scope: SourceScope, _summary_only: bool) -> anyhow::Result { - bail!("not yet wired (P1-2)") +/// Load the active Config from XDG (or fall back to defaults). Mirrors +/// what `kb-cli` does at the top of every subcommand path; we re-do +/// the load inside each facade entry so callers don't have to thread +/// a Config through. 
+fn load_config() -> anyhow::Result {
+    kb_config::Config::load(None)
 }

-pub fn list_docs(_filter: DocFilter) -> anyhow::Result> {
-    bail!("not yet wired (P1-5)")
+// ── ingest ────────────────────────────────────────────────────────────────
+
+/// Ingest every asset selected by `scope`, using the config returned
+/// by `load_config()`.
+///
+/// With `summary_only = true` the per-item list is dropped from the
+/// returned report (`items: None`) and `ingest_runs.items_json` is
+/// stored as NULL; the aggregate counts are reported either way.
+///
+/// # Errors
+///
+/// Fails on structural problems (config load, store open, workspace
+/// scan, embedder / vector-store setup). Per-asset failures do not
+/// fail the call; they are reported as `IngestItemKind::Error` items.
+pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result {
+    let config = load_config()?;
+    ingest_with_config(config, scope, summary_only)
 }

-pub fn inspect_doc(_id: &DocumentId) -> anyhow::Result {
-    bail!("not yet wired (P1-5)")
+/// Test-only seam — kb-cli must call the public free function
+/// ([`ingest`]), not this. See module docs.
+#[doc(hidden)]
+pub fn ingest_with_config(
+    config: kb_config::Config,
+    scope: SourceScope,
+    summary_only: bool,
+) -> anyhow::Result {
+    let started_instant = std::time::Instant::now();
+
+    let app = App::open(config)?;
+
+    // Walk the workspace.
+    let connector = FsSourceConnector::new(&app.config)
+        .context("kb-app::ingest: build FsSourceConnector")?;
+    let assets = connector
+        .scan(&scope)
+        .context("kb-app::ingest: scan workspace")?;
+
+    // Embedder + vector store: build once at the top so the cold-start
+    // cost is paid once even when the workspace has 1000 markdown files.
+    let embedder = app.embedder()?;
+    let vector_store = app.vector()?;
+
+    // If both are present, ensure the table exists for the (model, dim)
+    // pair so the first per-doc upsert doesn't pay the create-table
+    // round-trip.
+    if let (Some(emb), Some(vec)) = (embedder.as_ref(), vector_store.as_ref()) {
+        let mid = emb.model_id();
+        vec.ensure_table(&mid, emb.dimensions())
+            .context("kb-app::ingest: ensure Lance table")?;
+    }
+
+    let parser_version = ParserVersion(KB_PARSE_MD_VERSION.to_string());
+    let chunk_policy = chunk_policy_from_config(&app.config);
+
+    // Pre-load every existing doc_id so we can label `IngestItem.kind`
+    // as `New` vs `Updated` correctly. `list_documents` returns one
+    // row per `(workspace_path, asset_id)` — index by the deterministic
+    // `doc_id` recipe input so the first ingest of an unseen file is
+    // labelled `New`.
+    let existing_doc_ids: std::collections::HashSet = app
+        .sqlite
+        .list_documents(&DocFilter::default())
+        .context("kb-app::ingest: list existing documents")?
+        .into_iter()
+        .map(|d| d.doc_id.0)
+        .collect();
+
+    let started_at = time::OffsetDateTime::now_utc();
+
+    let mut items: Vec = Vec::new();
+    let mut new_count: u32 = 0;
+    let mut updated_count: u32 = 0;
+    let mut skipped_count: u32 = 0;
+    let mut error_count: u32 = 0;
+    // Chunk / embedding totals. They are surfaced through the job's
+    // `progress_json` and the closing tracing event below; they are
+    // not fields of `IngestReport` or of the `IngestRunRow` built here.
+    let mut chunks_indexed: u32 = 0;
+    let mut embeddings_indexed: u32 = 0;
+    let scanned_count: u32 = u32::try_from(assets.len()).unwrap_or(u32::MAX);
+
+    let embed_active = embedder.is_some() && vector_store.is_some();
+
+    for asset in assets {
+        let item = ingest_one_asset(
+            &app,
+            &asset,
+            &parser_version,
+            &chunk_policy,
+            embedder.as_ref(),
+            vector_store.as_ref(),
+            &existing_doc_ids,
+        );
+
+        let item = match item {
+            Ok(i) => i,
+            Err(e) => {
+                tracing::error!(
+                    target: "kb-app",
+                    path = %asset.workspace_path.0,
+                    error = %e,
+                    "kb-app::ingest: per-file fatal"
+                );
+                // Do NOT bump `error_count` here: the `match item.kind`
+                // tally below counts every `Error` item exactly once.
+                // Incrementing in both places reported each failed
+                // file twice.
+                kb_core::IngestItem {
+                    kind: kb_core::IngestItemKind::Error,
+                    doc_id: None,
+                    doc_path: asset.workspace_path.clone(),
+                    asset_id: Some(asset.asset_id.clone()),
+                    byte_len: Some(asset.byte_len),
+                    block_count: None,
+                    chunk_count: None,
+                    parser_version: None,
+                    chunker_version: None,
+                    warnings: Vec::new(),
+                    error: Some(format!("{e:#}")),
+                }
+            }
+        };
+
+        // Single place where per-kind counters are updated.
+        match item.kind {
+            kb_core::IngestItemKind::New => {
+                new_count = new_count.saturating_add(1);
+                let n = item.chunk_count.unwrap_or(0);
+                chunks_indexed = chunks_indexed.saturating_add(n);
+                if embed_active {
+                    embeddings_indexed = embeddings_indexed.saturating_add(n);
+                }
+            }
+            kb_core::IngestItemKind::Updated => {
+                updated_count = updated_count.saturating_add(1);
+                let n = item.chunk_count.unwrap_or(0);
+                chunks_indexed = chunks_indexed.saturating_add(n);
+                if embed_active {
+                    embeddings_indexed = embeddings_indexed.saturating_add(n);
+                }
+            }
+            kb_core::IngestItemKind::Skipped => {
+                skipped_count = skipped_count.saturating_add(1)
+            }
+            kb_core::IngestItemKind::Error => {
+                error_count = error_count.saturating_add(1)
+            }
+        }
+        items.push(item);
+    }
+
+    // Record a row in `jobs` so `kb jobs` (P+) can list the run. Distinct
+    // from the `ingest_runs` row written below — the `jobs` table is the
+    // generic job-lifecycle surface (`kind=ingest`), `ingest_runs` is the
+    // ingest-specific aggregate counts row.
+    let payload = serde_json::json!({
+        "scope": scope,
+        "summary_only": summary_only,
+    });
+    let job_id_res = ::create(
+        &app.sqlite,
+        kb_core::JobKind::Ingest,
+        payload,
+    );
+    // Job bookkeeping is best-effort: failures are logged, never fatal.
+    match job_id_res {
+        Ok(jid) => {
+            // Stash the aggregate counts as the job's `progress_json`
+            // so a future `kb jobs show` can surface them without
+            // joining `ingest_runs`.
+            let progress = serde_json::json!({
+                "scanned": scanned_count,
+                "new": new_count,
+                "updated": updated_count,
+                "skipped": skipped_count,
+                "errors": error_count,
+                "chunks_indexed": chunks_indexed,
+                "embeddings_indexed": embeddings_indexed,
+            });
+            if let Err(e) = ::update_progress(
+                &app.sqlite,
+                &jid,
+                progress,
+            ) {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: JobRepo::update_progress failed"
+                );
+            }
+            if let Err(e) = ::finish(
+                &app.sqlite,
+                &jid,
+                kb_core::JobStatus::Succeeded,
+                None,
+            ) {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: JobRepo::finish failed"
+                );
+            }
+        }
+        Err(e) => {
+            tracing::warn!(
+                target: "kb-app",
+                error = %e,
+                "kb-app::ingest: JobRepo::create failed; run not recorded in `jobs`"
+            );
+        }
+    }
+
+    let duration_ms = u32::try_from(started_instant.elapsed().as_millis())
+        .unwrap_or(u32::MAX);
+    let finished_at = time::OffsetDateTime::now_utc();
+
+    // Record the ingest_runs row with aggregate counts.
+    // `summary_only=true` writes `items_json=NULL` (per design §5.7);
+    // the count columns are populated either way.
+    let scope_json = serde_json::to_string(&scope)
+        .context("kb-app::ingest: serialize scope for ingest_runs.scope_json")?;
+    let items_json: Option = if summary_only {
+        None
+    } else {
+        match serde_json::to_string(&items) {
+            Ok(s) => Some(s),
+            Err(e) => {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: failed to serialize items_json; storing NULL"
+                );
+                None
+            }
+        }
+    };
+    let run_id = mint_ingest_run_id(&scope_json, started_at);
+    let row = kb_store_sqlite::IngestRunRow {
+        run_id: &run_id,
+        scope_json: &scope_json,
+        scanned: scanned_count,
+        new_count,
+        updated_count,
+        skipped_count,
+        error_count,
+        duration_ms,
+        started_at,
+        finished_at,
+        items_json: items_json.as_deref(),
+    };
+    // Also best-effort: a failed audit row must not fail the ingest.
+    if let Err(e) = app.sqlite.record_ingest_run(&row) {
+        tracing::warn!(
+            target: "kb-app",
+            error = %e,
+            "kb-app::ingest: record_ingest_run failed"
+        );
+    }
+
+    tracing::info!(
+        target: "kb-app",
+        scanned = scanned_count,
+        new = new_count,
+        updated = updated_count,
+        skipped = skipped_count,
+        errors = error_count,
+        chunks_indexed,
+        embeddings_indexed,
+        duration_ms,
+        "kb-app::ingest: run complete"
+    );
+
+    Ok(IngestReport {
+        scope,
+        scanned: scanned_count,
+        new: new_count,
+        updated: updated_count,
+        skipped: skipped_count,
+        errors: error_count,
+        duration_ms,
+        items: if summary_only { None } else { Some(items) },
+    })
 }

-pub fn inspect_chunk(_id: &ChunkId) -> anyhow::Result {
-    bail!("not yet wired (P1-5)")
+/// Mint a stable 32-hex-char `run_id` for an `ingest_runs` row.
+/// `(scope, started_at_nanos)` is enough to distinguish two runs with
+/// the same scope started a nanosecond apart — same shape as
+/// the JobId recipe in `kb-store-sqlite::jobs`.
+fn mint_ingest_run_id(scope_json: &str, at: time::OffsetDateTime) -> String {
+    // Hash the serialized scope followed by the big-endian start time
+    // in nanoseconds.
+    let mut hasher = blake3::Hasher::new();
+    hasher.update(scope_json.as_bytes());
+    hasher.update(&at.unix_timestamp_nanos().to_be_bytes());
+    let hex = hasher.finalize().to_hex().to_string();
+    // Keep only the first 32 hex characters of the digest.
+    hex[..32].to_string()
 }

-pub fn search(_query: SearchQuery) -> anyhow::Result> {
-    bail!("not yet wired (P3-1/P4-1)")
+/// Type alias naming the concrete store, used to disambiguate the two
+/// impls (`DocumentStore` vs `JobRepo`) on it. Plain
+/// `app.sqlite.create(...)` would pick one based on inherent vs trait
+/// methods; we go through `<… as JobRepo>` to be explicit.
+type SqliteStoreAlias = kb_store_sqlite::SqliteStore;
+
+/// Process a single asset: read bytes, parse, normalize, chunk,
+/// persist, embed. Per-asset failures bubble up to the caller for
+/// labelling as `IngestItemKind::Error` — they do NOT abort the
+/// whole run.
+///
+/// Non-Markdown assets and `kb://` source URIs are returned as
+/// `Skipped` items before any file read or store write. A processed
+/// asset is labelled `Updated` when its `doc_id` is present in
+/// `existing_doc_ids`, and `New` otherwise. Embedding and the vector
+/// upsert run only when both `embedder` and `vector_store` are `Some`
+/// and the document produced at least one chunk.
+fn ingest_one_asset(
+    app: &App,
+    asset: &RawAsset,
+    parser_version: &ParserVersion,
+    chunk_policy: &ChunkPolicy,
+    embedder: Option<&Arc>,
+    vector_store: Option<&Arc>,
+    existing_doc_ids: &std::collections::HashSet,
+) -> anyhow::Result {
+    tracing::debug!(
+        target: "kb-app::ingest",
+        path = %asset.workspace_path.0,
+        "processing asset"
+    );
+    // Only handle Markdown for now; other media types are P6+ work.
+    if asset.media_type != kb_core::MediaType::Markdown {
+        return Ok(kb_core::IngestItem {
+            kind: kb_core::IngestItemKind::Skipped,
+            doc_id: None,
+            doc_path: asset.workspace_path.clone(),
+            asset_id: Some(asset.asset_id.clone()),
+            byte_len: Some(asset.byte_len),
+            block_count: None,
+            chunk_count: None,
+            parser_version: None,
+            chunker_version: None,
+            warnings: Vec::new(),
+            error: None,
+        });
+    }
+
+    // Only filesystem-backed assets can be read here; `kb://` URIs are
+    // skipped with an explanatory warning.
+    let path = match &asset.source_uri {
+        SourceUri::File(p) => p.clone(),
+        SourceUri::Kb(_) => {
+            return Ok(kb_core::IngestItem {
+                kind: kb_core::IngestItemKind::Skipped,
+                doc_id: None,
+                doc_path: asset.workspace_path.clone(),
+                asset_id: Some(asset.asset_id.clone()),
+                byte_len: Some(asset.byte_len),
+                block_count: None,
+                chunk_count: None,
+                parser_version: None,
+                chunker_version: None,
+                warnings: vec![
+                    "kb:// source URIs are not supported by the fs ingester".into(),
+                ],
+                error: None,
+            });
+        }
+    };
+
+    let bytes = std::fs::read(&path)
+        .with_context(|| format!("read asset bytes from {}", path.display()))?;
+
+    let body_hints = build_body_hints(asset);
+
+    // Frontmatter — `parse_frontmatter` returns Ok even on malformed
+    // frontmatter (warnings are surfaced through the returned `Vec`).
+    let (metadata, fm_span, fm_warns) = parse_frontmatter(&bytes, &body_hints)
+        .context("kb-parse-md::parse_frontmatter")?;
+
+    // Number of `\n` bytes covered by the frontmatter, passed to
+    // `parse_blocks` as the body's line offset.
+    let body_offset_lines = match fm_span {
+        Some(span) => count_lines_in(&bytes[..span.end]),
+        None => 0,
+    };
+
+    // Parse only the bytes after the frontmatter region.
+    let (parsed_blocks, blk_warns) = parse_blocks(&bytes[fm_span_end(fm_span)..], body_offset_lines)
+        .context("kb-parse-md::parse_blocks")?;
+
+    let mut all_warnings = Vec::with_capacity(fm_warns.len() + blk_warns.len());
+    all_warnings.extend(fm_warns);
+    all_warnings.extend(blk_warns);
+
+    // Snapshot warning notes for the IngestItem before the vec is
+    // consumed by `build_canonical_document`.
+    let warning_notes: Vec = all_warnings
+        .iter()
+        .map(|w| format!("{:?}: {}", w.kind, w.note))
+        .collect();
+
+    let canonical = build_canonical_document(
+        asset,
+        metadata,
+        parsed_blocks,
+        parser_version,
+        all_warnings,
+    )
+    .context("kb-normalize::build_canonical_document")?;
+
+    let chunks = MdHeadingV1Chunker
+        .chunk(&canonical, chunk_policy)
+        .context("kb-chunk::MdHeadingV1Chunker::chunk")?;
+
+    // Persist. Each `put_*` call wraps its own short transaction
+    // (per-document tx semantics per design §5.8); composing them is
+    // the kb-app job. A failure mid-way leaves the DB in a state the
+    // next ingest run can re-converge (UPSERT + DELETE-then-INSERT).
+    app.sqlite
+        .put_asset_with_bytes(asset, &bytes)
+        .context("DocumentStore::put_asset_with_bytes")?;
+    app.sqlite
+        .put_document(&canonical)
+        .context("DocumentStore::put_document")?;
+    app.sqlite
+        .put_blocks(&canonical.doc_id, &canonical.blocks)
+        .context("DocumentStore::put_blocks")?;
+    app.sqlite
+        .put_chunks(&canonical.doc_id, &chunks)
+        .context("DocumentStore::put_chunks")?;
+
+    // Embed + vector upsert (only when both sides are configured).
+    if let (Some(emb), Some(vec_store)) = (embedder, vector_store) {
+        if !chunks.is_empty() {
+            let inputs: Vec> = chunks
+                .iter()
+                .map(|c| EmbeddingInput {
+                    text: c.text.as_str(),
+                    kind: EmbeddingKind::Document,
+                })
+                .collect();
+            let vectors = emb
+                .embed(&inputs)
+                .context("Embedder::embed (document chunks)")?;
+            let model_id = emb.model_id();
+            let model_version = emb.model_version();
+            let dimensions = emb.dimensions();
+            // Pair each chunk with its vector by position.
+            // NOTE(review): `zip` stops at the shorter side, so this
+            // presumes `embed` returns one vector per input — confirm.
+            let records: Vec = chunks
+                .iter()
+                .zip(vectors)
+                .map(|(c, v)| VectorRecord {
+                    embedding_id: kb_core::id_for_embedding(
+                        &c.chunk_id,
+                        &model_id,
+                        &model_version,
+                        dimensions,
+                    ),
+                    chunk_id: c.chunk_id.clone(),
+                    vector: v,
+                    doc_id: canonical.doc_id.clone(),
+                    text: c.text.clone(),
+                    heading_path: c.heading_path.clone(),
+                    model_id: model_id.clone(),
+                    model_version: model_version.clone(),
+                    dimensions,
+                })
+                .collect();
+            vec_store
+                .upsert(&records)
+                .context("VectorStore::upsert")?;
+        }
+    }
+
+    // Label against the snapshot of doc ids taken before the run began.
+    let kind = if existing_doc_ids.contains(&canonical.doc_id.0) {
+        kb_core::IngestItemKind::Updated
+    } else {
+        kb_core::IngestItemKind::New
+    };
+
+    Ok(kb_core::IngestItem {
+        kind,
+        doc_id: Some(canonical.doc_id.clone()),
+        doc_path: asset.workspace_path.clone(),
+        asset_id: Some(asset.asset_id.clone()),
+        byte_len: Some(asset.byte_len),
+        block_count: u32::try_from(canonical.blocks.len()).ok(),
+        chunk_count: u32::try_from(chunks.len()).ok(),
+        parser_version: Some(parser_version.clone()),
+        chunker_version: Some(MdHeadingV1Chunker.chunker_version()),
+        warnings: warning_notes,
+        error: None,
+    })
 }

+/// Convenience: end byte of the frontmatter region (or 0 when absent).
+fn fm_span_end(span: Option) -> usize {
+    span.map(|s| s.end).unwrap_or(0)
+}
+
+/// Count `\n` in a byte prefix to convert frontmatter byte span to
+/// the line-offset `parse_blocks` expects.
+fn count_lines_in(bytes: &[u8]) -> u32 {
+    let n = bytes.iter().filter(|&&b| b == b'\n').count();
+    // Saturate rather than fail if the count does not fit in `u32`.
+    u32::try_from(n).unwrap_or(u32::MAX)
+}
+
+/// Build `BodyHints` from the asset alone. We use the asset's
+/// `discovered_at` for both `fs_ctime` and `fs_mtime` because going
+/// through the FS metadata API for every file would be a noticeable
+/// overhead for large workspaces and the source-of-truth timestamps
+/// are written into the document's frontmatter when the user wants
+/// authoritative values.
+fn build_body_hints(asset: &RawAsset) -> BodyHints {
+    BodyHints {
+        first_h1: None,
+        fs_ctime: asset.discovered_at,
+        fs_mtime: asset.discovered_at,
+        fallback_lang: None,
+    }
+}
+
+/// Build a `ChunkPolicy` from the active config by copying the
+/// `config.chunking` fields one-to-one.
+fn chunk_policy_from_config(config: &kb_config::Config) -> ChunkPolicy {
+    ChunkPolicy {
+        target_tokens: config.chunking.target_tokens,
+        overlap_tokens: config.chunking.overlap_tokens,
+        respect_markdown_headings: config.chunking.respect_markdown_headings,
+        chunker_version: ChunkerVersion(config.chunking.chunker_version.clone()),
+    }
+}
+
+// ── list_docs / inspect_doc / inspect_chunk ───────────────────────────────
+
+/// List the documents matching `filter`, using the config returned by
+/// `load_config()`.
+pub fn list_docs(filter: DocFilter) -> anyhow::Result> {
+    let config = load_config()?;
+    list_docs_with_config(config, filter)
+}
+
+/// Test-only seam — kb-cli must call the public free function
+/// ([`list_docs`]), not this.
+#[doc(hidden)]
+pub fn list_docs_with_config(
+    config: kb_config::Config,
+    filter: DocFilter,
+) -> anyhow::Result> {
+    // Opens (and migrates) the store, then delegates to `list_documents`.
+    let app = App::open(config)?;
+    app.sqlite.list_documents(&filter)
+}
+
+/// Fetch one document by id, using the config returned by
+/// `load_config()`. Returns an error when no such document is stored.
+pub fn inspect_doc(id: &DocumentId) -> anyhow::Result {
+    let config = load_config()?;
+    inspect_doc_with_config(config, id)
+}
+
+/// Test-only seam — kb-cli must call the public free function
+/// ([`inspect_doc`]), not this.
+#[doc(hidden)] +pub fn inspect_doc_with_config( + config: kb_config::Config, + id: &DocumentId, +) -> anyhow::Result { + let app = App::open(config)?; + app.sqlite + .get_document(id)? + .ok_or_else(|| anyhow!("document not found: {} (try `kb list docs`)", id.0)) +} + +pub fn inspect_chunk(id: &ChunkId) -> anyhow::Result { + let config = load_config()?; + inspect_chunk_with_config(config, id) +} + +/// Test-only seam — kb-cli must call the public free function +/// ([`inspect_chunk`]), not this. +#[doc(hidden)] +pub fn inspect_chunk_with_config( + config: kb_config::Config, + id: &ChunkId, +) -> anyhow::Result { + let app = App::open(config)?; + app.sqlite + .get_chunk(id)? + .ok_or_else(|| anyhow!("chunk not found: {} (try `kb inspect doc `)", id.0)) +} + +// ── search ──────────────────────────────────────────────────────────────── + +pub fn search(query: SearchQuery) -> anyhow::Result> { + let config = load_config()?; + search_with_config(config, query) +} + +/// Test-only seam — kb-cli must call the public free function +/// ([`search`]), not this. 
+#[doc(hidden)] +pub fn search_with_config( + config: kb_config::Config, + query: SearchQuery, +) -> anyhow::Result> { + let app = App::open(config)?; + + match query.mode { + SearchMode::Lexical => { + let lex = LexicalRetriever::with_settings( + app.sqlite.clone(), + lexical_index_version(&app.config), + app.config.search.snippet_chars, + ); + lex.search(&query) + } + SearchMode::Vector => { + let (emb, vec_store) = require_embeddings(&app)?; + let vec_iv = vector_index_version(emb.as_ref()); + let vec_dyn: Arc = vec_store; + let emb_dyn: Arc = emb; + let retr = VectorRetriever::with_settings( + vec_dyn, + emb_dyn, + app.sqlite.clone(), + vec_iv, + app.config.search.snippet_chars, + ); + retr.search(&query) + } + SearchMode::Hybrid => { + let lex = Arc::new(LexicalRetriever::with_settings( + app.sqlite.clone(), + lexical_index_version(&app.config), + app.config.search.snippet_chars, + )) as Arc; + let (emb, vec_store) = require_embeddings(&app)?; + let vec_iv = vector_index_version(emb.as_ref()); + let vec_dyn: Arc = vec_store; + let emb_dyn: Arc = emb; + let vec_retr = Arc::new(VectorRetriever::with_settings( + vec_dyn, + emb_dyn, + app.sqlite.clone(), + vec_iv, + app.config.search.snippet_chars, + )) as Arc; + let hybrid = HybridRetriever::new(&app.config, lex, vec_retr); + hybrid.search(&query) + } + } +} + +fn require_embeddings( + app: &App, +) -> anyhow::Result<( + Arc, + Arc, +)> { + let emb = app.embedder()?.ok_or_else(|| { + anyhow!( + "embeddings disabled (config.models.embedding.provider == \"none\" \ + or dimensions == 0); vector / hybrid search require embeddings — \ + switch to --mode lexical or enable an embedding provider in config.toml" + ) + })?; + let vec_store = app.vector()?.ok_or_else(|| { + anyhow!( + "vector store unavailable while embedder is configured — this should \ + not happen; check `kb doctor` and the data_dir permissions" + ) + })?; + Ok((emb, vec_store)) +} + +/// Compose a stable `IndexVersion` for the lexical retriever from 
+/// the active config. This token surfaces in `SearchHit.index_version` +/// and on snapshot tests; including the chunker version pins it to +/// the chunking policy in effect. +fn lexical_index_version(config: &kb_config::Config) -> IndexVersion { + IndexVersion(format!("lex:{}", config.chunking.chunker_version)) +} + +/// Compose a stable `IndexVersion` for the vector retriever. Tracks +/// `(embedding_model, embedding_version, dimensions)` so a model swap +/// flags drift via the existing index_version mismatch warning in +/// `HybridRetriever::new`. +fn vector_index_version(embedder: &dyn Embedder) -> IndexVersion { + IndexVersion(format!( + "vec:{}@{}:{}", + embedder.model_id().0, + embedder.model_version().0, + embedder.dimensions(), + )) +} + +// ── ask (still stubbed — P4-3) ──────────────────────────────────────────── + pub fn ask(_query: &str, _opts: AskOpts) -> anyhow::Result { - bail!("not yet wired (P5-1)") + anyhow::bail!("not yet wired (P4-3)") } /// Run the doctor checks. P0 emits `config_loaded` + `data_dir_writable` diff --git a/crates/kb-app/tests/common/mod.rs b/crates/kb-app/tests/common/mod.rs new file mode 100644 index 0000000..59b04ce --- /dev/null +++ b/crates/kb-app/tests/common/mod.rs @@ -0,0 +1,104 @@ +//! Shared test scaffolding for `kb-app` integration tests. +//! +//! Each test gets a fresh `TempDir` and a `Config` whose storage paths +//! all point inside it, so the user's real `data_dir` / `model_dir` +//! is never touched. The fixture workspace at +//! `tests/fixtures/workspace/` is *copied* into the temp dir for each +//! test so a write-side ingest can't trip on a read-only fixture +//! tree. The default lane (no `--ignored`) opts out of embeddings via +//! `provider = "none"` so AVX is not required. + +#![allow(dead_code)] + +use std::path::{Path, PathBuf}; + +use kb_config::Config; +use tempfile::TempDir; + +/// Test environment: owns a `TempDir` and exposes a `Config` whose +/// storage paths live inside it. 
+pub struct TestEnv {
+    pub temp: TempDir,
+    pub workspace_root: PathBuf,
+    pub config: Config,
+}
+
+impl TestEnv {
+    /// Build an env with embeddings disabled (lexical-only). Default
+    /// lane — no AVX, no fastembed download.
+    pub fn lexical_only() -> Self {
+        // Start from the shared fixture env, then switch embeddings off.
+        let mut env = Self::new_inner();
+        env.config.models.embedding.provider = "none".to_string();
+        env.config.models.embedding.dimensions = 0;
+        env
+    }
+
+    /// Build an env with the default fastembed embedding provider.
+    /// Used by AVX-gated `#[ignore]` tests.
+    pub fn with_embeddings() -> Self {
+        Self::new_inner()
+    }
+
+    fn new_inner() -> Self {
+        let temp = tempfile::tempdir().expect("tempdir");
+        let workspace_root = temp.path().join("workspace");
+        copy_fixture_workspace(&workspace_root);
+
+        let data_dir = temp.path().join("data");
+        std::fs::create_dir_all(&data_dir).unwrap();
+        let model_dir = temp.path().join("models");
+        std::fs::create_dir_all(&model_dir).unwrap();
+
+        let mut config = Config::defaults();
+        config.workspace.root = workspace_root.to_string_lossy().into_owned();
+        // Drop the ".obsidian" / "node_modules" excludes — they bring
+        // in nothing useful for fixtures and just hide debugging.
+        config.workspace.exclude.clear();
+        config.storage.data_dir = data_dir.to_string_lossy().into_owned();
+        // Pin model_dir to the TempDir so a future fastembed-touching
+        // test can't accidentally write to the user's `~/.local/share`.
+        config.storage.model_dir = model_dir.to_string_lossy().into_owned();
+        // Drop in a small chunk policy so the fixture's small files
+        // emit at least a couple of chunks even with overlap_tokens
+        // honored.
+        config.chunking.target_tokens = 80;
+        config.chunking.overlap_tokens = 20;
+
+        Self {
+            temp,
+            workspace_root,
+            config,
+        }
+    }
+
+    pub fn scope(&self) -> kb_core::SourceScope {
+        kb_core::SourceScope {
+            root: self.workspace_root.clone(),
+            include: self.config.workspace.include.clone(),
+            exclude: self.config.workspace.exclude.clone(),
+        }
+    }
+}
+
+fn copy_fixture_workspace(dest: &Path) {
+    let src = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("tests")
+        .join("fixtures")
+        .join("workspace");
+    copy_dir_recursive(&src, dest);
+}
+
+fn copy_dir_recursive(src: &Path, dest: &Path) {
+    std::fs::create_dir_all(dest).unwrap();
+    for entry in std::fs::read_dir(src).expect("read fixture dir") {
+        let entry = entry.unwrap();
+        let path = entry.path();
+        let target = dest.join(entry.file_name());
+        if path.is_dir() {
+            copy_dir_recursive(&path, &target);
+        } else {
+            std::fs::copy(&path, &target).expect("copy fixture file");
+        }
+    }
+}
diff --git a/crates/kb-app/tests/fixtures/workspace/intro.md b/crates/kb-app/tests/fixtures/workspace/intro.md
new file mode 100644
index 0000000..b2ea9ba
--- /dev/null
+++ b/crates/kb-app/tests/fixtures/workspace/intro.md
@@ -0,0 +1,24 @@
+---
+title: Introduction to Rust
+tags: [rust, language]
+lang: en
+created_at: 2024-03-01T00:00:00Z
+updated_at: 2024-03-02T00:00:00Z
+source_type: note
+trust_level: primary
+---
+
+# Introduction to Rust
+
+Rust is a systems programming language focused on safety, speed, and concurrency.
+The compiler enforces memory safety without a garbage collector.
+
+## Ownership
+
+Each value has a single owner. When the owner goes out of scope the value is
+dropped. References are borrows that the compiler tracks at compile time.
+
+## Concurrency
+
+Threads in Rust use the ownership system to prevent data races. The Send and
+Sync traits codify which types can move between threads.
diff --git a/crates/kb-app/tests/fixtures/workspace/notes/cargo.md b/crates/kb-app/tests/fixtures/workspace/notes/cargo.md new file mode 100644 index 0000000..b6d1011 --- /dev/null +++ b/crates/kb-app/tests/fixtures/workspace/notes/cargo.md @@ -0,0 +1,23 @@ +--- +title: Cargo Notes +tags: [rust, cargo, tools] +lang: en +created_at: 2024-04-01T00:00:00Z +updated_at: 2024-04-02T00:00:00Z +source_type: note +trust_level: primary +--- + +# Cargo Notes + +Cargo is the Rust package manager and build tool. + +## Workspaces + +A workspace is a set of packages that share the same `Cargo.lock` and output +directory. Member crates are listed under `[workspace.members]`. + +## Features + +Cargo features let crates expose optional functionality behind a feature flag. +Default features are enabled unless `default-features = false` is set. diff --git a/crates/kb-app/tests/fixtures/workspace/notes/python.md b/crates/kb-app/tests/fixtures/workspace/notes/python.md new file mode 100644 index 0000000..39dc378 --- /dev/null +++ b/crates/kb-app/tests/fixtures/workspace/notes/python.md @@ -0,0 +1,23 @@ +--- +title: Python Snippets +tags: [python, language] +lang: en +created_at: 2024-05-01T00:00:00Z +updated_at: 2024-05-02T00:00:00Z +source_type: note +trust_level: primary +--- + +# Python Snippets + +Quick reference for everyday Python tasks. + +## List comprehensions + +Filter and transform in one pass: `[x*2 for x in xs if x > 0]`. Cleaner than +the map+filter pair when the predicate is simple. + +## Decorators + +Wrap a function in another function. `functools.wraps` preserves the +docstring and `__name__` of the inner function on the outer wrapper. diff --git a/crates/kb-app/tests/ingest_lexical.rs b/crates/kb-app/tests/ingest_lexical.rs new file mode 100644 index 0000000..344dee9 --- /dev/null +++ b/crates/kb-app/tests/ingest_lexical.rs @@ -0,0 +1,220 @@ +//! Integration tests for `kb-app::ingest` + `list_docs` + `inspect_*` +//! 
along the lexical-only path (no embeddings → no AVX requirement).
+
+mod common;
+
+use common::TestEnv;
+
+#[test]
+fn ingest_then_list_inspects_round_trip() {
+    let env = TestEnv::lexical_only();
+    let report =
+        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
+
+    // The fixture has 3 markdown files; first ingest should label them
+    // all as New.
+    assert_eq!(report.scanned, 3, "scanned: {report:?}");
+    assert_eq!(report.new, 3, "new: {report:?}");
+    assert_eq!(report.updated, 0, "updated: {report:?}");
+    assert_eq!(report.errors, 0, "errors: {report:?}");
+    let items = report.items.as_ref().expect("items present");
+    assert_eq!(items.len(), 3);
+    for it in items {
+        assert!(it.error.is_none(), "per-item error: {it:?}");
+        assert!(it.doc_id.is_some());
+        // Each fixture file emits ≥1 chunk.
+        assert!(it.chunk_count.unwrap_or(0) >= 1, "chunks: {it:?}");
+    }
+
+    // list_docs returns the 3 docs.
+    let docs = kb_app::list_docs_with_config(
+        env.config.clone(),
+        kb_core::DocFilter::default(),
+    )
+    .unwrap();
+    assert_eq!(docs.len(), 3, "docs: {docs:?}");
+
+    // inspect_doc round-trips one of them.
+    let any_doc_id = docs[0].doc_id.clone();
+    let canonical = kb_app::inspect_doc_with_config(env.config.clone(), &any_doc_id)
+        .unwrap();
+    assert_eq!(canonical.doc_id, any_doc_id);
+    assert!(!canonical.blocks.is_empty(), "blocks empty");
+}
+
+#[test]
+fn ingest_idempotent_on_second_run() {
+    let env = TestEnv::lexical_only();
+
+    let r1 =
+        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
+    assert_eq!(r1.new, 3);
+
+    let r2 =
+        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
+    // Same files re-ingested — labelled Updated, not duplicated.
+    assert_eq!(r2.scanned, 3, "second scan: {r2:?}");
+    assert_eq!(r2.new, 0, "second run new should be 0: {r2:?}");
+    assert_eq!(r2.updated, 3, "second run updated: {r2:?}");
+
+    // list_docs still has 3 docs (no duplicates).
+    let docs = kb_app::list_docs_with_config(
+        env.config.clone(),
+        kb_core::DocFilter::default(),
+    )
+    .unwrap();
+    assert_eq!(docs.len(), 3);
+}
+
+#[test]
+fn ingest_summary_only_drops_items() {
+    let env = TestEnv::lexical_only();
+    let report =
+        kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
+    assert_eq!(report.scanned, 3);
+    assert!(report.items.is_none(), "summary-only should null items");
+}
+
+#[test]
+fn ingest_records_ingest_runs_row_with_aggregate_counts() {
+    // The ingest_runs table is the §5.7 sibling of `jobs`: dedicated
+    // count columns (`scanned`, `new_count`, …) populated at the end
+    // of every run. `summary_only=true` writes `items_json=NULL`; the
+    // counts MUST still be present.
+    let env = TestEnv::lexical_only();
+    let report = kb_app::ingest_with_config(env.config.clone(), env.scope(), true)
+        .unwrap();
+    assert_eq!(report.scanned, 3);
+
+    let db_path = std::path::PathBuf::from(&env.config.storage.data_dir)
+        .join("kb.sqlite");
+    let conn = rusqlite::Connection::open(&db_path).expect("open kb.sqlite");
+    let (scanned, new_c, updated, skipped, errors, items_json): (
+        i64,
+        i64,
+        i64,
+        i64,
+        i64,
+        Option<String>,
+    ) = conn
+        .query_row(
+            "SELECT scanned, new_count, updated_count, skipped_count,
+                    error_count, items_json
+             FROM ingest_runs
+             ORDER BY started_at DESC
+             LIMIT 1",
+            [],
+            |r| {
+                Ok((
+                    r.get(0)?,
+                    r.get(1)?,
+                    r.get(2)?,
+                    r.get(3)?,
+                    r.get(4)?,
+                    r.get(5)?,
+                ))
+            },
+        )
+        .expect("ingest_runs row present");
+    assert_eq!(scanned, 3);
+    assert_eq!(new_c, 3);
+    assert_eq!(updated, 0);
+    assert_eq!(skipped, 0);
+    assert_eq!(errors, 0);
+    assert!(
+        items_json.is_none(),
+        "summary_only=true must store items_json=NULL: {items_json:?}"
+    );
+}
+
+#[test]
+fn ingest_provider_none_skips_lance() {
+    // `provider="none"` must short-circuit the embedder + vector store
+    // build entirely, so the LanceDB directory MUST NOT be created on
+    // disk during ingest. `IngestReport` currently has no
+    // `embeddings_indexed` field, so we assert via the on-disk lance
+    // tree shape (no `<data_dir>/lancedb` directory, or no `*.lance`
+    // tables under it).
+    let env = TestEnv::lexical_only();
+    let report =
+        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
+    assert_eq!(report.errors, 0, "lexical-only run must not error");
+    assert_eq!(report.new, 3);
+
+    let lance_dir = std::path::PathBuf::from(&env.config.storage.data_dir)
+        .join("lancedb");
+    if lance_dir.exists() {
+        // If the dir was created (e.g., by an earlier consumer touching
+        // the path), it MUST contain no `.lance` tables.
+        let mut had_lance_table = false;
+        for entry in std::fs::read_dir(&lance_dir).expect("read lance_dir") {
+            let entry = entry.unwrap();
+            if entry
+                .path()
+                .extension()
+                .and_then(|s| s.to_str())
+                == Some("lance")
+            {
+                had_lance_table = true;
+                break;
+            }
+        }
+        assert!(
+            !had_lance_table,
+            "provider=none must not produce any *.lance table under {}",
+            lance_dir.display()
+        );
+    }
+}
+
+#[test]
+fn list_docs_filters_by_tags_any() {
+    let env = TestEnv::lexical_only();
+    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
+
+    let filter = kb_core::DocFilter {
+        tags_any: vec!["python".to_string()],
+        ..Default::default()
+    };
+    let docs = kb_app::list_docs_with_config(env.config.clone(), filter).unwrap();
+    assert_eq!(docs.len(), 1, "expected only the python doc: {docs:?}");
+    assert!(docs[0].tags.contains(&"python".to_string()));
+
+    let rust_filter = kb_core::DocFilter {
+        tags_any: vec!["rust".to_string()],
+        ..Default::default()
+    };
+    let rust_docs =
+        kb_app::list_docs_with_config(env.config.clone(), rust_filter).unwrap();
+    // intro.md and notes/cargo.md both tag "rust".
+    assert_eq!(rust_docs.len(), 2, "expected 2 rust docs: {rust_docs:?}");
+}
+
+#[test]
+fn inspect_doc_not_found_returns_actionable_error() {
+    let env = TestEnv::lexical_only();
+    let bogus =
+        kb_core::DocumentId("0000000000000000000000000000000000000000000000000000000000000000".to_string());
+    let err = kb_app::inspect_doc_with_config(env.config.clone(), &bogus).unwrap_err();
+    let msg = format!("{err:#}");
+    assert!(
+        msg.contains("not found"),
+        "error must mention not-found: {msg}"
+    );
+    assert!(
+        msg.contains("kb list docs"),
+        "error must hint at `kb list docs`: {msg}"
+    );
+}
+
+#[test]
+fn inspect_chunk_not_found_returns_actionable_error() {
+    let env = TestEnv::lexical_only();
+    let bogus = kb_core::ChunkId(
+        "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
+    );
+    let err = kb_app::inspect_chunk_with_config(env.config.clone(), &bogus)
+        .unwrap_err();
+    let msg = format!("{err:#}");
+    assert!(msg.contains("not found"), "got: {msg}");
+}
diff --git a/crates/kb-app/tests/search_lexical.rs b/crates/kb-app/tests/search_lexical.rs
new file mode 100644
index 0000000..3391ede
--- /dev/null
+++ b/crates/kb-app/tests/search_lexical.rs
@@ -0,0 +1,69 @@
+//! Lexical search integration tests. The vector / hybrid lanes are
+//! AVX-gated and live in `search_vector.rs` (`#[ignore]`).
+
+mod common;
+
+use common::TestEnv;
+
+fn lexical_query(text: &str) -> kb_core::SearchQuery {
+    kb_core::SearchQuery {
+        text: text.to_string(),
+        mode: kb_core::SearchMode::Lexical,
+        k: 10,
+        filters: kb_core::SearchFilters::default(),
+    }
+}
+
+#[test]
+fn lexical_search_returns_hits_after_ingest() {
+    let env = TestEnv::lexical_only();
+    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
+
+    // "Ownership" appears as a heading + paragraph in intro.md and
+    // matches FTS5 default tokenizer easily.
+    let hits =
+        kb_app::search_with_config(env.config.clone(), lexical_query("ownership"))
+            .unwrap();
+    assert!(!hits.is_empty(), "expected ≥1 hit for 'ownership'");
+
+    for h in &hits {
+        // Lexical retriever sets embedding_model=None per spec.
+        assert!(
+            h.embedding_model.is_none(),
+            "lexical-mode hit must have None embedding_model: {h:?}"
+        );
+        assert_eq!(
+            h.retrieval.method,
+            kb_core::SearchMode::Lexical,
+            "method label should be Lexical"
+        );
+    }
+}
+
+#[test]
+fn lexical_search_empty_query_returns_empty() {
+    let env = TestEnv::lexical_only();
+    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
+    let hits = kb_app::search_with_config(env.config.clone(), lexical_query(" "))
+        .unwrap();
+    assert!(hits.is_empty(), "blank query must short-circuit empty");
+}
+
+#[test]
+fn vector_mode_with_provider_none_errors_clearly() {
+    let env = TestEnv::lexical_only();
+    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
+
+    let q = kb_core::SearchQuery {
+        text: "ownership".to_string(),
+        mode: kb_core::SearchMode::Vector,
+        k: 10,
+        filters: kb_core::SearchFilters::default(),
+    };
+    let err = kb_app::search_with_config(env.config.clone(), q).unwrap_err();
+    let msg = format!("{err:#}");
+    assert!(
+        msg.contains("embeddings disabled"),
+        "error must mention embeddings disabled: {msg}"
+    );
+}
diff --git a/crates/kb-app/tests/search_vector.rs b/crates/kb-app/tests/search_vector.rs
new file mode 100644
index 0000000..1ac7df6
--- /dev/null
+++ b/crates/kb-app/tests/search_vector.rs
@@ -0,0 +1,89 @@
+//! Vector / Hybrid lane — AVX-gated. Marked `#[ignore]` because Lance
+//! crashes with `SIGILL` on hosts without AVX, and CI lanes that are
+//! AVX-less should not run these. Local hosts run them via
+//! `cargo test -p kb-app -- --ignored`.
+
+mod common;
+
+use common::TestEnv;
+
+/// Panic if the host CPU lacks AVX.
Mirrors the helper in +/// `kb-store-vector/tests/common/mod.rs` and `kb-search` so a +/// `--ignored` invocation on a non-AVX host fails loudly with a +/// clear message instead of crashing inside Lance's SIMD kernel. +fn require_avx_or_panic() { + #[cfg(target_arch = "x86_64")] + { + if !std::is_x86_feature_detected!("avx") { + panic!( + "kb-app vector integration test requires AVX-capable hardware; \ + host CPU lacks AVX. Run on an AVX-capable machine." + ); + } + } +} + +// First run downloads ~470MB; expect ~30-60s warm, several minutes cold. +#[test] +#[ignore = "AVX-required (Lance SIMD kernels)"] +fn ingest_then_hybrid_search_returns_hits() { + require_avx_or_panic(); + + let env = TestEnv::with_embeddings(); + let report = + kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap(); + assert_eq!(report.errors, 0, "no per-file errors: {report:?}"); + assert_eq!(report.new, 3); + + let q = kb_core::SearchQuery { + text: "ownership".to_string(), + mode: kb_core::SearchMode::Hybrid, + k: 10, + filters: kb_core::SearchFilters::default(), + }; + let hits = kb_app::search_with_config(env.config.clone(), q).unwrap(); + assert!(!hits.is_empty(), "expected hybrid hits for 'ownership'"); + let methods: Vec<_> = hits.iter().map(|h| h.retrieval.method).collect(); + assert!( + methods.iter().all(|m| *m == kb_core::SearchMode::Hybrid), + "every hit must report method=Hybrid: {methods:?}" + ); +} + +// First run downloads ~470MB; expect ~30-60s warm, several minutes cold. 
+#[test] +#[ignore = "AVX-required (Lance SIMD kernels)"] +fn ingest_then_vector_search_carries_embedding_model() { + require_avx_or_panic(); + + let env = TestEnv::with_embeddings(); + let report = + kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap(); + assert_eq!(report.errors, 0, "no per-file errors: {report:?}"); + assert_eq!(report.new, 3); + + let q = kb_core::SearchQuery { + text: "ownership".to_string(), + mode: kb_core::SearchMode::Vector, + k: 10, + filters: kb_core::SearchFilters::default(), + }; + let hits = kb_app::search_with_config(env.config.clone(), q).unwrap(); + assert!(!hits.is_empty(), "expected vector hits for 'ownership'"); + + // Vector mode dispatches through `VectorRetriever` and MUST stamp + // each hit with the configured embedding_model id. + let expected = kb_core::EmbeddingModelId(env.config.models.embedding.model.clone()); + for h in &hits { + assert_eq!( + h.embedding_model, + Some(expected.clone()), + "vector-mode hit must carry embedding_model={expected:?}: {h:?}" + ); + assert_eq!( + h.retrieval.method, + kb_core::SearchMode::Vector, + "vector-mode hit must report method=Vector" + ); + } +} diff --git a/crates/kb-store-sqlite/src/jobs.rs b/crates/kb-store-sqlite/src/jobs.rs index 4493822..745ffb7 100644 --- a/crates/kb-store-sqlite/src/jobs.rs +++ b/crates/kb-store-sqlite/src/jobs.rs @@ -3,6 +3,14 @@ //! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over //! `(now, kind, payload)` so two `create` calls in the same millisecond //! still distinguish. +//! +//! This module also owns the `ingest_runs` writer. `ingest_runs` is the +//! §5.7 sibling table that records per-run aggregate counts (`scanned`, +//! `new_count`, `updated_count`, …) alongside the `jobs` row that +//! `kb jobs` shows. The aggregate-counts surface is intentionally a +//! direct INSERT (not a `JobRepo` trait method) because `JobRepo` is +//! generic across job kinds, while `ingest_runs` is ingest-specific +//! 
schema with dedicated columns. use anyhow::{Context, Result}; use rusqlite::params; @@ -12,6 +20,71 @@ use time::OffsetDateTime; use crate::error::StoreError; use crate::store::SqliteStore; +/// Aggregate counts for one ingest run. Written into the `ingest_runs` +/// table so `kb jobs` (P+) and audit tooling can surface the per-run +/// summary without re-walking the workspace. +/// +/// `items_json` carries the per-item detail when the run was NOT +/// `summary_only`; it is `None` when the caller asked for a summary +/// (the table column is then NULL per design §5.7). +#[derive(Clone, Debug)] +pub struct IngestRunRow<'a> { + pub run_id: &'a str, + pub scope_json: &'a str, + pub scanned: u32, + pub new_count: u32, + pub updated_count: u32, + pub skipped_count: u32, + pub error_count: u32, + pub duration_ms: u32, + pub started_at: OffsetDateTime, + pub finished_at: OffsetDateTime, + pub items_json: Option<&'a str>, +} + +impl SqliteStore { + /// Write one row into `ingest_runs` with the aggregate counts. Mirrors + /// the schema in `migrations/V001__init.sql` (§5.7). Called by + /// `kb-app::ingest` at the very end of a run, after the per-document + /// transactions have committed and the totals are known. + /// + /// `items_json = None` ↔ the column stores SQL `NULL`, which is the + /// `summary_only=true` contract. 
+    pub fn record_ingest_run(&self, row: &IngestRunRow<'_>) -> Result<()> {
+        let started = row
+            .started_at
+            .format(&time::format_description::well_known::Rfc3339)
+            .context("format ingest_run started_at")?;
+        let finished = row
+            .finished_at
+            .format(&time::format_description::well_known::Rfc3339)
+            .context("format ingest_run finished_at")?;
+        let conn = self.lock_conn();
+        conn.execute(
+            "INSERT INTO ingest_runs (
+                run_id, scope_json, scanned, new_count, updated_count,
+                skipped_count, error_count, duration_ms,
+                started_at, finished_at, items_json
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            params![
+                row.run_id,
+                row.scope_json,
+                i64::from(row.scanned),
+                i64::from(row.new_count),
+                i64::from(row.updated_count),
+                i64::from(row.skipped_count),
+                i64::from(row.error_count),
+                i64::from(row.duration_ms),
+                started,
+                finished,
+                row.items_json,
+            ],
+        )
+        .map_err(StoreError::from)?;
+        Ok(())
+    }
+}
+
 impl kb_core::JobRepo for SqliteStore {
     fn create(
         &self,
diff --git a/crates/kb-store-sqlite/src/lib.rs b/crates/kb-store-sqlite/src/lib.rs
index c006e60..dfbf006 100644
--- a/crates/kb-store-sqlite/src/lib.rs
+++ b/crates/kb-store-sqlite/src/lib.rs
@@ -29,4 +29,5 @@ mod store;
 pub use embeddings::EmbeddingRecordRow;
 pub use error::StoreError;
 pub use fts::rebuild_chunks_fts;
+pub use jobs::IngestRunRow;
 pub use store::SqliteStore;