feat(kebab-app): p9-fb-23 task 7 — early-skip Unchanged path in ingest

Adds the per-asset incremental-ingest skip block to all three flows
(markdown / image / pdf). When `IngestOpts::force_reingest = false`
AND the asset's blake3 checksum + parser/chunker/embedding versions
all match the existing DB record, ingest emits
`AssetFinished { result: Unchanged }`, bumps `aggregate.unchanged`,
and skips parse / chunk / embed / vector upsert entirely.

Shared `try_skip_unchanged` helper performs the four checks; per-flow
callers supply the active parser_version + chunker_version + optional
embedding_version. `force_reingest = true` bypasses the skip path so
`incremental_ingest::force_reingest_bypasses_skip` still sees `Updated`.

Tests:
- new `incremental_ingest.rs` covers both paths.
- existing `ingest_idempotent_on_second_run` /
  `re_ingest_image_produces_*` / `re_ingest_identical_pdf_produces_*`
  updated to assert `Unchanged` on identical-bytes re-ingest (the
  pre-task behaviour was `Updated`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-04 18:12:47 +00:00
parent 4874304d5d
commit 0e6d6073e7
5 changed files with 260 additions and 23 deletions

View File

@@ -277,10 +277,7 @@ pub fn ingest_with_config_opts(
.map(|c| c.load(std::sync::atomic::Ordering::Relaxed))
.unwrap_or(false)
};
// p9-fb-23: opts.force_reingest is consumed by Task 7's skip-detection
// block. For Task 6 alone, the field is plumbed but unused — silence
// the warning until Task 7 wires it.
let _ = opts.force_reingest;
let force_reingest = opts.force_reingest;
let started_instant = std::time::Instant::now();
let app = App::open_with_config(config)?;
@@ -415,6 +412,7 @@ pub fn ingest_with_config_opts(
vector_store.as_ref(),
&existing_doc_ids,
&image_pipeline,
force_reingest,
);
let item = match item {
@@ -716,6 +714,105 @@ struct ImagePipeline<'a> {
caption_llm: Option<&'a dyn LanguageModel>,
}
/// p9-fb-23 task 7: incremental-ingest early-skip predicate. Shared
/// across the markdown / image / PDF per-asset flows. Returns
/// `Some(IngestItem { kind: Unchanged, .. })` when ALL FOUR conditions
/// hold (per design §9 cascade rule):
///
/// 1. `force_reingest == false` — caller hasn't asked to bypass skip.
/// 2. The freshly-scanned asset's blake3 checksum equals what the
/// existing `assets` row stores at the same `workspace_path`.
/// 3. The doc keyed on `(workspace_path, asset_id, current_parser_version)`
/// exists. If the parser_version changed, `id_for_doc` produces a
/// different `doc_id` so the lookup misses → no skip → re-process.
/// 4. The existing doc's stamped `last_chunker_version` AND
/// `last_embedding_version` match the values the caller is about
/// to use (`Some(v) == Some(v)` and `None == None` — see design
/// doc for the `None == None` rule when no embedder is configured).
///
/// Returns `Ok(None)` (proceed with full re-process) when any check
/// fails or any DB read errors out — the skip path is opportunistic;
/// a missed skip is correct (just slower), a wrong skip would corrupt
/// the index.
fn try_skip_unchanged(
app: &App,
asset: &RawAsset,
current_parser_version: &ParserVersion,
current_chunker_version: &ChunkerVersion,
current_embedding_version: Option<&kebab_core::EmbeddingVersion>,
force_reingest: bool,
) -> anyhow::Result<Option<kebab_core::IngestItem>> {
if force_reingest {
return Ok(None);
}
let existing_asset = match app
.sqlite
.get_asset_by_workspace_path(&asset.workspace_path)
{
Ok(Some(a)) => a,
Ok(None) => return Ok(None),
Err(e) => {
tracing::debug!(
target: "kebab-app",
path = %asset.workspace_path.0,
error = %e,
"skip-check: get_asset_by_workspace_path failed; falling through to re-process"
);
return Ok(None);
}
};
if existing_asset.checksum != asset.checksum {
return Ok(None);
}
let candidate_doc_id = kebab_core::id_for_doc(
&asset.workspace_path,
&asset.asset_id,
current_parser_version,
);
let existing_doc = match app.sqlite.get_document(&candidate_doc_id) {
Ok(Some(d)) => d,
Ok(None) => return Ok(None),
Err(e) => {
tracing::debug!(
target: "kebab-app",
path = %asset.workspace_path.0,
error = %e,
"skip-check: get_document failed; falling through to re-process"
);
return Ok(None);
}
};
let chunker_match = existing_doc.last_chunker_version.as_ref()
== Some(current_chunker_version);
if !chunker_match {
return Ok(None);
}
let embedder_match = existing_doc.last_embedding_version.as_ref()
== current_embedding_version;
if !embedder_match {
return Ok(None);
}
tracing::debug!(
target: "kebab-app::ingest",
path = %asset.workspace_path.0,
doc_id = %candidate_doc_id.0,
"skip-unchanged: checksum + parser/chunker/embedding versions match"
);
Ok(Some(kebab_core::IngestItem {
kind: kebab_core::IngestItemKind::Unchanged,
doc_id: Some(candidate_doc_id),
doc_path: asset.workspace_path.clone(),
asset_id: Some(asset.asset_id.clone()),
byte_len: Some(asset.byte_len),
block_count: u32::try_from(existing_doc.blocks.len()).ok(),
chunk_count: None,
parser_version: Some(existing_doc.parser_version.clone()),
chunker_version: existing_doc.last_chunker_version.clone(),
warnings: Vec::new(),
error: None,
}))
}
/// Process a single asset: read bytes, parse, normalize, chunk,
/// persist, embed. Per-asset failures bubble up to the caller for
/// labelling as `IngestItemKind::Error` — they do NOT abort the
@@ -730,6 +827,7 @@ fn ingest_one_asset(
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
existing_doc_ids: &std::collections::HashSet<String>,
image_pipeline: &ImagePipeline<'_>,
force_reingest: bool,
) -> anyhow::Result<kebab_core::IngestItem> {
tracing::debug!(
target: "kebab-app::ingest",
@@ -753,6 +851,7 @@ fn ingest_one_asset(
vector_store,
existing_doc_ids,
image_pipeline,
force_reingest,
);
}
MediaType::Pdf => {
@@ -763,6 +862,7 @@ fn ingest_one_asset(
embedder,
vector_store,
existing_doc_ids,
force_reingest,
);
}
_ => {
@@ -803,6 +903,23 @@ fn ingest_one_asset(
}
};
// p9-fb-23 task 7: incremental-ingest early-skip. When force_reingest
// is false AND the on-disk asset's checksum + parser_version +
// last_chunker_version + last_embedding_version all match the existing
// DB record, this asset doesn't need to be re-parsed / re-chunked /
// re-embedded. Return Unchanged so the caller bumps `aggregate.unchanged`
// and the AssetFinished progress event reflects the skip.
if let Some(item) = try_skip_unchanged(
app,
asset,
parser_version,
&MdHeadingV1Chunker.chunker_version(),
embedder.map(|e| e.model_version()).as_ref(),
force_reingest,
)? {
return Ok(item);
}
let bytes = std::fs::read(&path)
.with_context(|| format!("read asset bytes from {}", path.display()))?;
@@ -954,6 +1071,7 @@ fn ingest_one_image_asset(
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
existing_doc_ids: &std::collections::HashSet<String>,
image_pipeline: &ImagePipeline<'_>,
force_reingest: bool,
) -> anyhow::Result<kebab_core::IngestItem> {
let image_extractor = image_pipeline.extractor;
let ocr_engine = image_pipeline.ocr_engine;
@@ -978,6 +1096,23 @@ fn ingest_one_image_asset(
});
}
};
// p9-fb-23 task 7: incremental-ingest early-skip for the image flow.
// Image docs use the `image-meta-v1` parser_version + the same
// MdHeadingV1Chunker as the markdown flow (single-block doc). The
// embedding-version check matches the markdown path: when the
// active embedder's model_version equals what was stamped on the
// existing doc, the asset is Unchanged.
let image_parser_version = ParserVersion(kebab_parse_image::PARSER_VERSION.to_string());
if let Some(item) = try_skip_unchanged(
app,
asset,
&image_parser_version,
&MdHeadingV1Chunker.chunker_version(),
embedder.map(|e| e.model_version()).as_ref(),
force_reingest,
)? {
return Ok(item);
}
let bytes = std::fs::read(&path)
.with_context(|| format!("read image asset bytes from {}", path.display()))?;
@@ -1274,6 +1409,7 @@ fn ingest_one_pdf_asset(
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
existing_doc_ids: &std::collections::HashSet<String>,
force_reingest: bool,
) -> anyhow::Result<kebab_core::IngestItem> {
let path = match &asset.source_uri {
SourceUri::File(p) => p.clone(),
@@ -1295,6 +1431,20 @@ fn ingest_one_pdf_asset(
});
}
};
// p9-fb-23 task 7: incremental-ingest early-skip for the PDF flow.
// PDF docs use `pdf-text-v1` as the parser_version and `PdfPageV1Chunker`
// as the chunker — both pinned per-medium today (no config knob).
let pdf_parser_version = ParserVersion(kebab_parse_pdf::PARSER_VERSION.to_string());
if let Some(item) = try_skip_unchanged(
app,
asset,
&pdf_parser_version,
&PdfPageV1Chunker.chunker_version(),
embedder.map(|e| e.model_version()).as_ref(),
force_reingest,
)? {
return Ok(item);
}
let bytes = std::fs::read(&path)
.with_context(|| format!("read PDF asset bytes from {}", path.display()))?;

View File

@@ -363,10 +363,14 @@ async fn garbage_png_increments_errors_counter_exactly_once() {
// ── 6. Determinism: re-ingest produces identical doc_id / chunk_id ───────
/// Idempotency contract — running the same ingest twice should mark
/// the asset Updated on the second run with byte-identical IDs.
/// Idempotency contract — running the same ingest twice keeps the
/// doc_id stable. p9-fb-23 task 7 introduced the early-skip path for
/// incremental ingest: when checksum + parser/chunker/embedding versions
/// all match, the second run reports `Unchanged` rather than `Updated`.
/// The pre-p9-fb-23 contract was `Updated` — that path is still exercised
/// by `force_reingest = true` tests in `incremental_ingest.rs`.
#[tokio::test]
async fn re_ingest_image_produces_updated_with_same_doc_id() {
async fn re_ingest_image_produces_unchanged_with_same_doc_id() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/generate"))
@@ -416,6 +420,6 @@ async fn re_ingest_image_produces_updated_with_same_doc_id() {
.iter()
.find(|i| i.doc_path.0.ends_with("diagram.png"))
.unwrap();
assert_eq!(img2.kind, kebab_core::IngestItemKind::Updated);
assert_eq!(img2.kind, kebab_core::IngestItemKind::Unchanged);
assert_eq!(img2.doc_id.as_ref().unwrap(), &id1);
}

View File

@@ -0,0 +1,82 @@
//! p9-fb-23: incremental ingest — skip parse/chunk/embed when nothing
//! has changed.
//!
//! Task 7 contract: when `IngestOpts::force_reingest == false` and the
//! per-asset (checksum, parser_version, chunker_version, embedding_version)
//! tuple matches the existing DB record, ingest emits
//! `IngestEvent::AssetFinished { result: Unchanged }` and skips
//! parse / chunk / embed / vector upsert. `force_reingest = true`
//! bypasses the skip path and re-processes every asset as `Updated`.
mod common;
use common::TestEnv;
use kebab_app::{IngestOpts, ingest_with_config, ingest_with_config_opts};
#[test]
fn second_ingest_of_unchanged_corpus_marks_all_unchanged() {
let env = TestEnv::lexical_only();
// First ingest — populates the DB. Use the legacy entry so the
// assertions cover the "previously ingested" set without needing
// IngestOpts::default() to behave identically.
let first =
ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
assert_eq!(first.errors, 0, "first ingest must not error: {first:?}");
assert!(first.new >= 1, "first ingest must create new docs: {first:?}");
assert_eq!(first.unchanged, 0, "first ingest cannot have unchanged: {first:?}");
let scanned = first.scanned;
// Second ingest — same files, same versions → all assets must be
// labelled Unchanged (no parse / chunk / embed re-work).
let second = ingest_with_config_opts(
env.config.clone(),
env.scope(),
false,
IngestOpts::default(),
)
.unwrap();
assert_eq!(second.scanned, scanned, "second scanned matches first: {second:?}");
assert_eq!(second.new, 0, "no new docs on re-ingest: {second:?}");
assert_eq!(second.updated, 0, "nothing should be marked updated: {second:?}");
assert_eq!(
second.unchanged, scanned,
"every doc must be Unchanged: {second:?}"
);
assert_eq!(second.errors, 0, "no errors expected: {second:?}");
}
#[test]
fn force_reingest_bypasses_skip() {
let env = TestEnv::lexical_only();
let first =
ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
assert_eq!(first.errors, 0, "first ingest must not error: {first:?}");
assert!(first.new >= 1, "first ingest must create new docs: {first:?}");
let scanned = first.scanned;
let second = ingest_with_config_opts(
env.config.clone(),
env.scope(),
false,
IngestOpts {
force_reingest: true,
..Default::default()
},
)
.unwrap();
assert_eq!(second.scanned, scanned);
assert_eq!(
second.unchanged, 0,
"force_reingest must bypass skip: {second:?}"
);
assert_eq!(
second.updated, scanned,
"every doc must be re-processed as Updated: {second:?}"
);
assert_eq!(second.new, 0, "no new docs on force reingest: {second:?}");
assert_eq!(second.errors, 0, "no errors expected: {second:?}");
}

View File

@@ -52,10 +52,15 @@ fn ingest_idempotent_on_second_run() {
let r2 =
kebab_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
// Same files re-ingested — labelled Updated, not duplicated.
// Same files re-ingested — p9-fb-23 task 7 introduced the early-skip
// path: when checksum + parser/chunker/embedding versions all match,
// the second run reports `Unchanged` rather than `Updated`. Pre-p9-fb-23
// returned `Updated` here. The `force_reingest=true` path still returns
// `Updated` and is exercised by `incremental_ingest.rs`.
assert_eq!(r2.scanned, 3, "second scan: {r2:?}");
assert_eq!(r2.new, 0, "second run new should be 0: {r2:?}");
assert_eq!(r2.updated, 3, "second run updated: {r2:?}");
assert_eq!(r2.updated, 0, "second run updated: {r2:?}");
assert_eq!(r2.unchanged, 3, "second run unchanged: {r2:?}");
// list_docs still has 3 docs (no duplicates).
let docs = kebab_app::list_docs_with_config(

View File

@@ -187,10 +187,15 @@ fn ingest_3_page_pdf_produces_one_doc_and_per_page_chunks() {
}
}
/// Re-ingest the SAME PDF bytes → identical doc_id, identical chunk_id
/// set, item kind = Updated. P1 idempotency contract.
/// Re-ingest the SAME PDF bytes → identical doc_id, item kind =
/// Unchanged. p9-fb-23 task 7 introduced the early-skip path: when
/// checksum + parser/chunker/embedding versions all match, the second
/// run reports `Unchanged` rather than `Updated` and skips parse /
/// chunk / embed entirely. The pre-p9-fb-23 contract was `Updated`;
/// the `force_reingest=true` path still exercises that branch (see
/// `incremental_ingest.rs`).
#[test]
fn re_ingest_identical_pdf_produces_updated_with_same_doc_id() {
fn re_ingest_identical_pdf_produces_unchanged_with_same_doc_id() {
let env = TestEnv::lexical_only();
let bytes = build_text_pdf(&[Some("page 1"), Some("page 2")]);
write_pdf(&env.workspace_root, "stable.pdf", &bytes);
@@ -216,17 +221,8 @@ fn re_ingest_identical_pdf_produces_updated_with_same_doc_id() {
.into_iter()
.find(|i| i.doc_path.0.ends_with("stable.pdf"))
.unwrap();
assert_eq!(item2.kind, IngestItemKind::Updated);
assert_eq!(item2.kind, IngestItemKind::Unchanged);
assert_eq!(item2.doc_id, item1.doc_id);
// P1 idempotency contract: identical bytes → identical chunk set.
// Comparing `chunk_count` as a proxy (full chunk_id set comparison
// would need direct sqlite access; the per-chunk #c{char_start}
// hash variant in pdf-page-v1 is already tested for stability in
// `kebab-chunk::pdf_page_v1::deterministic_chunk_ids_1000`).
assert_eq!(
item1.chunk_count, item2.chunk_count,
"identical bytes must produce identical chunk count"
);
}
/// Edit a PDF (replace bytes) → different blake3 → different asset_id