fix(dogfood): document-centric try_skip_unchanged for twin-file idempotency
Identical-content files at different workspace paths share one assets row (assets.asset_id = blake3 content hash, PRIMARY KEY). The UPSERT `ON CONFLICT(asset_id) DO UPDATE SET workspace_path = excluded` made twin files overwrite each other's workspace_path on every ingest, so `get_asset_by_workspace_path(path1)` returned the OTHER twin's row (or None) — break idempotent unchanged-detection for both files. Fix: switch try_skip_unchanged to document-centric lookup. `documents. workspace_path` is already UNIQUE (V001) and `id_for_doc(path, ...)` includes path, so each twin has its own stable document row. Compare `doc.source_asset_id` with the new asset's checksum instead of going through the assets table. Dogfood (multi-root: kebab-docs + httpx + zod + lodash) showed 27 of 726 docs marked Updated on every idempotent re-ingest — all 27 are twin-file victims (empty `__init__.py` ×3, AGENTS.md ↔ CLAUDE.md same content, duplicate logo PDFs/JPGs). After: re-ingest reports 0 new / 0 updated / 726 unchanged. No schema migration needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -773,31 +773,19 @@ fn try_skip_unchanged(
|
||||
if force_reingest {
|
||||
return Ok(None);
|
||||
}
|
||||
let existing_asset = match app
|
||||
// Document-centric skip: look up the existing document row by
|
||||
// workspace_path directly. This avoids the twin-file flip-flop
|
||||
// that the old asset-side lookup suffers from — multiple files
|
||||
// with identical content share one `assets` row whose
|
||||
// `workspace_path` is overwritten on every UPSERT, so
|
||||
// `get_asset_by_workspace_path(path1)` could return the OTHER
|
||||
// twin's path (or None) after any ingest of the twin. The
|
||||
// `documents` table has a UNIQUE index on `workspace_path` (V001),
|
||||
// so each twin has its own stable row regardless of asset de-dup.
|
||||
let existing_doc = match app
|
||||
.sqlite
|
||||
.get_asset_by_workspace_path(&asset.workspace_path)
|
||||
.get_document_by_workspace_path(&asset.workspace_path)
|
||||
{
|
||||
Ok(Some(a)) => a,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_asset_by_workspace_path failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if existing_asset.checksum != asset.checksum {
|
||||
return Ok(None);
|
||||
}
|
||||
let candidate_doc_id = kebab_core::id_for_doc(
|
||||
&asset.workspace_path,
|
||||
&asset.asset_id,
|
||||
current_parser_version,
|
||||
);
|
||||
let existing_doc = match app.sqlite.get_document(&candidate_doc_id) {
|
||||
Ok(Some(d)) => d,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
@@ -805,21 +793,37 @@ fn try_skip_unchanged(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_document failed; falling through to re-process"
|
||||
"skip-check: get_document_by_workspace_path failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
// 1. Content unchanged: the freshly-computed asset_id (blake3
|
||||
// content hash) must match what this document was ingested from.
|
||||
if existing_doc.source_asset_id != asset.asset_id {
|
||||
return Ok(None);
|
||||
}
|
||||
// 2. Parser unchanged: parser_version is baked into id_for_doc so
|
||||
// a version bump yields a different doc_id and the row above
|
||||
// would have been missing. Checking here explicitly keeps the
|
||||
// logic self-documenting and guards against future id_for_doc
|
||||
// changes.
|
||||
if existing_doc.parser_version != *current_parser_version {
|
||||
return Ok(None);
|
||||
}
|
||||
// 3. Chunker unchanged.
|
||||
let chunker_match = existing_doc.last_chunker_version.as_ref()
|
||||
== Some(current_chunker_version);
|
||||
if !chunker_match {
|
||||
return Ok(None);
|
||||
}
|
||||
// 4. Embedder unchanged.
|
||||
let embedder_match = existing_doc.last_embedding_version.as_ref()
|
||||
== current_embedding_version;
|
||||
if !embedder_match {
|
||||
return Ok(None);
|
||||
}
|
||||
let candidate_doc_id = existing_doc.doc_id.clone();
|
||||
tracing::debug!(
|
||||
target: "kebab-app::ingest",
|
||||
path = %asset.workspace_path.0,
|
||||
|
||||
90
crates/kebab-app/tests/twin_files_idempotent.rs
Normal file
90
crates/kebab-app/tests/twin_files_idempotent.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
//! Regression test for the twin-file idempotency bug.
|
||||
//!
|
||||
//! Identical-content files at different workspace paths share one
|
||||
//! `assets` row (`asset_id` = blake3 content hash, PRIMARY KEY). The
|
||||
//! old UPSERT `ON CONFLICT(asset_id) DO UPDATE SET workspace_path =
|
||||
//! excluded.workspace_path` made each twin overwrite the other's path
|
||||
//! on every ingest, so `get_asset_by_workspace_path(path1)` returned
|
||||
//! None (or the wrong twin) → re-process every time.
|
||||
//!
|
||||
//! Fix: `try_skip_unchanged` now uses `get_document_by_workspace_path`
|
||||
//! instead. `documents.workspace_path` is UNIQUE (V001) so each twin
|
||||
//! has its own stable document row.
|
||||
//!
|
||||
//! Assertion contract:
|
||||
//! 1st ingest → 2 New (one per twin)
|
||||
//! 2nd ingest → 0 New, 0 Updated, 2 Unchanged
|
||||
|
||||
mod common;
|
||||
|
||||
use common::TestEnv;
|
||||
use kebab_app::ingest_with_config;
|
||||
use kebab_core::IngestItemKind;
|
||||
|
||||
#[test]
|
||||
fn twin_files_second_ingest_is_unchanged() {
|
||||
let env = TestEnv::lexical_only();
|
||||
|
||||
// Write two files with identical content at different paths.
|
||||
let pkg_a = env.workspace_root.join("pkg_a");
|
||||
let pkg_b = env.workspace_root.join("pkg_b");
|
||||
std::fs::create_dir_all(&pkg_a).unwrap();
|
||||
std::fs::create_dir_all(&pkg_b).unwrap();
|
||||
|
||||
let content = b"# shared\nThis content is identical in both files.\n";
|
||||
std::fs::write(pkg_a.join("__init__.py"), content).unwrap();
|
||||
std::fs::write(pkg_b.join("__init__.py"), content).unwrap();
|
||||
|
||||
// First ingest — both files must be New.
|
||||
let first = ingest_with_config(env.config.clone(), env.scope(), false)
|
||||
.expect("first ingest must succeed");
|
||||
assert_eq!(first.errors, 0, "first ingest: no errors; report={first:?}");
|
||||
|
||||
let items = first.items.as_ref().expect("items must be present");
|
||||
let twin_items: Vec<_> = items
|
||||
.iter()
|
||||
.filter(|i| {
|
||||
i.doc_path.0.ends_with("__init__.py")
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
twin_items.len(),
|
||||
2,
|
||||
"first ingest: expected exactly 2 __init__.py items; items={items:?}"
|
||||
);
|
||||
for item in &twin_items {
|
||||
assert_eq!(
|
||||
item.kind,
|
||||
IngestItemKind::New,
|
||||
"first ingest: each twin must be New; item={item:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Second ingest — same files, same content → both must be Unchanged.
|
||||
let second = ingest_with_config(env.config.clone(), env.scope(), false)
|
||||
.expect("second ingest must succeed");
|
||||
assert_eq!(second.errors, 0, "second ingest: no errors; report={second:?}");
|
||||
assert_eq!(second.new, 0, "second ingest: no new docs; report={second:?}");
|
||||
assert_eq!(
|
||||
second.updated, 0,
|
||||
"second ingest: no updated docs (twin-file bug would set this to 2); report={second:?}"
|
||||
);
|
||||
|
||||
let second_items = second.items.as_ref().expect("items must be present");
|
||||
let twin_items2: Vec<_> = second_items
|
||||
.iter()
|
||||
.filter(|i| i.doc_path.0.ends_with("__init__.py"))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
twin_items2.len(),
|
||||
2,
|
||||
"second ingest: expected exactly 2 __init__.py items; items={second_items:?}"
|
||||
);
|
||||
for item in &twin_items2 {
|
||||
assert_eq!(
|
||||
item.kind,
|
||||
IngestItemKind::Unchanged,
|
||||
"second ingest: each twin must be Unchanged; item={item:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -169,6 +169,20 @@ pub trait DocumentStore {
|
||||
&self,
|
||||
path: &WorkspacePath,
|
||||
) -> anyhow::Result<Option<RawAsset>>;
|
||||
|
||||
/// Look up a document row by its workspace path. Used by the
|
||||
/// document-centric skip path in `try_skip_unchanged` to avoid the
|
||||
/// twin-file flip-flop that the asset-side lookup suffers from
|
||||
/// (multiple files with identical content share one `assets` row
|
||||
/// whose `workspace_path` is overwritten on every UPSERT, so
|
||||
/// `get_asset_by_workspace_path` returns the wrong twin's path).
|
||||
///
|
||||
/// `documents.workspace_path` is UNIQUE (V001), so each twin has
|
||||
/// its own stable document row regardless of the asset de-dup.
|
||||
fn get_document_by_workspace_path(
|
||||
&self,
|
||||
path: &WorkspacePath,
|
||||
) -> anyhow::Result<Option<CanonicalDocument>>;
|
||||
}
|
||||
|
||||
pub trait VectorStore {
|
||||
|
||||
@@ -286,6 +286,72 @@ impl kebab_core::DocumentStore for SqliteStore {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_document_by_workspace_path(
|
||||
&self,
|
||||
path: &kebab_core::WorkspacePath,
|
||||
) -> Result<Option<kebab_core::CanonicalDocument>> {
|
||||
let conn = self.lock_conn();
|
||||
let row: Option<DocumentRow> = conn
|
||||
.query_row(
|
||||
"SELECT
|
||||
doc_id, asset_id, workspace_path, title, lang,
|
||||
source_type, trust_level, parser_version,
|
||||
doc_version, schema_version, metadata_json,
|
||||
provenance_json, created_at, updated_at,
|
||||
last_chunker_version, last_embedding_version
|
||||
FROM documents WHERE workspace_path = ?",
|
||||
params![path.0],
|
||||
document_row_from_sql,
|
||||
)
|
||||
.map(Some)
|
||||
.or_else(rows_optional)
|
||||
.map_err(StoreError::from)?;
|
||||
let Some(row) = row else { return Ok(None) };
|
||||
|
||||
let doc_id = kebab_core::DocumentId(row.doc_id.clone());
|
||||
let mut blocks_stmt = conn
|
||||
.prepare(
|
||||
"SELECT payload_json FROM blocks
|
||||
WHERE doc_id = ? ORDER BY ordinal ASC",
|
||||
)
|
||||
.map_err(StoreError::from)?;
|
||||
let block_rows = blocks_stmt
|
||||
.query_map(params![row.doc_id], |r| {
|
||||
let payload_json: String = r.get(0)?;
|
||||
Ok(payload_json)
|
||||
})
|
||||
.map_err(StoreError::from)?;
|
||||
let mut blocks: Vec<kebab_core::Block> = Vec::new();
|
||||
for block_row in block_rows {
|
||||
let payload_json = block_row.map_err(StoreError::from)?;
|
||||
let block: kebab_core::Block = serde_json::from_str(&payload_json)
|
||||
.context("deserialize block payload_json")?;
|
||||
blocks.push(block);
|
||||
}
|
||||
|
||||
let metadata: kebab_core::Metadata = serde_json::from_str(&row.metadata_json)
|
||||
.context("deserialize metadata_json")?;
|
||||
let provenance: kebab_core::Provenance =
|
||||
serde_json::from_str(&row.provenance_json)
|
||||
.context("deserialize provenance_json")?;
|
||||
|
||||
Ok(Some(kebab_core::CanonicalDocument {
|
||||
doc_id,
|
||||
source_asset_id: kebab_core::AssetId(row.asset_id),
|
||||
workspace_path: kebab_core::WorkspacePath(row.workspace_path),
|
||||
title: row.title.unwrap_or_default(),
|
||||
lang: kebab_core::Lang(row.lang.unwrap_or_default()),
|
||||
blocks,
|
||||
metadata,
|
||||
provenance,
|
||||
parser_version: kebab_core::ParserVersion(row.parser_version),
|
||||
schema_version: row.schema_version as u32,
|
||||
doc_version: row.doc_version as u32,
|
||||
last_chunker_version: row.last_chunker_version.map(kebab_core::ChunkerVersion),
|
||||
last_embedding_version: row.last_embedding_version.map(kebab_core::EmbeddingVersion),
|
||||
}))
|
||||
}
|
||||
|
||||
fn list_documents(
|
||||
&self,
|
||||
filter: &kebab_core::DocFilter,
|
||||
|
||||
Reference in New Issue
Block a user