fix(dogfood): document-centric try_skip_unchanged for twin-file idempotency #146
@@ -748,15 +748,18 @@ struct ImagePipeline<'a> {
|
||||
/// hold (per design §9 cascade rule):
|
||||
///
|
||||
/// 1. `force_reingest == false` — caller hasn't asked to bypass skip.
|
||||
/// 2. The freshly-scanned asset's blake3 checksum equals what the
|
||||
/// existing `assets` row stores at the same `workspace_path`.
|
||||
/// 3. The doc keyed on `(workspace_path, asset_id, current_parser_version)`
|
||||
/// exists. If the parser_version changed, `id_for_doc` produces a
|
||||
/// different `doc_id` so the lookup misses → no skip → re-process.
|
||||
/// 4. The existing doc's stamped `last_chunker_version` AND
|
||||
/// `last_embedding_version` match the values the caller is about
|
||||
/// to use (`Some(v) == Some(v)` and `None == None` — see design
|
||||
/// doc for the `None == None` rule when no embedder is configured).
|
||||
/// 2. A document already exists at this `workspace_path`
|
||||
/// (`get_document_by_workspace_path`). The lookup is document-side, not
|
||||
/// asset-side, so twin files (identical content at different paths) each
|
||||
/// hit their own stable doc row — `documents.workspace_path` is UNIQUE
|
||||
/// while `assets` may dedupe content into a single row with a flip-flop
|
||||
/// `workspace_path` column (dogfood bug #4, see `tasks/HOTFIXES.md`).
|
||||
/// 3. The existing doc's `source_asset_id` equals the freshly-scanned
|
||||
/// asset's blake3 checksum (content unchanged).
|
||||
/// 4. The existing doc's `parser_version` matches the current extractor's
|
||||
/// `parser_version` (extractor not upgraded). Combined with `chunker_version`
|
||||
/// and `last_embedding_version` checks immediately below — full cascade
|
||||
/// per design §9.
|
||||
///
|
||||
/// Returns `Ok(None)` (proceed with full re-process) when any check
|
||||
/// fails or any DB read errors out — the skip path is opportunistic;
|
||||
@@ -773,31 +776,19 @@ fn try_skip_unchanged(
|
||||
if force_reingest {
|
||||
return Ok(None);
|
||||
}
|
||||
let existing_asset = match app
|
||||
// Document-centric skip: look up the existing document row by
|
||||
// workspace_path directly. This avoids the twin-file flip-flop
|
||||
// that the old asset-side lookup suffers from — multiple files
|
||||
// with identical content share one `assets` row whose
|
||||
// `workspace_path` is overwritten on every UPSERT, so
|
||||
// `get_asset_by_workspace_path(path1)` could return the OTHER
|
||||
// twin's path (or None) after any ingest of the twin. The
|
||||
// `documents` table has a UNIQUE index on `workspace_path` (V001),
|
||||
// so each twin has its own stable row regardless of asset de-dup.
|
||||
let existing_doc = match app
|
||||
.sqlite
|
||||
.get_asset_by_workspace_path(&asset.workspace_path)
|
||||
.get_document_by_workspace_path(&asset.workspace_path)
|
||||
{
|
||||
Ok(Some(a)) => a,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_asset_by_workspace_path failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if existing_asset.checksum != asset.checksum {
|
||||
return Ok(None);
|
||||
}
|
||||
let candidate_doc_id = kebab_core::id_for_doc(
|
||||
&asset.workspace_path,
|
||||
&asset.asset_id,
|
||||
current_parser_version,
|
||||
);
|
||||
let existing_doc = match app.sqlite.get_document(&candidate_doc_id) {
|
||||
Ok(Some(d)) => d,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
@@ -805,21 +796,37 @@ fn try_skip_unchanged(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_document failed; falling through to re-process"
|
||||
"skip-check: get_document_by_workspace_path failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
// 1. Content unchanged: the freshly-computed asset_id (blake3
|
||||
// content hash) must match what this document was ingested from.
|
||||
if existing_doc.source_asset_id != asset.asset_id {
|
||||
return Ok(None);
|
||||
}
|
||||
// 2. Parser unchanged: parser_version is baked into id_for_doc so
|
||||
// a version bump yields a different doc_id and the row above
|
||||
// would have been missing. Checking here explicitly keeps the
|
||||
// logic self-documenting and guards against future id_for_doc
|
||||
// changes.
|
||||
if existing_doc.parser_version != *current_parser_version {
|
||||
return Ok(None);
|
||||
}
|
||||
// 3. Chunker unchanged.
|
||||
let chunker_match = existing_doc.last_chunker_version.as_ref()
|
||||
== Some(current_chunker_version);
|
||||
if !chunker_match {
|
||||
return Ok(None);
|
||||
}
|
||||
// 4. Embedder unchanged.
|
||||
let embedder_match = existing_doc.last_embedding_version.as_ref()
|
||||
== current_embedding_version;
|
||||
if !embedder_match {
|
||||
return Ok(None);
|
||||
}
|
||||
let candidate_doc_id = existing_doc.doc_id.clone();
|
||||
tracing::debug!(
|
||||
target: "kebab-app::ingest",
|
||||
path = %asset.workspace_path.0,
|
||||
|
||||
90
crates/kebab-app/tests/twin_files_idempotent.rs
Normal file
90
crates/kebab-app/tests/twin_files_idempotent.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
//! Regression test for the twin-file idempotency bug.
|
||||
//!
|
||||
//! Identical-content files at different workspace paths share one
|
||||
//! `assets` row (`asset_id` = blake3 content hash, PRIMARY KEY). The
|
||||
//! old UPSERT `ON CONFLICT(asset_id) DO UPDATE SET workspace_path =
|
||||
//! excluded.workspace_path` made each twin overwrite the other's path
|
||||
//! on every ingest, so `get_asset_by_workspace_path(path1)` returned
|
||||
//! None (or the wrong twin) → re-process every time.
|
||||
//!
|
||||
//! Fix: `try_skip_unchanged` now uses `get_document_by_workspace_path`
|
||||
//! instead. `documents.workspace_path` is UNIQUE (V001) so each twin
|
||||
//! has its own stable document row.
|
||||
//!
|
||||
//! Assertion contract:
|
||||
//! 1st ingest → 2 New (one per twin)
|
||||
//! 2nd ingest → 0 New, 0 Updated, 2 Unchanged
|
||||
|
||||
mod common;
|
||||
|
||||
use common::TestEnv;
|
||||
use kebab_app::ingest_with_config;
|
||||
use kebab_core::IngestItemKind;
|
||||
|
||||
#[test]
|
||||
fn twin_files_second_ingest_is_unchanged() {
|
||||
let env = TestEnv::lexical_only();
|
||||
|
||||
// Write two files with identical content at different paths.
|
||||
let pkg_a = env.workspace_root.join("pkg_a");
|
||||
let pkg_b = env.workspace_root.join("pkg_b");
|
||||
std::fs::create_dir_all(&pkg_a).unwrap();
|
||||
std::fs::create_dir_all(&pkg_b).unwrap();
|
||||
|
||||
let content = b"# shared\nThis content is identical in both files.\n";
|
||||
std::fs::write(pkg_a.join("__init__.py"), content).unwrap();
|
||||
std::fs::write(pkg_b.join("__init__.py"), content).unwrap();
|
||||
|
||||
// First ingest — both files must be New.
|
||||
let first = ingest_with_config(env.config.clone(), env.scope(), false)
|
||||
.expect("first ingest must succeed");
|
||||
assert_eq!(first.errors, 0, "first ingest: no errors; report={first:?}");
|
||||
|
||||
let items = first.items.as_ref().expect("items must be present");
|
||||
let twin_items: Vec<_> = items
|
||||
.iter()
|
||||
.filter(|i| {
|
||||
i.doc_path.0.ends_with("__init__.py")
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
twin_items.len(),
|
||||
2,
|
||||
"first ingest: expected exactly 2 __init__.py items; items={items:?}"
|
||||
);
|
||||
for item in &twin_items {
|
||||
assert_eq!(
|
||||
item.kind,
|
||||
IngestItemKind::New,
|
||||
"first ingest: each twin must be New; item={item:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Second ingest — same files, same content → both must be Unchanged.
|
||||
let second = ingest_with_config(env.config.clone(), env.scope(), false)
|
||||
.expect("second ingest must succeed");
|
||||
assert_eq!(second.errors, 0, "second ingest: no errors; report={second:?}");
|
||||
assert_eq!(second.new, 0, "second ingest: no new docs; report={second:?}");
|
||||
assert_eq!(
|
||||
second.updated, 0,
|
||||
"second ingest: no updated docs (twin-file bug would set this to 2); report={second:?}"
|
||||
);
|
||||
|
||||
let second_items = second.items.as_ref().expect("items must be present");
|
||||
let twin_items2: Vec<_> = second_items
|
||||
.iter()
|
||||
.filter(|i| i.doc_path.0.ends_with("__init__.py"))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
twin_items2.len(),
|
||||
2,
|
||||
"second ingest: expected exactly 2 __init__.py items; items={second_items:?}"
|
||||
);
|
||||
for item in &twin_items2 {
|
||||
assert_eq!(
|
||||
item.kind,
|
||||
IngestItemKind::Unchanged,
|
||||
"second ingest: each twin must be Unchanged; item={item:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -169,6 +169,20 @@ pub trait DocumentStore {
|
||||
&self,
|
||||
path: &WorkspacePath,
|
||||
) -> anyhow::Result<Option<RawAsset>>;
|
||||
|
||||
/// Look up a document row by its workspace path. Used by the
|
||||
/// document-centric skip path in `try_skip_unchanged` to avoid the
|
||||
/// twin-file flip-flop that the asset-side lookup suffers from
|
||||
/// (multiple files with identical content share one `assets` row
|
||||
/// whose `workspace_path` is overwritten on every UPSERT, so
|
||||
/// `get_asset_by_workspace_path` returns the wrong twin's path).
|
||||
///
|
||||
/// `documents.workspace_path` is UNIQUE (V001), so each twin has
|
||||
/// its own stable document row regardless of the asset de-dup.
|
||||
fn get_document_by_workspace_path(
|
||||
&self,
|
||||
path: &WorkspacePath,
|
||||
) -> anyhow::Result<Option<CanonicalDocument>>;
|
||||
}
|
||||
|
||||
pub trait VectorStore {
|
||||
|
||||
@@ -286,6 +286,72 @@ impl kebab_core::DocumentStore for SqliteStore {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_document_by_workspace_path(
|
||||
&self,
|
||||
path: &kebab_core::WorkspacePath,
|
||||
) -> Result<Option<kebab_core::CanonicalDocument>> {
|
||||
let conn = self.lock_conn();
|
||||
let row: Option<DocumentRow> = conn
|
||||
.query_row(
|
||||
"SELECT
|
||||
doc_id, asset_id, workspace_path, title, lang,
|
||||
source_type, trust_level, parser_version,
|
||||
doc_version, schema_version, metadata_json,
|
||||
provenance_json, created_at, updated_at,
|
||||
last_chunker_version, last_embedding_version
|
||||
FROM documents WHERE workspace_path = ?",
|
||||
params![path.0],
|
||||
document_row_from_sql,
|
||||
)
|
||||
.map(Some)
|
||||
.or_else(rows_optional)
|
||||
.map_err(StoreError::from)?;
|
||||
let Some(row) = row else { return Ok(None) };
|
||||
|
||||
let doc_id = kebab_core::DocumentId(row.doc_id.clone());
|
||||
|
|
||||
let mut blocks_stmt = conn
|
||||
.prepare(
|
||||
"SELECT payload_json FROM blocks
|
||||
WHERE doc_id = ? ORDER BY ordinal ASC",
|
||||
)
|
||||
.map_err(StoreError::from)?;
|
||||
let block_rows = blocks_stmt
|
||||
.query_map(params![row.doc_id], |r| {
|
||||
let payload_json: String = r.get(0)?;
|
||||
Ok(payload_json)
|
||||
})
|
||||
.map_err(StoreError::from)?;
|
||||
let mut blocks: Vec<kebab_core::Block> = Vec::new();
|
||||
for block_row in block_rows {
|
||||
let payload_json = block_row.map_err(StoreError::from)?;
|
||||
let block: kebab_core::Block = serde_json::from_str(&payload_json)
|
||||
.context("deserialize block payload_json")?;
|
||||
blocks.push(block);
|
||||
}
|
||||
|
||||
let metadata: kebab_core::Metadata = serde_json::from_str(&row.metadata_json)
|
||||
.context("deserialize metadata_json")?;
|
||||
let provenance: kebab_core::Provenance =
|
||||
serde_json::from_str(&row.provenance_json)
|
||||
.context("deserialize provenance_json")?;
|
||||
|
||||
Ok(Some(kebab_core::CanonicalDocument {
|
||||
doc_id,
|
||||
source_asset_id: kebab_core::AssetId(row.asset_id),
|
||||
workspace_path: kebab_core::WorkspacePath(row.workspace_path),
|
||||
title: row.title.unwrap_or_default(),
|
||||
lang: kebab_core::Lang(row.lang.unwrap_or_default()),
|
||||
blocks,
|
||||
metadata,
|
||||
provenance,
|
||||
parser_version: kebab_core::ParserVersion(row.parser_version),
|
||||
schema_version: row.schema_version as u32,
|
||||
doc_version: row.doc_version as u32,
|
||||
last_chunker_version: row.last_chunker_version.map(kebab_core::ChunkerVersion),
|
||||
last_embedding_version: row.last_embedding_version.map(kebab_core::EmbeddingVersion),
|
||||
}))
|
||||
}
|
||||
|
||||
fn list_documents(
|
||||
&self,
|
||||
filter: &kebab_core::DocFilter,
|
||||
|
||||
Reference in New Issue
Block a user
[Minor — 선호도]
get_document(기존)은 블록 루프에서 외부row를 shadow하는 반면 이 구현은block_row로 명명해 shadow를 회피했습니다 — 더 명확합니다.row.doc_id.clone()이 필요한 이유가 코드 맥락만으로 즉시 드러나지 않을 수 있습니다. 짧은 주석 한 줄(// clone: row.doc_id is moved into params![] below)을 추가하면 다음 독자가 borrow-move 이유를 즉시 이해할 수 있습니다. 필수 사항은 아닙니다.