feat(p3-5): wire kb-app facade — ingest / search / list / inspect

Replaces the P0 `bail!("not yet wired")` stubs in kb-app with real
bodies that compose the libraries shipped through P3-4. After this
commit, `cargo run -p kb-cli -- index` actually walks the workspace
and persists chunks (SQLite + optionally LanceDB), and
`cargo run -p kb-cli -- search --mode {lexical,vector,hybrid}` returns
real SearchHits with citations. `kb-app::ask` stays stubbed; P4-3
owns it.

App lifecycle (crates/kb-app/src/app.rs):
- Internal pub(crate) struct App holds the Config plus
Arc<SqliteStore> eagerly, with embedder + LanceVectorStore behind
OnceLock<Arc<...>> for memoization. First call pays the ~470MB
fastembed init / Lance open; subsequent calls return the cached
Arc::clone. OnceLock::set race losers fall back to get().cloned()
so the lazy-init is concurrent-safe.
- One-shot CLI invocations pay the cost once at most. The P9 TUI
(which holds an App for the session) gets memoization for free.
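
The lazy, memoized lifecycle described above can be sketched with nothing but the standard library. This is an illustrative sketch, not the crate's code: `Model` and `build_model` are hypothetical stand-ins for the embedder and its expensive constructor. It assumes the build may run more than once if two threads race, with only the first stored value being kept.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for an expensive resource such as an embedder.
struct Model {
    name: String,
}

/// Hypothetical fallible constructor (imagine a large model load here).
fn build_model() -> Result<Model, String> {
    Ok(Model { name: "demo".to_string() })
}

struct App {
    model: OnceLock<Arc<Model>>,
}

impl App {
    fn new() -> Self {
        Self { model: OnceLock::new() }
    }

    /// Build on first call, hand out clones of the cached Arc afterwards.
    fn model(&self) -> Result<Arc<Model>, String> {
        if let Some(m) = self.model.get() {
            return Ok(Arc::clone(m));
        }
        // The fallible build happens before touching the cell, so an error
        // leaves the cell empty and a later call can retry.
        let built = Arc::new(build_model()?);
        // `set` returns Err when another thread stored a value first; in
        // either case `get()` now yields the value that won the race.
        let _ = self.model.set(Arc::clone(&built));
        Ok(self.model.get().cloned().unwrap_or(built))
    }
}

fn main() {
    let app = App::new();
    let first = app.model().unwrap();
    let second = app.model().unwrap();
    // Both calls return the same allocation.
    assert!(Arc::ptr_eq(&first, &second));
    println!("model: {}", first.name);
}
```

One consequence of this shape: every caller ends up with the same cached value, but two threads that race on the first call may each pay the build cost once.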

ingest pipeline (lib.rs):
- FsSourceConnector::scan(&scope) → per asset:
parse_frontmatter → parse_blocks → build_canonical_document →
MdHeadingV1Chunker.chunk → put_asset_with_bytes → put_document →
put_blocks → put_chunks. One transaction per document per design
§5.8 (kb-store-sqlite's put_* methods own the transactions).
- When provider != "none" and dimensions > 0: build embedder once,
embed each doc's chunks as Document kind, ensure_table once at the
top of the run, then upsert the VectorRecord batch. Lexical-only
config (provider == "none") skips both — verified by
ingest_provider_none_skips_lance test.
- Per-asset failures are recorded as IngestItemKind::Error with the
error message attached; the run continues. Only structural failures
(DB unreachable etc.) abort.
- Aggregate counts (assets_scanned / new / updated / skipped /
errors / chunks_indexed / embeddings_indexed / duration_ms) flow
into both the JobRepo progress_json AND a dedicated ingest_runs
row written via SqliteStore::record_ingest_run (new pub
helper added to kb-store-sqlite; see below).
summary_only=true writes items_json=NULL but still populates the
count columns.
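
The per-asset error handling and aggregate counting above can be sketched as follows. This is a simplified illustration under stated assumptions: `Kind`, `Counts`, `process`, and `run` are hypothetical names, and the parse step is faked by a string check. It shows a failing asset becoming an `Error` item while the loop continues, with every item tallied exactly once.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    New,
    Updated,
    Skipped,
    Error,
}

#[derive(Debug, Default, PartialEq)]
struct Counts {
    scanned: u32,
    new: u32,
    updated: u32,
    skipped: u32,
    errors: u32,
}

/// Hypothetical per-asset step: non-Markdown files are skipped, paths
/// containing "bad" fail to parse, everything else is New or Updated.
fn process(path: &str, already_indexed: &[&str]) -> Result<Kind, String> {
    if !path.ends_with(".md") {
        return Ok(Kind::Skipped);
    }
    if path.contains("bad") {
        return Err(format!("parse failed: {path}"));
    }
    if already_indexed.contains(&path) {
        Ok(Kind::Updated)
    } else {
        Ok(Kind::New)
    }
}

fn run(paths: &[&str], already_indexed: &[&str]) -> Counts {
    let mut counts = Counts {
        scanned: u32::try_from(paths.len()).unwrap_or(u32::MAX),
        ..Counts::default()
    };
    for path in paths {
        // A per-asset failure is downgraded to an Error item; the run goes on.
        let kind = match process(path, already_indexed) {
            Ok(kind) => kind,
            Err(msg) => {
                eprintln!("ingest error: {msg}");
                Kind::Error
            }
        };
        // Single tally point, so no item can be counted twice.
        match kind {
            Kind::New => counts.new = counts.new.saturating_add(1),
            Kind::Updated => counts.updated = counts.updated.saturating_add(1),
            Kind::Skipped => counts.skipped = counts.skipped.saturating_add(1),
            Kind::Error => counts.errors = counts.errors.saturating_add(1),
        }
    }
    counts
}

fn main() {
    let counts = run(&["a.md", "b.md", "bad.md", "c.png"], &["b.md"]);
    println!("{counts:?}");
}
```

Keeping the tally in one `match` makes it easy to check that `new + updated + skipped + errors` equals `scanned`.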

search dispatch:
- SearchMode::Lexical → LexicalRetriever directly.
- SearchMode::Vector → VectorRetriever with embedder + LanceVectorStore.
- SearchMode::Hybrid → HybridRetriever composing the two.
- Vector / Hybrid with provider=none returns a clear error naming the
config key to flip ("models.embedding.provider").
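
A minimal sketch of that dispatch, assuming a simplified model in which the retrievers are reduced to enum tags. `Retriever`, `pick_retriever`, and the `"fastembed"` provider string are hypothetical; only the mode names and the config key come from the description above.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum SearchMode {
    Lexical,
    Vector,
    Hybrid,
}

/// Hypothetical tag standing in for the concrete retriever types.
#[derive(Debug, PartialEq)]
enum Retriever {
    Lexical,
    Vector,
    Hybrid,
}

/// Pick a retriever for `mode`, refusing vector-backed modes when
/// embeddings are disabled in the configuration.
fn pick_retriever(mode: SearchMode, provider: &str, dimensions: usize) -> Result<Retriever, String> {
    let embeddings_disabled = provider == "none" || dimensions == 0;
    match mode {
        SearchMode::Lexical => Ok(Retriever::Lexical),
        // The guard applies to both alternatives of the or-pattern.
        SearchMode::Vector | SearchMode::Hybrid if embeddings_disabled => Err(format!(
            "{mode:?} search needs embeddings; set `models.embedding.provider` to a provider other than \"none\""
        )),
        SearchMode::Vector => Ok(Retriever::Vector),
        SearchMode::Hybrid => Ok(Retriever::Hybrid),
    }
}

fn main() {
    println!("{:?}", pick_retriever(SearchMode::Lexical, "none", 0));
    println!("{:?}", pick_retriever(SearchMode::Hybrid, "fastembed", 384));
    println!("{:?}", pick_retriever(SearchMode::Vector, "none", 384));
}
```

Naming the config key in the error text lets a user fix the problem without reading the source.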

list_docs / inspect_doc / inspect_chunk delegate straight to
DocumentStore trait methods and return Err with an actionable message
on not-found.

Test seam: each public free function has a matching
#[doc(hidden)] pub fn *_with_config(cfg, ...) companion that
integration tests invoke directly (the public form internally calls
load_config()). pub(crate) would not reach across the integration-
tests crate boundary; #[doc(hidden)] keeps it out of rustdoc and the
function comment flags it as test-only.
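
The seam can be illustrated with a stripped-down sketch. Everything here is hypothetical (`Config`, its `data_dir` field, the default path, and `list_docs` returning strings); it only shows the shape: a public entry point that loads configuration itself, plus a `#[doc(hidden)] pub` companion that accepts an explicit configuration.

```rust
/// Hypothetical, simplified configuration type.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    pub data_dir: String,
}

/// Hypothetical loader; a real one would read the user's config file.
fn load_config() -> Result<Config, String> {
    Ok(Config { data_dir: "/home/user/.local/share/kb".to_string() })
}

/// Public entry point: resolves the configuration on its own.
pub fn list_docs() -> Result<Vec<String>, String> {
    list_docs_with_config(load_config()?)
}

/// Test-only seam. It is `pub` so an integration-tests crate can reach it,
/// and `#[doc(hidden)]` so it stays out of the rendered documentation.
#[doc(hidden)]
pub fn list_docs_with_config(config: Config) -> Result<Vec<String>, String> {
    // Stand-in for real work: report which store would be opened.
    Ok(vec![format!("{}/kb.sqlite", config.data_dir)])
}

fn main() {
    // A test would point the config at a temporary directory instead of
    // the user's real data directory.
    let cfg = Config { data_dir: "/tmp/kb-test".to_string() };
    println!("{:?}", list_docs_with_config(cfg));
    println!("{:?}", list_docs());
}
```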

kb-store-sqlite additions:
- pub struct IngestRunRow + pub fn record_ingest_run on SqliteStore
for the kb-app aggregate-counts persistence path. Helper writes
the ingest_runs row directly with all aggregate columns; jobs
table still gets a JobRepo create/update_progress/finish trio in
parallel.

Tests (11 default, 2 #[ignore] AVX-gated):
- ingest_lexical: round-trip, idempotent, summary_only_drops_items,
provider_none_skips_lance (asserts no .lance dir on disk),
records_ingest_runs_row_with_aggregate_counts, tags_any filter,
inspect_doc_not_found, inspect_chunk_not_found.
- search_lexical: lexical hits with embedding_model=None,
empty_query_returns_empty, vector_mode_with_provider_none returns
clear error.
- search_vector: hybrid mode end-to-end (#[ignore], AVX), Vector
mode embedding_model assertion (#[ignore], AVX). Both run on the
AVX VM in ~21s combined (first run pays the model download).
- TestEnv pins workspace.root + storage.{data_dir,model_dir} to a
TempDir so tests don't touch the user's $HOME/.local/share.
- Fixture workspace at crates/kb-app/tests/fixtures/workspace/ has
three small markdown files with varied frontmatter (rust+cargo+
python tags) so the tags_any filter test exercises a non-trivial
predicate.

Workspace 269 passed / 24 ignored / 0 failed (was 261/22). cargo
clippy --workspace --all-targets -- -D warnings clean. CLI smoke
verified manually: `cargo run -p kb-cli -- index` returns a real
IngestReport JSON; `cargo run -p kb-cli -- search "..."` returns
hits with citations; `cargo run -p kb-cli -- list docs` lists the
indexed documents.

Allowed deps respected: kb-source-fs, kb-parse-md, kb-parse-types,
kb-normalize, kb-chunk, kb-store-sqlite, kb-search, kb-store-vector,
kb-embed, kb-embed-local plus existing tracing / anyhow / serde /
toml / dirs and now blake3 (run_id) + time. Forbidden (kb-llm*,
kb-rag, kb-tui, kb-desktop, kb-parse-{pdf,image,audio}) absent from
cargo tree -p kb-app.

Out of scope per spec: ask body (P4-3), --rebuild-fts wiring,
--resume checkpointing (P+), --watch (P+), TUI / desktop integration
(P9 consumes this facade).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
14
Cargo.lock
generated
@@ -3334,11 +3334,25 @@ name = "kb-app"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "blake3",
  "dirs 5.0.1",
+ "kb-chunk",
  "kb-config",
  "kb-core",
+ "kb-embed",
+ "kb-embed-local",
+ "kb-normalize",
+ "kb-parse-md",
+ "kb-parse-types",
+ "kb-search",
+ "kb-source-fs",
+ "kb-store-sqlite",
+ "kb-store-vector",
+ "rusqlite",
  "serde",
  "serde_json",
+ "tempfile",
+ "time",
  "toml",
  "tracing",
  "tracing-appender",
@@ -10,11 +10,27 @@ description = "Facade — orchestrates components for kb-cli/tui/desktop"
 [dependencies]
 kb-core = { path = "../kb-core" }
 kb-config = { path = "../kb-config" }
+kb-source-fs = { path = "../kb-source-fs" }
+kb-parse-md = { path = "../kb-parse-md" }
+kb-parse-types = { path = "../kb-parse-types" }
+kb-normalize = { path = "../kb-normalize" }
+kb-chunk = { path = "../kb-chunk" }
+kb-store-sqlite = { path = "../kb-store-sqlite" }
+kb-store-vector = { path = "../kb-store-vector" }
+kb-search = { path = "../kb-search" }
+kb-embed = { path = "../kb-embed" }
+kb-embed-local = { path = "../kb-embed-local" }
 anyhow = { workspace = true }
+blake3 = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
+time = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }
 tracing-appender = "0.2"
 toml = "0.8"
 dirs = "5"
+
+[dev-dependencies]
+rusqlite = { workspace = true }
+tempfile = { workspace = true }
126
crates/kb-app/src/app.rs
Normal file
@@ -0,0 +1,126 @@
+//! `App`: internal lifecycle struct (§7).
+//!
+//! A single `App` represents one CLI invocation's worth of state: a
+//! resolved `Config`, an open `SqliteStore`, and (when embeddings are
+//! enabled) an `Embedder` + `LanceVectorStore`. Each public free
+//! function on `kb-app` wraps `App::open(config)` once, runs the
+//! requested op, and drops everything on return.
+//!
+//! The struct is `pub(crate)` because it is an internal seam: `kb-cli`
+//! calls only the free functions on the crate root. `kb-tui` (P9) is
+//! expected to hold one `App` for the session, at which point the
+//! struct may need to be promoted to `pub`. Until then, keep it
+//! private to insulate the wiring shape from downstream callers.
+//!
+//! ## Embedder + Vector store lifetime
+//!
+//! `App::open` builds the SQLite store unconditionally. The embedder
+//! and vector store are *lazy + memoized* (built on first call to
+//! [`App::embedder`] / [`App::vector`] and cached in `OnceLock`s), so
+//! a long-lived `App` (e.g., the P9 TUI session) pays the ~470 MB
+//! ONNX init plus Lance reopen cost exactly once.
+//!
+//! - `kb list` / `kb inspect` never need them.
+//! - `kb search --mode lexical` never needs them.
+//! - `kb ingest` and `kb search --mode {vector,hybrid}` always do.
+//!
+//! Building eagerly would force every CLI invocation to load ~470 MB of
+//! ONNX weights, which is the dominant cold-start cost. The lazy
+//! pattern keeps the lexical-only paths instant; the memoization makes
+//! the TUI's repeated searches cheap after the first.
+//!
+//! Embeddings can also be **disabled** workspace-wide via
+//! `config.models.embedding.provider = "none"` (or `dimensions = 0`);
+//! in that mode [`App::embedder`] returns `None` and callers must fall
+//! back to lexical-only search.
+
+use std::sync::{Arc, OnceLock};
+
+use anyhow::{Context, Result};
+
+use kb_core::Embedder;
+use kb_embed_local::FastembedEmbedder;
+use kb_store_sqlite::SqliteStore;
+use kb_store_vector::LanceVectorStore;
+
+/// Internal facade state. See module docs for lifetime rules.
+pub(crate) struct App {
+    pub(crate) config: kb_config::Config,
+    pub(crate) sqlite: Arc<SqliteStore>,
+    /// Memoized embedder, built lazily on first `embedder()` call when
+    /// embeddings are enabled. `OnceLock` keeps the struct `Sync` and
+    /// the build path cold-only-once.
+    embedder: OnceLock<Arc<dyn Embedder + Send + Sync>>,
+    /// Memoized vector store, built lazily on first `vector()` call
+    /// when embeddings are enabled. Same rationale as `embedder`.
+    vector: OnceLock<Arc<LanceVectorStore>>,
+}
+
+impl App {
+    /// Open the SQLite store and run migrations. Does NOT load the
+    /// embedder or vector store; those are lazy via
+    /// [`Self::embedder`] / [`Self::vector`].
+    ///
+    /// **Caveat:** must be called from a synchronous context.
+    /// Downstream `LanceVectorStore::new` (called by [`Self::vector`])
+    /// internally drives a `tokio::Runtime::block_on`, which panics if
+    /// invoked from inside another tokio runtime.
+    pub(crate) fn open(config: kb_config::Config) -> Result<Self> {
+        let sqlite = SqliteStore::open(&config).context("kb-app: open SqliteStore")?;
+        sqlite
+            .run_migrations()
+            .context("kb-app: run SqliteStore migrations")?;
+        Ok(Self {
+            config,
+            sqlite: Arc::new(sqlite),
+            embedder: OnceLock::new(),
+            vector: OnceLock::new(),
+        })
+    }
+
+    /// Returns `true` when the workspace has embeddings turned off
+    /// (`provider = "none"` or `dimensions = 0`). Lexical-only mode.
+    pub(crate) fn embeddings_disabled(&self) -> bool {
+        let cfg = &self.config.models.embedding;
+        cfg.provider == "none" || cfg.dimensions == 0
+    }
+
+    /// Build (or reuse) the fastembed embedder. Returns `None` when the
+    /// workspace is in lexical-only mode (see
+    /// [`Self::embeddings_disabled`]). The first call pays the ~470 MB
+    /// ONNX load; subsequent calls are a single `OnceLock` read.
+    pub(crate) fn embedder(&self) -> Result<Option<Arc<dyn Embedder + Send + Sync>>> {
+        if self.embeddings_disabled() {
+            return Ok(None);
+        }
+        if let Some(e) = self.embedder.get() {
+            return Ok(Some(e.clone()));
+        }
+        let emb: Arc<dyn Embedder + Send + Sync> = Arc::new(
+            FastembedEmbedder::new(&self.config)
+                .context("kb-app: load FastembedEmbedder")?,
+        );
+        // `set` returns Err if another thread won the race; in that case
+        // the loser still returns the (now-cached) winner via `get()`.
+        let _ = self.embedder.set(emb.clone());
+        Ok(Some(self.embedder.get().cloned().unwrap_or(emb)))
+    }
+
+    /// Build (or reuse) the LanceDB-backed vector store. Returns `None`
+    /// when embeddings are disabled. Memoized via `OnceLock` for the
+    /// same reasons as [`Self::embedder`].
+    pub(crate) fn vector(&self) -> Result<Option<Arc<LanceVectorStore>>> {
+        if self.embeddings_disabled() {
+            return Ok(None);
+        }
+        if let Some(v) = self.vector.get() {
+            return Ok(Some(v.clone()));
+        }
+        let store = Arc::new(
+            LanceVectorStore::new(&self.config, self.sqlite.clone())
+                .context("kb-app: open LanceVectorStore")?,
+        );
+        let _ = self.vector.set(store.clone());
+        Ok(Some(self.vector.get().cloned().unwrap_or(store)))
+    }
+}
@@ -1,8 +1,11 @@
 //! `kb-app` - facade that downstream `kb-cli` / `kb-tui` / `kb-desktop`
 //! depend on (§7, §8).
 //!
-//! P0 implementations stub out - the signatures are frozen so that later
-//! phases swap in real bodies without breaking call sites.
+//! P3-5 swapped the `bail!("not yet wired")` stubs for real bodies that
+//! compose the libraries shipped through P3-4. After this task, `kb
+//! ingest` actually walks a workspace and persists chunks, and `kb
+//! search --mode {lexical,vector,hybrid}` returns real `SearchHit`s.
+//! `kb-app::ask` stays stubbed (P4-3 owns it).
 //!
 //! ## Wire-schema convention
 //!
@@ -16,20 +19,48 @@
 //! surface (no domain-side equivalent in `kb-core`). When adding a new
 //! facade function in a later phase, remember: keep the return type pure,
 //! and add a matching `wire_*` helper in `kb-cli/src/wire.rs`.
+//!
+//! ## Test seam
+//!
+//! Each public free function has a `#[doc(hidden)] pub fn *_with_config`
+//! companion that takes a fully-resolved `Config` directly. Public
+//! callers go through the top-level functions which call
+//! `load_config()`; integration tests call the `_with_config` variant
+//! and pass a mutated `Config` pointing at a `TempDir` to avoid
+//! polluting the user's real `data_dir` / `model_dir`.
 
 use std::path::PathBuf;
+use std::sync::Arc;
 
-use anyhow::bail;
+use anyhow::{Context, anyhow};
 use serde::{Deserialize, Serialize};
 
+use kb_chunk::MdHeadingV1Chunker;
 use kb_core::{
-    Answer, CanonicalDocument, Chunk, ChunkId, DocFilter, DocSummary, DocumentId,
-    IngestReport, SearchHit, SearchMode, SearchQuery, SourceScope,
+    Answer, CanonicalDocument, Chunk, ChunkId, ChunkPolicy, ChunkerVersion, Chunker,
+    DocFilter, DocSummary, DocumentId, DocumentStore, Embedder, EmbeddingInput,
+    EmbeddingKind, IndexVersion, IngestReport, ParserVersion, RawAsset, Retriever,
+    SearchHit, SearchMode, SearchQuery, SourceConnector, SourceScope, SourceUri,
+    VectorRecord, VectorStore,
 };
+use kb_normalize::build_canonical_document;
+use kb_parse_md::{BodyHints, parse_blocks, parse_frontmatter};
+use kb_search::{HybridRetriever, LexicalRetriever, VectorRetriever};
+use kb_source_fs::FsSourceConnector;
 
+mod app;
 pub mod doctor_signal;
 pub mod logging;
 
+use app::App;
+
+/// Parser-version label persisted in `documents.parser_version` for
+/// every Markdown file ingested through the `kb-parse-md` pipeline.
+/// Kept in lock-step with the literal used in the `kb-store-sqlite`
+/// idempotency / round-trip tests so the version label written by the
+/// app and the one used in cross-crate fixtures match.
+const KB_PARSE_MD_VERSION: &str = "pulldown-cmark-0.x";
+
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct AskOpts {
     pub k: usize,
@@ -101,28 +132,679 @@ fn expand_tilde(s: &str) -> PathBuf {
     PathBuf::from(s)
 }
 
-pub fn ingest(_scope: SourceScope, _summary_only: bool) -> anyhow::Result<IngestReport> {
-    bail!("not yet wired (P1-2)")
+/// Load the active Config from XDG (or fall back to defaults). Mirrors
+/// what `kb-cli` does at the top of every subcommand path; we re-do
+/// the load inside each facade entry so callers don't have to thread
+/// a Config through.
+fn load_config() -> anyhow::Result<kb_config::Config> {
+    kb_config::Config::load(None)
 }
 
-pub fn list_docs(_filter: DocFilter) -> anyhow::Result<Vec<DocSummary>> {
-    bail!("not yet wired (P1-5)")
+// ── ingest ────────────────────────────────────────────────────────────────
+
+pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result<IngestReport> {
+    let config = load_config()?;
+    ingest_with_config(config, scope, summary_only)
 }
 
-pub fn inspect_doc(_id: &DocumentId) -> anyhow::Result<CanonicalDocument> {
-    bail!("not yet wired (P1-5)")
+/// Test-only seam: kb-cli must call the public free function
+/// ([`ingest`]), not this. See module docs.
+#[doc(hidden)]
+pub fn ingest_with_config(
+    config: kb_config::Config,
+    scope: SourceScope,
+    summary_only: bool,
+) -> anyhow::Result<IngestReport> {
+    let started_instant = std::time::Instant::now();
+
+    let app = App::open(config)?;
+
+    // Walk the workspace.
+    let connector = FsSourceConnector::new(&app.config)
+        .context("kb-app::ingest: build FsSourceConnector")?;
+    let assets = connector
+        .scan(&scope)
+        .context("kb-app::ingest: scan workspace")?;
+
+    // Embedder + vector store: build once at the top so the cold-start
+    // cost is paid once even when the workspace has 1000 markdown files.
+    let embedder = app.embedder()?;
+    let vector_store = app.vector()?;
+
+    // If both are present, ensure the table exists for the (model, dim)
+    // pair so the first per-doc upsert doesn't pay the create-table
+    // round-trip.
+    if let (Some(emb), Some(vec)) = (embedder.as_ref(), vector_store.as_ref()) {
+        let mid = emb.model_id();
+        vec.ensure_table(&mid, emb.dimensions())
+            .context("kb-app::ingest: ensure Lance table")?;
+    }
+
+    let parser_version = ParserVersion(KB_PARSE_MD_VERSION.to_string());
+    let chunk_policy = chunk_policy_from_config(&app.config);
+
+    // Pre-load every existing doc_id so we can label `IngestItem.kind`
+    // as `New` vs `Updated` correctly. `list_documents` returns one
+    // row per `(workspace_path, asset_id)`; index by the deterministic
+    // `doc_id` recipe input so the first ingest of an unseen file is
+    // labelled `New`.
+    let existing_doc_ids: std::collections::HashSet<String> = app
+        .sqlite
+        .list_documents(&DocFilter::default())
+        .context("kb-app::ingest: list existing documents")?
+        .into_iter()
+        .map(|d| d.doc_id.0)
+        .collect();
+
+    let started_at = time::OffsetDateTime::now_utc();
+
+    let mut items: Vec<kb_core::IngestItem> = Vec::new();
+    let mut new_count: u32 = 0;
+    let mut updated_count: u32 = 0;
+    let mut skipped_count: u32 = 0;
+    let mut error_count: u32 = 0;
+    // Aggregate counts surfaced into `ingest_runs` (and tracing). Not
+    // exposed on `IngestReport` today (`kb_core::IngestReport` is a
+    // wire-stable struct without these fields), but persisting them
+    // means audit tooling and `kb jobs` (P+) can recover the totals
+    // without re-walking the DB.
+    let mut chunks_indexed: u32 = 0;
+    let mut embeddings_indexed: u32 = 0;
+    let scanned_count: u32 = u32::try_from(assets.len()).unwrap_or(u32::MAX);
+
+    let embed_active = embedder.is_some() && vector_store.is_some();
+
+    for asset in assets {
+        let item = ingest_one_asset(
+            &app,
+            &asset,
+            &parser_version,
+            &chunk_policy,
+            embedder.as_ref(),
+            vector_store.as_ref(),
+            &existing_doc_ids,
+        );
+
+        let item = match item {
+            Ok(i) => i,
+            Err(e) => {
+                tracing::error!(
+                    target: "kb-app",
+                    path = %asset.workspace_path.0,
+                    error = %e,
+                    "kb-app::ingest: per-file fatal"
+                );
+                // `error_count` is bumped once, by the `Error` arm of the
+                // `match item.kind` below; incrementing it here as well
+                // would count each failed asset twice.
+                kb_core::IngestItem {
+                    kind: kb_core::IngestItemKind::Error,
+                    doc_id: None,
+                    doc_path: asset.workspace_path.clone(),
+                    asset_id: Some(asset.asset_id.clone()),
+                    byte_len: Some(asset.byte_len),
+                    block_count: None,
+                    chunk_count: None,
+                    parser_version: None,
+                    chunker_version: None,
+                    warnings: Vec::new(),
+                    error: Some(format!("{e:#}")),
+                }
+            }
+        };
+
+        match item.kind {
+            kb_core::IngestItemKind::New => {
+                new_count = new_count.saturating_add(1);
+                let n = item.chunk_count.unwrap_or(0);
+                chunks_indexed = chunks_indexed.saturating_add(n);
+                if embed_active {
+                    embeddings_indexed = embeddings_indexed.saturating_add(n);
+                }
+            }
+            kb_core::IngestItemKind::Updated => {
+                updated_count = updated_count.saturating_add(1);
+                let n = item.chunk_count.unwrap_or(0);
+                chunks_indexed = chunks_indexed.saturating_add(n);
+                if embed_active {
+                    embeddings_indexed = embeddings_indexed.saturating_add(n);
+                }
+            }
+            kb_core::IngestItemKind::Skipped => {
+                skipped_count = skipped_count.saturating_add(1)
+            }
+            kb_core::IngestItemKind::Error => {
+                error_count = error_count.saturating_add(1)
+            }
+        }
+        items.push(item);
+    }
+
+    // Record a row in `jobs` so `kb jobs` (P+) can list the run. Distinct
+    // from the `ingest_runs` row written below: the `jobs` table is the
+    // generic job-lifecycle surface (`kind=ingest`), `ingest_runs` is the
+    // ingest-specific aggregate counts row.
+    let payload = serde_json::json!({
+        "scope": scope,
+        "summary_only": summary_only,
+    });
+    let job_id_res = <SqliteStoreAlias as kb_core::JobRepo>::create(
+        &app.sqlite,
+        kb_core::JobKind::Ingest,
+        payload,
+    );
+    match job_id_res {
+        Ok(jid) => {
+            // Stash the aggregate counts as the job's `progress_json`
+            // so a future `kb jobs show` can surface them without
+            // joining `ingest_runs`.
+            let progress = serde_json::json!({
+                "scanned": scanned_count,
+                "new": new_count,
+                "updated": updated_count,
+                "skipped": skipped_count,
+                "errors": error_count,
+                "chunks_indexed": chunks_indexed,
+                "embeddings_indexed": embeddings_indexed,
+            });
+            if let Err(e) = <SqliteStoreAlias as kb_core::JobRepo>::update_progress(
+                &app.sqlite,
+                &jid,
+                progress,
+            ) {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: JobRepo::update_progress failed"
+                );
+            }
+            if let Err(e) = <SqliteStoreAlias as kb_core::JobRepo>::finish(
+                &app.sqlite,
+                &jid,
+                kb_core::JobStatus::Succeeded,
+                None,
+            ) {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: JobRepo::finish failed"
+                );
+            }
+        }
+        Err(e) => {
+            tracing::warn!(
+                target: "kb-app",
+                error = %e,
+                "kb-app::ingest: JobRepo::create failed; run not recorded in `jobs`"
+            );
+        }
+    }
+
+    let duration_ms = u32::try_from(started_instant.elapsed().as_millis())
+        .unwrap_or(u32::MAX);
+    let finished_at = time::OffsetDateTime::now_utc();
+
+    // Record the ingest_runs row with aggregate counts.
+    // `summary_only=true` writes `items_json=NULL` (per design §5.7);
+    // the count columns are populated either way.
+    let scope_json = serde_json::to_string(&scope)
+        .context("kb-app::ingest: serialize scope for ingest_runs.scope_json")?;
+    let items_json: Option<String> = if summary_only {
+        None
+    } else {
+        match serde_json::to_string(&items) {
+            Ok(s) => Some(s),
+            Err(e) => {
+                tracing::warn!(
+                    target: "kb-app",
+                    error = %e,
+                    "kb-app::ingest: failed to serialize items_json; storing NULL"
+                );
+                None
+            }
+        }
+    };
+    let run_id = mint_ingest_run_id(&scope_json, started_at);
+    let row = kb_store_sqlite::IngestRunRow {
+        run_id: &run_id,
+        scope_json: &scope_json,
+        scanned: scanned_count,
+        new_count,
+        updated_count,
+        skipped_count,
+        error_count,
+        duration_ms,
+        started_at,
+        finished_at,
+        items_json: items_json.as_deref(),
+    };
+    if let Err(e) = app.sqlite.record_ingest_run(&row) {
+        tracing::warn!(
+            target: "kb-app",
+            error = %e,
+            "kb-app::ingest: record_ingest_run failed"
+        );
+    }
+
+    tracing::info!(
+        target: "kb-app",
+        scanned = scanned_count,
+        new = new_count,
+        updated = updated_count,
+        skipped = skipped_count,
+        errors = error_count,
+        chunks_indexed,
+        embeddings_indexed,
+        duration_ms,
+        "kb-app::ingest: run complete"
+    );
+
+    Ok(IngestReport {
+        scope,
+        scanned: scanned_count,
+        new: new_count,
+        updated: updated_count,
+        skipped: skipped_count,
+        errors: error_count,
+        duration_ms,
+        items: if summary_only { None } else { Some(items) },
+    })
 }
 
-pub fn inspect_chunk(_id: &ChunkId) -> anyhow::Result<Chunk> {
-    bail!("not yet wired (P1-5)")
+/// Mint a stable 32-hex-char `run_id` for an `ingest_runs` row.
+/// `(scope, started_at_nanos)` is enough to make two runs with the
+/// same scope started a nanosecond apart distinguishable; same shape
+/// as the JobId recipe in `kb-store-sqlite::jobs`.
+fn mint_ingest_run_id(scope_json: &str, at: time::OffsetDateTime) -> String {
+    let mut hasher = blake3::Hasher::new();
+    hasher.update(scope_json.as_bytes());
+    hasher.update(&at.unix_timestamp_nanos().to_be_bytes());
+    let hex = hasher.finalize().to_hex().to_string();
+    hex[..32].to_string()
 }
 
-pub fn search(_query: SearchQuery) -> anyhow::Result<Vec<SearchHit>> {
-    bail!("not yet wired (P3-1/P4-1)")
+/// Type alias used to disambiguate the two trait impls (`DocumentStore`
+/// vs `JobRepo`) on the same store. Plain `app.sqlite.create(...)`
+/// would pick one based on inherent vs trait methods; we go through
+/// `<… as JobRepo>` to be explicit.
+type SqliteStoreAlias = kb_store_sqlite::SqliteStore;
+
+/// Process a single asset: read bytes, parse, normalize, chunk,
+/// persist, embed. Per-asset failures bubble up to the caller for
+/// labelling as `IngestItemKind::Error`; they do NOT abort the
+/// whole run.
+fn ingest_one_asset(
+    app: &App,
+    asset: &RawAsset,
+    parser_version: &ParserVersion,
+    chunk_policy: &ChunkPolicy,
+    embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
+    vector_store: Option<&Arc<kb_store_vector::LanceVectorStore>>,
+    existing_doc_ids: &std::collections::HashSet<String>,
+) -> anyhow::Result<kb_core::IngestItem> {
+    tracing::debug!(
+        target: "kb-app::ingest",
+        path = %asset.workspace_path.0,
+        "processing asset"
+    );
+    // Only handle Markdown for now; other media types are P6+ work.
+    if asset.media_type != kb_core::MediaType::Markdown {
+        return Ok(kb_core::IngestItem {
+            kind: kb_core::IngestItemKind::Skipped,
+            doc_id: None,
+            doc_path: asset.workspace_path.clone(),
+            asset_id: Some(asset.asset_id.clone()),
+            byte_len: Some(asset.byte_len),
+            block_count: None,
+            chunk_count: None,
+            parser_version: None,
+            chunker_version: None,
+            warnings: Vec::new(),
+            error: None,
+        });
+    }
+
+    let path = match &asset.source_uri {
+        SourceUri::File(p) => p.clone(),
+        SourceUri::Kb(_) => {
+            return Ok(kb_core::IngestItem {
+                kind: kb_core::IngestItemKind::Skipped,
+                doc_id: None,
+                doc_path: asset.workspace_path.clone(),
+                asset_id: Some(asset.asset_id.clone()),
+                byte_len: Some(asset.byte_len),
+                block_count: None,
+                chunk_count: None,
+                parser_version: None,
+                chunker_version: None,
+                warnings: vec![
+                    "kb:// source URIs are not supported by the fs ingester".into(),
+                ],
+                error: None,
+            });
+        }
+    };
+
+    let bytes = std::fs::read(&path)
+        .with_context(|| format!("read asset bytes from {}", path.display()))?;
+
+    let body_hints = build_body_hints(asset);
+
+    // Frontmatter: `parse_frontmatter` returns Ok even on malformed
+    // frontmatter (warnings are surfaced through the `Vec<Warning>`).
+    let (metadata, fm_span, fm_warns) = parse_frontmatter(&bytes, &body_hints)
+        .context("kb-parse-md::parse_frontmatter")?;
+
+    let body_offset_lines = match fm_span {
+        Some(span) => count_lines_in(&bytes[..span.end]),
+        None => 0,
+    };
+
+    let (parsed_blocks, blk_warns) = parse_blocks(&bytes[fm_span_end(fm_span)..], body_offset_lines)
+        .context("kb-parse-md::parse_blocks")?;
+
+    let mut all_warnings = Vec::with_capacity(fm_warns.len() + blk_warns.len());
+    all_warnings.extend(fm_warns);
+    all_warnings.extend(blk_warns);
+
+    // Snapshot warning notes for the IngestItem before the vec is
+    // consumed by `build_canonical_document`.
+    let warning_notes: Vec<String> = all_warnings
+        .iter()
+        .map(|w| format!("{:?}: {}", w.kind, w.note))
+        .collect();
+
+    let canonical = build_canonical_document(
+        asset,
+        metadata,
+        parsed_blocks,
+        parser_version,
+        all_warnings,
+    )
+    .context("kb-normalize::build_canonical_document")?;
+
+    let chunks = MdHeadingV1Chunker
+        .chunk(&canonical, chunk_policy)
+        .context("kb-chunk::MdHeadingV1Chunker::chunk")?;
+
+    // Persist. Each `put_*` call wraps its own short transaction
+    // (per-document tx semantics per design §5.8); composing them is
+    // the kb-app job. A failure mid-way leaves the DB in a state the
+    // next ingest run can re-converge (UPSERT + DELETE-then-INSERT).
+    app.sqlite
+        .put_asset_with_bytes(asset, &bytes)
+        .context("DocumentStore::put_asset_with_bytes")?;
+    app.sqlite
+        .put_document(&canonical)
+        .context("DocumentStore::put_document")?;
+    app.sqlite
+        .put_blocks(&canonical.doc_id, &canonical.blocks)
+        .context("DocumentStore::put_blocks")?;
+    app.sqlite
+        .put_chunks(&canonical.doc_id, &chunks)
+        .context("DocumentStore::put_chunks")?;
|
||||
    // Embed + vector upsert (only when both sides are configured).
    if let (Some(emb), Some(vec_store)) = (embedder, vector_store) {
        if !chunks.is_empty() {
            let inputs: Vec<EmbeddingInput<'_>> = chunks
                .iter()
                .map(|c| EmbeddingInput {
                    text: c.text.as_str(),
                    kind: EmbeddingKind::Document,
                })
                .collect();
            let vectors = emb
                .embed(&inputs)
                .context("Embedder::embed (document chunks)")?;
            let model_id = emb.model_id();
            let model_version = emb.model_version();
            let dimensions = emb.dimensions();
            let records: Vec<VectorRecord> = chunks
                .iter()
                .zip(vectors)
                .map(|(c, v)| VectorRecord {
                    embedding_id: kb_core::id_for_embedding(
                        &c.chunk_id,
                        &model_id,
                        &model_version,
                        dimensions,
                    ),
                    chunk_id: c.chunk_id.clone(),
                    vector: v,
                    doc_id: canonical.doc_id.clone(),
                    text: c.text.clone(),
                    heading_path: c.heading_path.clone(),
                    model_id: model_id.clone(),
                    model_version: model_version.clone(),
                    dimensions,
                })
                .collect();
            vec_store
                .upsert(&records)
                .context("VectorStore::upsert")?;
        }
    }

    let kind = if existing_doc_ids.contains(&canonical.doc_id.0) {
        kb_core::IngestItemKind::Updated
    } else {
        kb_core::IngestItemKind::New
    };

    Ok(kb_core::IngestItem {
        kind,
        doc_id: Some(canonical.doc_id.clone()),
        doc_path: asset.workspace_path.clone(),
        asset_id: Some(asset.asset_id.clone()),
        byte_len: Some(asset.byte_len),
        block_count: u32::try_from(canonical.blocks.len()).ok(),
        chunk_count: u32::try_from(chunks.len()).ok(),
        parser_version: Some(parser_version.clone()),
        chunker_version: Some(MdHeadingV1Chunker.chunker_version()),
        warnings: warning_notes,
        error: None,
    })
}

/// Convenience: end byte of the frontmatter region (or 0 when absent).
fn fm_span_end(span: Option<kb_parse_md::FrontmatterSpan>) -> usize {
    span.map(|s| s.end).unwrap_or(0)
}

/// Count `\n` in a byte prefix to convert frontmatter byte span to
/// the line-offset `parse_blocks` expects.
fn count_lines_in(bytes: &[u8]) -> u32 {
    let n = bytes.iter().filter(|&&b| b == b'\n').count();
    u32::try_from(n).unwrap_or(u32::MAX)
}

/// Build `BodyHints` from the asset alone. The asset's `discovered_at`
/// stands in for both `fs_ctime` and `fs_mtime`: querying the FS
/// metadata API for every file would add noticeable overhead on large
/// workspaces, and users who want authoritative timestamps write them
/// into the document's frontmatter.
fn build_body_hints(asset: &RawAsset) -> BodyHints {
    BodyHints {
        first_h1: None,
        fs_ctime: asset.discovered_at,
        fs_mtime: asset.discovered_at,
        fallback_lang: None,
    }
}

/// Build a `ChunkPolicy` from the active config.
fn chunk_policy_from_config(config: &kb_config::Config) -> ChunkPolicy {
    ChunkPolicy {
        target_tokens: config.chunking.target_tokens,
        overlap_tokens: config.chunking.overlap_tokens,
        respect_markdown_headings: config.chunking.respect_markdown_headings,
        chunker_version: ChunkerVersion(config.chunking.chunker_version.clone()),
    }
}

// ── list_docs / inspect_doc / inspect_chunk ───────────────────────────────

pub fn list_docs(filter: DocFilter) -> anyhow::Result<Vec<DocSummary>> {
    let config = load_config()?;
    list_docs_with_config(config, filter)
}

/// Test-only seam: kb-cli must call the public free function
/// ([`list_docs`]), not this.
#[doc(hidden)]
pub fn list_docs_with_config(
    config: kb_config::Config,
    filter: DocFilter,
) -> anyhow::Result<Vec<DocSummary>> {
    let app = App::open(config)?;
    app.sqlite.list_documents(&filter)
}

pub fn inspect_doc(id: &DocumentId) -> anyhow::Result<CanonicalDocument> {
    let config = load_config()?;
    inspect_doc_with_config(config, id)
}

/// Test-only seam: kb-cli must call the public free function
/// ([`inspect_doc`]), not this.
#[doc(hidden)]
pub fn inspect_doc_with_config(
    config: kb_config::Config,
    id: &DocumentId,
) -> anyhow::Result<CanonicalDocument> {
    let app = App::open(config)?;
    app.sqlite
        .get_document(id)?
        .ok_or_else(|| anyhow!("document not found: {} (try `kb list docs`)", id.0))
}

pub fn inspect_chunk(id: &ChunkId) -> anyhow::Result<Chunk> {
    let config = load_config()?;
    inspect_chunk_with_config(config, id)
}

/// Test-only seam: kb-cli must call the public free function
/// ([`inspect_chunk`]), not this.
#[doc(hidden)]
pub fn inspect_chunk_with_config(
    config: kb_config::Config,
    id: &ChunkId,
) -> anyhow::Result<Chunk> {
    let app = App::open(config)?;
    app.sqlite
        .get_chunk(id)?
        .ok_or_else(|| anyhow!("chunk not found: {} (try `kb inspect doc <id>`)", id.0))
}

// ── search ────────────────────────────────────────────────────────────────

pub fn search(query: SearchQuery) -> anyhow::Result<Vec<SearchHit>> {
    let config = load_config()?;
    search_with_config(config, query)
}

/// Test-only seam: kb-cli must call the public free function
/// ([`search`]), not this.
#[doc(hidden)]
pub fn search_with_config(
    config: kb_config::Config,
    query: SearchQuery,
) -> anyhow::Result<Vec<SearchHit>> {
    let app = App::open(config)?;

    match query.mode {
        SearchMode::Lexical => {
            let lex = LexicalRetriever::with_settings(
                app.sqlite.clone(),
                lexical_index_version(&app.config),
                app.config.search.snippet_chars,
            );
            lex.search(&query)
        }
        SearchMode::Vector => {
            let (emb, vec_store) = require_embeddings(&app)?;
            let vec_iv = vector_index_version(emb.as_ref());
            let vec_dyn: Arc<dyn VectorStore + Send + Sync> = vec_store;
            let emb_dyn: Arc<dyn Embedder> = emb;
            let retr = VectorRetriever::with_settings(
                vec_dyn,
                emb_dyn,
                app.sqlite.clone(),
                vec_iv,
                app.config.search.snippet_chars,
            );
            retr.search(&query)
        }
        SearchMode::Hybrid => {
            let lex = Arc::new(LexicalRetriever::with_settings(
                app.sqlite.clone(),
                lexical_index_version(&app.config),
                app.config.search.snippet_chars,
            )) as Arc<dyn Retriever>;
            let (emb, vec_store) = require_embeddings(&app)?;
            let vec_iv = vector_index_version(emb.as_ref());
            let vec_dyn: Arc<dyn VectorStore + Send + Sync> = vec_store;
            let emb_dyn: Arc<dyn Embedder> = emb;
            let vec_retr = Arc::new(VectorRetriever::with_settings(
                vec_dyn,
                emb_dyn,
                app.sqlite.clone(),
                vec_iv,
                app.config.search.snippet_chars,
            )) as Arc<dyn Retriever>;
            let hybrid = HybridRetriever::new(&app.config, lex, vec_retr);
            hybrid.search(&query)
        }
    }
}

fn require_embeddings(
    app: &App,
) -> anyhow::Result<(
    Arc<dyn Embedder + Send + Sync>,
    Arc<kb_store_vector::LanceVectorStore>,
)> {
    let emb = app.embedder()?.ok_or_else(|| {
        anyhow!(
            "embeddings disabled (config.models.embedding.provider == \"none\" \
             or dimensions == 0); vector / hybrid search require embeddings — \
             switch to --mode lexical or enable an embedding provider in config.toml"
        )
    })?;
    let vec_store = app.vector()?.ok_or_else(|| {
        anyhow!(
            "vector store unavailable while embedder is configured — this should \
             not happen; check `kb doctor` and the data_dir permissions"
        )
    })?;
    Ok((emb, vec_store))
}

/// Compose a stable `IndexVersion` for the lexical retriever from
/// the active config. This token surfaces in `SearchHit.index_version`
/// and on snapshot tests; including the chunker version pins it to
/// the chunking policy in effect.
fn lexical_index_version(config: &kb_config::Config) -> IndexVersion {
    IndexVersion(format!("lex:{}", config.chunking.chunker_version))
}

/// Compose a stable `IndexVersion` for the vector retriever. Tracks
/// `(embedding_model, embedding_version, dimensions)` so a model swap
/// flags drift via the existing index_version mismatch warning in
/// `HybridRetriever::new`.
fn vector_index_version(embedder: &dyn Embedder) -> IndexVersion {
    IndexVersion(format!(
        "vec:{}@{}:{}",
        embedder.model_id().0,
        embedder.model_version().0,
        embedder.dimensions(),
    ))
}

// ── ask (still stubbed; P4-3) ─────────────────────────────────────────────

pub fn ask(_query: &str, _opts: AskOpts) -> anyhow::Result<Answer> {
    anyhow::bail!("not yet wired (P4-3)")
}

/// Run the doctor checks. P0 emits `config_loaded` + `data_dir_writable`
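
For reference, the two frontmatter helpers in this file (`fm_span_end` and `count_lines_in`) appear intended to combine into a line offset for `parse_blocks`. The sketch below is a minimal, std-only illustration under stated assumptions: `FrontmatterSpan` is a hypothetical stand-in for `kb_parse_md::FrontmatterSpan`, and `body_line_offset` is a name invented here, not a function in this commit.

```rust
/// Hypothetical stand-in for `kb_parse_md::FrontmatterSpan`; only the
/// `end` byte offset matters for this sketch.
#[derive(Clone, Copy, Debug)]
pub struct FrontmatterSpan {
    pub start: usize,
    pub end: usize,
}

/// End byte of the frontmatter region, or 0 when there is none.
pub fn fm_span_end(span: Option<FrontmatterSpan>) -> usize {
    span.map(|s| s.end).unwrap_or(0)
}

/// Number of `\n` bytes in `bytes`, saturating at `u32::MAX`.
pub fn count_lines_in(bytes: &[u8]) -> u32 {
    let n = bytes.iter().filter(|&&b| b == b'\n').count();
    u32::try_from(n).unwrap_or(u32::MAX)
}

/// Assumed composition: the body's line offset is the number of
/// newlines consumed by the frontmatter prefix. The span end is
/// clamped to the source length so slicing cannot panic.
pub fn body_line_offset(source: &[u8], span: Option<FrontmatterSpan>) -> u32 {
    let end = fm_span_end(span).min(source.len());
    count_lines_in(&source[..end])
}
```

With a three-line frontmatter block the body starts at line offset 3; with no frontmatter the prefix is empty and the offset is 0.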
104
crates/kb-app/tests/common/mod.rs
Normal file
@@ -0,0 +1,104 @@
//! Shared test scaffolding for `kb-app` integration tests.
//!
//! Each test gets a fresh `TempDir` and a `Config` whose storage paths
//! all point inside it, so the user's real `data_dir` / `model_dir`
//! is never touched. The fixture workspace at
//! `tests/fixtures/workspace/` is *copied* into the temp dir for each
//! test so a write-side ingest can't trip on a read-only fixture
//! tree. The default lane (no `--ignored`) opts out of embeddings via
//! `provider = "none"` so AVX is not required.

#![allow(dead_code)]

use std::path::{Path, PathBuf};

use kb_config::Config;
use tempfile::TempDir;

/// Test environment: owns a `TempDir` and exposes a `Config` whose
/// storage paths live inside it.
pub struct TestEnv {
    pub temp: TempDir,
    pub workspace_root: PathBuf,
    pub config: Config,
}

impl TestEnv {
    /// Build an env with embeddings disabled (lexical-only). Default
    /// lane: no AVX, no fastembed download.
    pub fn lexical_only() -> Self {
        let mut env = Self::new_inner();
        env.config.models.embedding.provider = "none".to_string();
        env.config.models.embedding.dimensions = 0;
        env
    }

    /// Build an env with the default fastembed embedding provider.
    /// Used by AVX-gated `#[ignore]` tests.
    pub fn with_embeddings() -> Self {
        Self::new_inner()
    }

    fn new_inner() -> Self {
        let temp = tempfile::tempdir().expect("tempdir");
        let workspace_root = temp.path().join("workspace");
        copy_fixture_workspace(&workspace_root);

        let data_dir = temp.path().join("data");
        std::fs::create_dir_all(&data_dir).unwrap();
        let model_dir = temp.path().join("models");
        std::fs::create_dir_all(&model_dir).unwrap();

        let mut config = Config::defaults();
        config.workspace.root = workspace_root.to_string_lossy().into_owned();
        // Drop the ".obsidian" / "node_modules" excludes: they do
        // nothing useful for fixtures and only get in the way when
        // debugging.
        config.workspace.exclude.clear();
        config.storage.data_dir = data_dir.to_string_lossy().into_owned();
        // Pin model_dir to the TempDir so a future fastembed-touching
        // test can't accidentally write to the user's `~/.local/share`.
        config.storage.model_dir = model_dir.to_string_lossy().into_owned();
        // Use a small chunk policy so the fixture's small files
        // emit at least a couple of chunks even with overlap_tokens
        // honored.
        config.chunking.target_tokens = 80;
        config.chunking.overlap_tokens = 20;

        Self {
            temp,
            workspace_root,
            config,
        }
    }

    pub fn scope(&self) -> kb_core::SourceScope {
        kb_core::SourceScope {
            root: self.workspace_root.clone(),
            include: self.config.workspace.include.clone(),
            exclude: self.config.workspace.exclude.clone(),
        }
    }
}

fn copy_fixture_workspace(dest: &Path) {
    let src = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests")
        .join("fixtures")
        .join("workspace");
    copy_dir_recursive(&src, dest);
}

fn copy_dir_recursive(src: &Path, dest: &Path) {
    std::fs::create_dir_all(dest).unwrap();
    for entry in std::fs::read_dir(src).expect("read fixture dir") {
        let entry = entry.unwrap();
        let path = entry.path();
        let target = dest.join(entry.file_name());
        if path.is_dir() {
            copy_dir_recursive(&path, &target);
        } else {
            std::fs::copy(&path, &target).expect("copy fixture file");
        }
    }
}
24
crates/kb-app/tests/fixtures/workspace/intro.md
vendored
Normal file
@@ -0,0 +1,24 @@
---
title: Introduction to Rust
tags: [rust, language]
lang: en
created_at: 2024-03-01T00:00:00Z
updated_at: 2024-03-02T00:00:00Z
source_type: note
trust_level: primary
---

# Introduction to Rust

Rust is a systems programming language focused on safety, speed, and concurrency.
The compiler enforces memory safety without a garbage collector.

## Ownership

Each value has a single owner. When the owner goes out of scope the value is
dropped. References are borrows that the compiler tracks at compile time.

## Concurrency

Threads in Rust use the ownership system to prevent data races. The Send and
Sync traits codify which types can move between threads.
23
crates/kb-app/tests/fixtures/workspace/notes/cargo.md
vendored
Normal file
@@ -0,0 +1,23 @@
---
title: Cargo Notes
tags: [rust, cargo, tools]
lang: en
created_at: 2024-04-01T00:00:00Z
updated_at: 2024-04-02T00:00:00Z
source_type: note
trust_level: primary
---

# Cargo Notes

Cargo is the Rust package manager and build tool.

## Workspaces

A workspace is a set of packages that share the same `Cargo.lock` and output
directory. Member crates are listed in the `members` key of `[workspace]`.

## Features

Cargo features let crates expose optional functionality behind a feature flag.
Default features are enabled unless `default-features = false` is set.
23
crates/kb-app/tests/fixtures/workspace/notes/python.md
vendored
Normal file
@@ -0,0 +1,23 @@
---
title: Python Snippets
tags: [python, language]
lang: en
created_at: 2024-05-01T00:00:00Z
updated_at: 2024-05-02T00:00:00Z
source_type: note
trust_level: primary
---

# Python Snippets

Quick reference for everyday Python tasks.

## List comprehensions

Filter and transform in one pass: `[x*2 for x in xs if x > 0]`. Cleaner than
the map+filter pair when the predicate is simple.

## Decorators

Wrap a function in another function. `functools.wraps` preserves the
docstring and `__name__` of the inner function on the outer wrapper.
220
crates/kb-app/tests/ingest_lexical.rs
Normal file
@@ -0,0 +1,220 @@
//! Integration tests for `kb-app::ingest` + `list_docs` + `inspect_*`
//! along the lexical-only path (no embeddings → no AVX requirement).

mod common;

use common::TestEnv;

#[test]
fn ingest_then_list_inspects_round_trip() {
    let env = TestEnv::lexical_only();
    let report =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();

    // The fixture has 3 markdown files; first ingest should label them
    // all as New.
    assert_eq!(report.scanned, 3, "scanned: {report:?}");
    assert_eq!(report.new, 3, "new: {report:?}");
    assert_eq!(report.updated, 0, "updated: {report:?}");
    assert_eq!(report.errors, 0, "errors: {report:?}");
    let items = report.items.as_ref().expect("items present");
    assert_eq!(items.len(), 3);
    for it in items {
        assert!(it.error.is_none(), "per-item error: {it:?}");
        assert!(it.doc_id.is_some());
        // Each fixture file emits ≥1 chunk.
        assert!(it.chunk_count.unwrap_or(0) >= 1, "chunks: {it:?}");
    }

    // list_docs returns the 3 docs.
    let docs = kb_app::list_docs_with_config(
        env.config.clone(),
        kb_core::DocFilter::default(),
    )
    .unwrap();
    assert_eq!(docs.len(), 3, "docs: {docs:?}");

    // inspect_doc round-trips one of them.
    let any_doc_id = docs[0].doc_id.clone();
    let canonical = kb_app::inspect_doc_with_config(env.config.clone(), &any_doc_id)
        .unwrap();
    assert_eq!(canonical.doc_id, any_doc_id);
    assert!(!canonical.blocks.is_empty(), "blocks empty");
}

#[test]
fn ingest_idempotent_on_second_run() {
    let env = TestEnv::lexical_only();

    let r1 =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
    assert_eq!(r1.new, 3);

    let r2 =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
    // Same files re-ingested: labelled Updated, not duplicated.
    assert_eq!(r2.scanned, 3, "second scan: {r2:?}");
    assert_eq!(r2.new, 0, "second run new should be 0: {r2:?}");
    assert_eq!(r2.updated, 3, "second run updated: {r2:?}");

    // list_docs still has 3 docs (no duplicates).
    let docs = kb_app::list_docs_with_config(
        env.config.clone(),
        kb_core::DocFilter::default(),
    )
    .unwrap();
    assert_eq!(docs.len(), 3);
}

#[test]
fn ingest_summary_only_drops_items() {
    let env = TestEnv::lexical_only();
    let report =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
    assert_eq!(report.scanned, 3);
    assert!(report.items.is_none(), "summary-only should null items");
}

#[test]
fn ingest_records_ingest_runs_row_with_aggregate_counts() {
    // The ingest_runs table is the §5.7 sibling of `jobs`: dedicated
    // count columns (`scanned`, `new_count`, …) populated at the end
    // of every run. `summary_only=true` writes `items_json=NULL`; the
    // counts MUST still be present.
    let env = TestEnv::lexical_only();
    let report = kb_app::ingest_with_config(env.config.clone(), env.scope(), true)
        .unwrap();
    assert_eq!(report.scanned, 3);

    let db_path = std::path::PathBuf::from(&env.config.storage.data_dir)
        .join("kb.sqlite");
    let conn = rusqlite::Connection::open(&db_path).expect("open kb.sqlite");
    let (scanned, new_c, updated, skipped, errors, items_json): (
        i64,
        i64,
        i64,
        i64,
        i64,
        Option<String>,
    ) = conn
        .query_row(
            "SELECT scanned, new_count, updated_count, skipped_count,
                    error_count, items_json
             FROM ingest_runs
             ORDER BY started_at DESC
             LIMIT 1",
            [],
            |r| {
                Ok((
                    r.get(0)?,
                    r.get(1)?,
                    r.get(2)?,
                    r.get(3)?,
                    r.get(4)?,
                    r.get(5)?,
                ))
            },
        )
        .expect("ingest_runs row present");
    assert_eq!(scanned, 3);
    assert_eq!(new_c, 3);
    assert_eq!(updated, 0);
    assert_eq!(skipped, 0);
    assert_eq!(errors, 0);
    assert!(
        items_json.is_none(),
        "summary_only=true must store items_json=NULL: {items_json:?}"
    );
}

#[test]
fn ingest_provider_none_skips_lance() {
    // `provider="none"` must short-circuit the embedder + vector store
    // build entirely, so the LanceDB directory MUST NOT be created on
    // disk during ingest. `IngestReport` currently has no
    // `embeddings_indexed` field, so we assert via the on-disk lance
    // tree shape (no `<data_dir>/lancedb` directory, or no `*.lance`
    // tables under it).
    let env = TestEnv::lexical_only();
    let report =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
    assert_eq!(report.errors, 0, "lexical-only run must not error");
    assert_eq!(report.new, 3);

    let lance_dir = std::path::PathBuf::from(&env.config.storage.data_dir)
        .join("lancedb");
    if lance_dir.exists() {
        // If the dir was created (e.g., by an earlier consumer touching
        // the path), it MUST contain no `.lance` tables.
        let mut had_lance_table = false;
        for entry in std::fs::read_dir(&lance_dir).expect("read lance_dir") {
            let entry = entry.unwrap();
            if entry
                .path()
                .extension()
                .and_then(|s| s.to_str())
                == Some("lance")
            {
                had_lance_table = true;
                break;
            }
        }
        assert!(
            !had_lance_table,
            "provider=none must not produce any *.lance table under {}",
            lance_dir.display()
        );
    }
}

#[test]
fn list_docs_filters_by_tags_any() {
    let env = TestEnv::lexical_only();
    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();

    let filter = kb_core::DocFilter {
        tags_any: vec!["python".to_string()],
        ..Default::default()
    };
    let docs = kb_app::list_docs_with_config(env.config.clone(), filter).unwrap();
    assert_eq!(docs.len(), 1, "expected only the python doc: {docs:?}");
    assert!(docs[0].tags.contains(&"python".to_string()));

    let rust_filter = kb_core::DocFilter {
        tags_any: vec!["rust".to_string()],
        ..Default::default()
    };
    let rust_docs =
        kb_app::list_docs_with_config(env.config.clone(), rust_filter).unwrap();
    // intro.md and notes/cargo.md both tag "rust".
    assert_eq!(rust_docs.len(), 2, "expected 2 rust docs: {rust_docs:?}");
}

#[test]
fn inspect_doc_not_found_returns_actionable_error() {
    let env = TestEnv::lexical_only();
    let bogus = kb_core::DocumentId(
        "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
    );
    let err = kb_app::inspect_doc_with_config(env.config.clone(), &bogus).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("not found"),
        "error must mention not-found: {msg}"
    );
    // Check the full hint; a bare "list" match would pass on almost
    // any message and made the first disjunct redundant.
    assert!(
        msg.contains("kb list docs"),
        "error must hint at `kb list docs`: {msg}"
    );
}

#[test]
fn inspect_chunk_not_found_returns_actionable_error() {
    let env = TestEnv::lexical_only();
    let bogus = kb_core::ChunkId(
        "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
    );
    let err = kb_app::inspect_chunk_with_config(env.config.clone(), &bogus)
        .unwrap_err();
    let msg = format!("{err:#}");
    assert!(msg.contains("not found"), "got: {msg}");
}
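
The New/Updated labelling that `ingest_idempotent_on_second_run` exercises comes down to a membership check against the doc ids already in the store (`existing_doc_ids.contains(...)` in the ingest path). A minimal sketch of that idea, assuming the set of existing ids is snapshotted once before the run; `ItemKind`, `classify` and `run_ingest` are hypothetical names for illustration, not kb-app APIs:

```rust
use std::collections::HashSet;

/// Hypothetical mirror of the two `IngestItemKind` variants used here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ItemKind {
    New,
    Updated,
}

/// A document is `Updated` if its id was already stored, else `New`.
pub fn classify(existing: &HashSet<String>, doc_id: &str) -> ItemKind {
    if existing.contains(doc_id) {
        ItemKind::Updated
    } else {
        ItemKind::New
    }
}

/// Simulate one ingest run over `doc_ids`, returning `(new, updated)`.
/// Assumption: ids are classified against a snapshot taken before the
/// run, then written to `store`, so a re-run sees every id as existing.
pub fn run_ingest(store: &mut HashSet<String>, doc_ids: &[&str]) -> (u32, u32) {
    let existing = store.clone();
    let (mut new, mut updated) = (0, 0);
    for id in doc_ids {
        match classify(&existing, id) {
            ItemKind::New => new += 1,
            ItemKind::Updated => updated += 1,
        }
        store.insert((*id).to_string());
    }
    (new, updated)
}
```

Running the same three ids twice yields `(3, 0)` and then `(0, 3)` while the store stays at three entries, which is the shape the idempotency test asserts.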
69
crates/kb-app/tests/search_lexical.rs
Normal file
@@ -0,0 +1,69 @@
//! Lexical search integration tests. The vector / hybrid lanes are
//! AVX-gated and live in `search_vector.rs` (`#[ignore]`).

mod common;

use common::TestEnv;

fn lexical_query(text: &str) -> kb_core::SearchQuery {
    kb_core::SearchQuery {
        text: text.to_string(),
        mode: kb_core::SearchMode::Lexical,
        k: 10,
        filters: kb_core::SearchFilters::default(),
    }
}

#[test]
fn lexical_search_returns_hits_after_ingest() {
    let env = TestEnv::lexical_only();
    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();

    // "Ownership" appears as a heading + paragraph in intro.md, so the
    // default FTS5 tokenizer matches it easily.
    let hits =
        kb_app::search_with_config(env.config.clone(), lexical_query("ownership"))
            .unwrap();
    assert!(!hits.is_empty(), "expected ≥1 hit for 'ownership'");

    for h in &hits {
        // Lexical retriever sets embedding_model=None per spec.
        assert!(
            h.embedding_model.is_none(),
            "lexical-mode hit must have None embedding_model: {h:?}"
        );
        assert_eq!(
            h.retrieval.method,
            kb_core::SearchMode::Lexical,
            "method label should be Lexical"
        );
    }
}

#[test]
fn lexical_search_empty_query_returns_empty() {
    let env = TestEnv::lexical_only();
    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
    let hits = kb_app::search_with_config(env.config.clone(), lexical_query("   "))
        .unwrap();
    assert!(hits.is_empty(), "blank query must short-circuit empty");
}

#[test]
fn vector_mode_with_provider_none_errors_clearly() {
    let env = TestEnv::lexical_only();
    kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();

    let q = kb_core::SearchQuery {
        text: "ownership".to_string(),
        mode: kb_core::SearchMode::Vector,
        k: 10,
        filters: kb_core::SearchFilters::default(),
    };
    let err = kb_app::search_with_config(env.config.clone(), q).unwrap_err();
    let msg = format!("{err:#}");
    // Match the full phrase emitted by `require_embeddings`; a bare
    // "disabled" check made the first disjunct redundant.
    assert!(
        msg.contains("embeddings disabled"),
        "error must mention embeddings disabled: {msg}"
    );
}
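
The gating that `vector_mode_with_provider_none_errors_clearly` checks can be sketched without any of the kb crates. The block below is an illustration under stated assumptions, not the kb-app implementation: `EmbeddingConfig`, `embeddings_enabled` and `plan_search` are hypothetical names, the error text is abbreviated, and the "enabled" rule (`provider != "none"` and `dimensions > 0`) is taken from the commit message.

```rust
/// Hypothetical mirror of the three search modes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SearchMode {
    Lexical,
    Vector,
    Hybrid,
}

/// Hypothetical slice of the embedding config relevant to gating.
pub struct EmbeddingConfig {
    pub provider: String,
    pub dimensions: u32,
}

/// Embeddings count as enabled only when a provider is set and the
/// vector width is non-zero.
pub fn embeddings_enabled(cfg: &EmbeddingConfig) -> bool {
    cfg.provider != "none" && cfg.dimensions > 0
}

/// Decide which retrievers a query needs, failing early (before any
/// model load) when the mode requires embeddings that are disabled.
pub fn plan_search(mode: SearchMode, cfg: &EmbeddingConfig) -> Result<&'static str, String> {
    match mode {
        SearchMode::Lexical => Ok("lexical"),
        SearchMode::Vector | SearchMode::Hybrid => {
            if !embeddings_enabled(cfg) {
                return Err(
                    "embeddings disabled; switch to --mode lexical or enable an embedding provider"
                        .to_string(),
                );
            }
            Ok(if mode == SearchMode::Vector {
                "vector"
            } else {
                "lexical+vector"
            })
        }
    }
}
```

Lexical queries never consult the embedding config in this sketch, which is why the default test lane can run with `provider = "none"`.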
89
crates/kb-app/tests/search_vector.rs
Normal file
@@ -0,0 +1,89 @@
//! Vector / Hybrid lane (AVX-gated). Marked `#[ignore]` because Lance
//! crashes with `SIGILL` on hosts without AVX, and CI lanes that are
//! AVX-less should not run these. Local hosts run them via
//! `cargo test -p kb-app -- --ignored`.

mod common;

use common::TestEnv;

/// Panic if the host CPU lacks AVX. Mirrors the helper in
/// `kb-store-vector/tests/common/mod.rs` and `kb-search` so a
/// `--ignored` invocation on a non-AVX host fails loudly with a
/// clear message instead of crashing inside Lance's SIMD kernel.
fn require_avx_or_panic() {
    #[cfg(target_arch = "x86_64")]
    {
        if !std::is_x86_feature_detected!("avx") {
            panic!(
                "kb-app vector integration test requires AVX-capable hardware; \
                 host CPU lacks AVX. Run on an AVX-capable machine."
            );
        }
    }
}

// First run downloads ~470MB; expect ~30-60s warm, several minutes cold.
#[test]
#[ignore = "AVX-required (Lance SIMD kernels)"]
fn ingest_then_hybrid_search_returns_hits() {
    require_avx_or_panic();

    let env = TestEnv::with_embeddings();
    let report =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
    assert_eq!(report.errors, 0, "no per-file errors: {report:?}");
    assert_eq!(report.new, 3);

    let q = kb_core::SearchQuery {
        text: "ownership".to_string(),
        mode: kb_core::SearchMode::Hybrid,
        k: 10,
        filters: kb_core::SearchFilters::default(),
    };
    let hits = kb_app::search_with_config(env.config.clone(), q).unwrap();
    assert!(!hits.is_empty(), "expected hybrid hits for 'ownership'");
    let methods: Vec<_> = hits.iter().map(|h| h.retrieval.method).collect();
    assert!(
        methods.iter().all(|m| *m == kb_core::SearchMode::Hybrid),
        "every hit must report method=Hybrid: {methods:?}"
    );
}

// First run downloads ~470MB; expect ~30-60s warm, several minutes cold.
#[test]
#[ignore = "AVX-required (Lance SIMD kernels)"]
fn ingest_then_vector_search_carries_embedding_model() {
    require_avx_or_panic();

    let env = TestEnv::with_embeddings();
    let report =
        kb_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
    assert_eq!(report.errors, 0, "no per-file errors: {report:?}");
    assert_eq!(report.new, 3);

    let q = kb_core::SearchQuery {
        text: "ownership".to_string(),
        mode: kb_core::SearchMode::Vector,
        k: 10,
        filters: kb_core::SearchFilters::default(),
    };
    let hits = kb_app::search_with_config(env.config.clone(), q).unwrap();
    assert!(!hits.is_empty(), "expected vector hits for 'ownership'");

    // Vector mode dispatches through `VectorRetriever` and MUST stamp
    // each hit with the configured embedding_model id.
    let expected = kb_core::EmbeddingModelId(env.config.models.embedding.model.clone());
    for h in &hits {
        assert_eq!(
            h.embedding_model,
            Some(expected.clone()),
            "vector-mode hit must carry embedding_model={expected:?}: {h:?}"
        );
        assert_eq!(
            h.retrieval.method,
            kb_core::SearchMode::Vector,
            "vector-mode hit must report method=Vector"
        );
    }
}
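
Alongside the model id stamped on each hit, the vector lane also carries an index-version token composed in `vector_index_version` (and `lexical_index_version` for the lexical lane). The sketch below reproduces only the string formats shown in this commit, using plain `&str` / `u32` parameters instead of the real `Config`, `Embedder` and `IndexVersion` types; the model and chunker names in the usage note are made-up values.

```rust
/// Lexical token: pinned to the chunker version in effect.
pub fn lexical_index_version(chunker_version: &str) -> String {
    format!("lex:{chunker_version}")
}

/// Vector token: pinned to (model id, model version, dimensions), so
/// changing any of the three produces a different token.
pub fn vector_index_version(model_id: &str, model_version: &str, dimensions: u32) -> String {
    format!("vec:{model_id}@{model_version}:{dimensions}")
}
```

For example, `vector_index_version("model-a", "1", 384)` gives `vec:model-a@1:384`, while the same model at 768 dimensions gives a different token, which is what lets a model swap show up as index-version drift.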
@@ -3,6 +3,14 @@
//! The `jobs` table is the §5.7 schema. JobIds are minted via blake3 over
//! `(now, kind, payload)` so two `create` calls in the same millisecond
//! still distinguish.
//!
//! This module also owns the `ingest_runs` writer. `ingest_runs` is the
//! §5.7 sibling table that records per-run aggregate counts (`scanned`,
//! `new_count`, `updated_count`, …) alongside the `jobs` row that
//! `kb jobs` shows. The aggregate-counts surface is intentionally a
//! direct INSERT (not a `JobRepo` trait method) because `JobRepo` is
//! generic across job kinds, while `ingest_runs` is ingest-specific
//! schema with dedicated columns.

use anyhow::{Context, Result};
use rusqlite::params;
@@ -12,6 +20,71 @@ use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::SqliteStore;

/// Aggregate counts for one ingest run. Written into the `ingest_runs`
/// table so `kb jobs` (P+) and audit tooling can surface the per-run
/// summary without re-walking the workspace.
///
/// `items_json` carries the per-item detail when the run was NOT
/// `summary_only`; it is `None` when the caller asked for a summary
/// (the table column is then NULL per design §5.7).
#[derive(Clone, Debug)]
pub struct IngestRunRow<'a> {
    pub run_id: &'a str,
    pub scope_json: &'a str,
    pub scanned: u32,
    pub new_count: u32,
    pub updated_count: u32,
    pub skipped_count: u32,
    pub error_count: u32,
    pub duration_ms: u32,
    pub started_at: OffsetDateTime,
    pub finished_at: OffsetDateTime,
    pub items_json: Option<&'a str>,
}

impl SqliteStore {
    /// Write one row into `ingest_runs` with the aggregate counts. Mirrors
    /// the schema in `migrations/V001__init.sql` (§5.7). Called by
    /// `kb-app::ingest` at the very end of a run, after the per-document
    /// transactions have committed and the totals are known.
    ///
    /// `items_json = None` ↔ the column stores SQL `NULL`, which is the
    /// `summary_only=true` contract.
    pub fn record_ingest_run(&self, row: &IngestRunRow<'_>) -> Result<()> {
        let started = row
            .started_at
            .format(&time::format_description::well_known::Rfc3339)
            .context("format ingest_run started_at")?;
        let finished = row
            .finished_at
            .format(&time::format_description::well_known::Rfc3339)
            .context("format ingest_run finished_at")?;
        let conn = self.lock_conn();
        conn.execute(
            "INSERT INTO ingest_runs (
                run_id, scope_json, scanned, new_count, updated_count,
                skipped_count, error_count, duration_ms,
                started_at, finished_at, items_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            params![
                row.run_id,
                row.scope_json,
                i64::from(row.scanned),
                i64::from(row.new_count),
                i64::from(row.updated_count),
                i64::from(row.skipped_count),
                i64::from(row.error_count),
                i64::from(row.duration_ms),
                started,
                finished,
                row.items_json,
            ],
        )
        .map_err(StoreError::from)?;
        Ok(())
    }
}

impl kb_core::JobRepo for SqliteStore {
    fn create(
        &self,
@@ -29,4 +29,5 @@ mod store;
pub use embeddings::EmbeddingRecordRow;
pub use error::StoreError;
pub use fts::rebuild_chunks_fts;
pub use jobs::IngestRunRow;
pub use store::SqliteStore;