fix(dogfood): auto-purge stored docs for filesystem-deleted files

Files deleted from disk (rm a.md) were leaving stale documents + chunks +
embeddings in the store, surfacing as ghost citations in search/ask.
The existing purge_orphan_at_workspace_path only handles rows left stale
by a content change (WHERE workspace_path=? AND asset_id != ?); a file
deletion produces no new asset_id, so that purge never fires for it.

Fix: post-walker-scan sweep. Compute (stored_paths - scanned_paths),
for each candidate check filesystem existence — only purge when the
file is TRULY missing. Scope-narrowing case (file on disk but outside
include glob) is explicitly NOT purged to protect users from accidental
data loss via config edits.

Adds:
- DocumentStore::all_workspace_paths trait method + SqliteStore impl
- purge_deleted_workspace_path in store-sqlite (returns chunk_ids for
  vector delete; deletes doc CASCADE + asset row + copied storage file)
- sweep_deleted_files in kebab-app::ingest path; called once per ingest
  before the per-asset loop
- IngestReport.purged_deleted_files counter (additive, serde default)
- CLI ingest summary mentions purge count when > 0
- 2 integration tests: file_deletion_auto_purge + include_scope_narrowing_does_not_purge

dogfood discovery (PR #142 1B + multi-root: kebab-docs + httpx + zod
+ lodash). Per user decision: only filesystem deletion auto-purges;
scope narrowing requires explicit kebab reset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 06:51:07 +00:00
parent acf8cf3be2
commit 27baec82ea
11 changed files with 461 additions and 4 deletions

View File

@@ -375,6 +375,28 @@ pub fn ingest_with_config_opts(
.map(|d| d.doc_id.0)
.collect();
// Dogfood: post-walker sweep to remove stored docs whose source
// file has been deleted from the filesystem. Must run BEFORE the
// per-asset loop so the loop operates on the post-purge store state.
// NOTE(review): `existing_doc_ids` above is collected before this
// sweep runs, so it may still contain the ids of docs purged here —
// confirm the New/Updated labelling tolerates those stale ids.
//
// Critical design invariant: only purge when the file is TRULY
// absent from disk. A file that is still on disk but outside the
// current walker scope (config narrowing / include-glob change) is
// NOT purged — we leave it in place to protect against accidental
// data loss via config edits.
let scanned_paths: std::collections::HashSet<kebab_core::WorkspacePath> = assets
.iter()
.map(|a| a.workspace_path.clone())
.collect();
let purged_deleted_files = sweep_deleted_files(
&app,
&scanned_paths,
vector_store.as_ref().map(|v| v.as_ref()),
)?;
let started_at = time::OffsetDateTime::now_utc();
let mut items: Vec<kebab_core::IngestItem> = Vec::new();
@@ -647,11 +669,11 @@ pub fn ingest_with_config_opts(
crate::ingest_progress::emit(progress, terminal_event);
// p9-fb-19: bump the persistent corpus_revision counter when a
// commit landed (any new / updated). This invalidates every
// commit landed (any new / updated / purged). This invalidates every
// entry in any in-process LRU search cache (in this process or
// a sibling) on the next lookup. No-op when nothing changed
// (skipped-only run) — the cache stays valid.
if new_count > 0 || updated_count > 0 {
if new_count > 0 || updated_count > 0 || purged_deleted_files > 0 {
match app.sqlite.bump_corpus_revision() {
Ok(rev) => tracing::debug!(
target: "kebab-app",
@@ -682,6 +704,7 @@ pub fn ingest_with_config_opts(
skipped_generated: fs_skips.skipped_generated,
skipped_size_exceeded: fs_skips.skipped_size_exceeded,
skip_examples: fs_skips.skip_examples,
purged_deleted_files,
items: if summary_only { None } else { Some(items) },
})
}
@@ -1453,6 +1476,112 @@ fn purge_vector_orphans_for_workspace_path(
Ok(())
}
/// Dogfood: post-walker sweep that purges stored documents whose source
/// file has been physically deleted from the filesystem.
///
/// Algorithm:
/// 1. Query `documents` for every `workspace_path` currently stored.
/// 2. Compute `orphan_candidates = stored_paths - scanned_paths`.
/// 3. For each candidate: resolve to an absolute path under the
///    workspace root and call `Path::try_exists()`.
///    - `Ok(true)`: the file is on disk and was merely out-of-scope this
///      run (config narrowing / include-glob change) — leave it alone.
///    - `Err(_)`: existence could not be determined (e.g. permission
///      denied on a parent directory) — leave it alone. Unlike
///      `Path::exists()`, which reports such errors as "missing", an
///      inconclusive check never triggers a purge.
///    - `Ok(false)`: the file is truly absent — purge.
/// 4. For absent files: call `purge_deleted_workspace_path` and, if a
///    vector store is present, delete the vectors for the returned
///    chunk ids.
///
/// Returns the number of documents purged.
///
/// # Errors
///
/// Only a failure to list the stored workspace paths aborts the sweep.
/// Individual purge failures are logged and skipped — a partial failure
/// is preferable to blocking the rest of ingest — and are not counted in
/// the return value. A vector-delete failure is logged but the document
/// still counts as purged, because the SQLite side is already cleaned.
fn sweep_deleted_files(
    app: &App,
    scanned_paths: &std::collections::HashSet<kebab_core::WorkspacePath>,
    vector_store: Option<&kebab_store_vector::LanceVectorStore>,
) -> anyhow::Result<u32> {
    use kebab_core::DocumentStore as _;

    let stored_paths = app
        .sqlite
        .all_workspace_paths()
        .context("sweep_deleted_files: all_workspace_paths")?;
    if stored_paths.is_empty() {
        return Ok(0);
    }

    let workspace_root = app.config.resolve_workspace_root();
    let mut purged: u32 = 0;

    for stored_path in stored_paths {
        if scanned_paths.contains(&stored_path) {
            continue; // still in scope — skip
        }

        // Resolve to an absolute path and check existence on disk.
        // `try_exists` (not `exists`) so that an I/O error is surfaced
        // instead of being silently reported as "file missing".
        let abs = workspace_root.join(&stored_path.0);
        match abs.try_exists() {
            // Truly absent → fall through to the purge below.
            Ok(false) => {}
            Ok(true) => {
                // File is on disk but not in this scan's scope (config
                // narrowing). DO NOT purge — critical design constraint.
                tracing::debug!(
                    target: "kebab-app",
                    path = %stored_path.0,
                    "sweep_deleted_files: file on disk but out of scope — leaving in store"
                );
                continue;
            }
            Err(e) => {
                // Existence is unknown; treat as "still present" so an
                // unreadable directory can never cause data loss.
                tracing::warn!(
                    target: "kebab-app",
                    path = %stored_path.0,
                    error = %e,
                    "sweep_deleted_files: cannot determine whether file exists; leaving in store"
                );
                continue;
            }
        }

        let chunk_ids = match kebab_store_sqlite::purge_deleted_workspace_path(
            &app.sqlite,
            &stored_path,
        ) {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(
                    target: "kebab-app",
                    path = %stored_path.0,
                    error = %e,
                    "sweep_deleted_files: purge failed; skipping this path"
                );
                continue;
            }
        };

        // Purge associated vectors (best-effort; partial failure
        // acceptable — orphan vectors get cleaned by `kebab reset
        // --vector-only` if they accumulate).
        if let Some(vec) = vector_store {
            if !chunk_ids.is_empty() {
                use kebab_core::VectorStore as _;
                if let Err(e) = vec.delete_by_chunk_ids(&chunk_ids) {
                    tracing::warn!(
                        target: "kebab-app",
                        path = %stored_path.0,
                        count = chunk_ids.len(),
                        error = %e,
                        "sweep_deleted_files: vector delete failed; SQLite side already cleaned"
                    );
                }
            }
        }

        tracing::info!(
            target: "kebab-app",
            path = %stored_path.0,
            "sweep_deleted_files: purged document for deleted file"
        );
        purged = purged.saturating_add(1);
    }

    Ok(purged)
}
/// P7-3: process one `MediaType::Pdf` asset end-to-end.
///
/// - Reads bytes from disk.

View File

@@ -0,0 +1,178 @@
//! Dogfood: auto-purge stored docs for filesystem-deleted files.
//!
//! Two tests:
//!
//! 1. `file_deletion_auto_purge` — ingest 2 files, delete one, re-ingest.
//! The re-ingest must report `purged_deleted_files = 1`, the deleted
//! file must no longer appear in `list_docs`, and lexical search for
//! its unique content must return no hits.
//!
//! 2. `include_scope_narrowing_does_not_purge` — ingest 2 files under a
//! wide glob, narrow the walker scope to only one file, re-ingest.
//! The narrowed ingest must NOT purge the out-of-scope file because
//! the file is still on disk (just excluded from this run). Protects
//! users against accidental data loss via config edits.
mod common;
use common::TestEnv;
use kebab_app::ingest_with_config_opts;
use kebab_app::IngestOpts;
use kebab_core::{DocFilter, DocumentStore, SearchMode, SearchQuery, SourceScope};
/// Helper: open the store via `TestEnv` and run `list_documents`.
fn list_doc_paths(env: &TestEnv) -> Vec<String> {
use kebab_store_sqlite::SqliteStore;
let store = SqliteStore::open(&env.config).unwrap();
store.run_migrations().unwrap();
store
.list_documents(&DocFilter::default())
.unwrap()
.into_iter()
.map(|d| d.doc_path.0)
.collect()
}
#[test]
fn file_deletion_auto_purge() {
    let env = TestEnv::lexical_only();

    // Both ingests in this test use the same config, scope and options.
    let run_ingest = || {
        ingest_with_config_opts(
            env.config.clone(),
            env.scope(),
            false,
            IngestOpts::default(),
        )
    };

    // Seed the workspace with two .rs files; b.rs is the one deleted later.
    let kept_file = env.workspace_root.join("a.rs");
    let doomed_file = env.workspace_root.join("b.rs");
    std::fs::write(&kept_file, "// file a\nfn alpha() {}\n").unwrap();
    std::fs::write(&doomed_file, "// file b\nfn bravo() {}\n").unwrap();

    // Initial ingest: both files are new. `>=` because the environment
    // may contain fixture files in addition to the two written here.
    let first = run_ingest().expect("first ingest must succeed");
    assert!(first.new >= 2, "expected at least 2 new docs: {first:?}");
    assert_eq!(
        first.purged_deleted_files, 0,
        "no purges on first ingest: {first:?}"
    );
    assert_eq!(first.errors, 0, "no errors on first ingest: {first:?}");

    // Remove b.rs from disk, then ingest again: exactly that one
    // document must be purged and nothing else may change.
    std::fs::remove_file(&doomed_file).expect("remove b.rs");
    let second = run_ingest().expect("second ingest must succeed");
    assert_eq!(
        second.purged_deleted_files, 1,
        "exactly 1 file should be purged: {second:?}"
    );
    assert_eq!(second.new, 0, "no new docs after deletion: {second:?}");
    assert_eq!(second.updated, 0, "no updated docs: {second:?}");
    assert_eq!(second.errors, 0, "no errors: {second:?}");

    // The store lists a.rs but no longer lists b.rs.
    let doc_paths = list_doc_paths(&env);
    assert!(
        doc_paths.iter().all(|p| p != "b.rs"),
        "b.rs must be gone from list_docs; got: {doc_paths:?}"
    );
    assert!(
        doc_paths.iter().any(|p| p == "a.rs"),
        "a.rs must still be in list_docs; got: {doc_paths:?}"
    );

    // A lexical search for content unique to b.rs finds nothing.
    let app = env.app();
    let hits = app
        .search(SearchQuery {
            text: "bravo".to_string(),
            mode: SearchMode::Lexical,
            k: 10,
            filters: kebab_core::SearchFilters::default(),
        })
        .expect("search must not error");
    assert!(
        hits.is_empty(),
        "search for deleted file's content must return no hits; got: {hits:?}"
    );
}
#[test]
fn include_scope_narrowing_does_not_purge() {
    let env = TestEnv::lexical_only();

    // Build a scope rooted at the test workspace with a single include
    // pattern and the config's exclude list.
    let scope_including = |pattern: &str| SourceScope {
        root: env.workspace_root.clone(),
        include: vec![pattern.to_string()],
        exclude: env.config.workspace.exclude.clone(),
    };
    // Run one ingest with the given scope and otherwise fixed arguments.
    let ingest_with = |scope: SourceScope| {
        ingest_with_config_opts(env.config.clone(), scope, false, IngestOpts::default())
    };

    // Two .rs files on disk; both stay on disk for the whole test.
    let in_scope_file = env.workspace_root.join("a_narrow.rs");
    let out_of_scope_file = env.workspace_root.join("b_narrow.rs");
    std::fs::write(&in_scope_file, "// narrow a\nfn alpha_narrow() {}\n").unwrap();
    std::fs::write(&out_of_scope_file, "// narrow b\nfn bravo_narrow() {}\n").unwrap();

    // Wide scope picks up both files as new.
    let first = ingest_with(scope_including("**/*.rs")).expect("first ingest (wide) must succeed");
    assert!(first.new >= 2, "expected at least 2 new docs: {first:?}");
    assert_eq!(
        first.purged_deleted_files, 0,
        "no purges on first ingest: {first:?}"
    );

    // Narrow scope includes only a_narrow.rs; b_narrow.rs is still on
    // disk but the walker no longer visits it.
    let second =
        ingest_with(scope_including("a_narrow.rs")).expect("second ingest (narrow) must succeed");

    // CRITICAL: an on-disk file that merely fell out of scope is kept.
    assert_eq!(
        second.purged_deleted_files, 0,
        "scope-narrowing must NOT purge on-disk files; got: {second:?}"
    );
    assert_eq!(second.errors, 0, "no errors: {second:?}");

    // The out-of-scope document is still listed by the store...
    let doc_paths = list_doc_paths(&env);
    assert!(
        doc_paths.iter().any(|p| p == "b_narrow.rs"),
        "b_narrow.rs must still be in list_docs after scope narrowing; got: {doc_paths:?}"
    );
    // ...and its source file is untouched.
    assert!(
        out_of_scope_file.exists(),
        "b_narrow.rs must still be on disk (we didn't delete it)"
    );
}

View File

@@ -595,14 +595,20 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&wire::wire_ingest(&report))?);
} else {
let skipped_breakdown = kebab_app::render_skipped_breakdown(&report.skipped_by_extension);
let purged_suffix = if report.purged_deleted_files > 0 {
format!(" purged {}", report.purged_deleted_files)
} else {
String::new()
};
println!(
"scanned {} new {} updated {} skipped {}{} errors {} ({} ms)",
"scanned {} new {} updated {} skipped {}{} errors {}{} ({} ms)",
report.scanned,
report.new,
report.updated,
report.skipped,
skipped_breakdown,
report.errors,
purged_suffix,
report.duration_ms
);
}

View File

@@ -260,6 +260,7 @@ mod tests {
skipped_generated: 0,
skipped_size_exceeded: 0,
skip_examples: SkipExamples::default(),
purged_deleted_files: 0,
items: None,
};
let v = wire_ingest(&r);

View File

@@ -47,6 +47,12 @@ pub struct IngestReport {
/// p10-1A-1: sample file paths per skip category (≤ 5 each).
#[serde(default)]
pub skip_examples: SkipExamples,
/// Dogfood: docs whose on-disk file was deleted since the last ingest
/// and were therefore removed from the store. Additive field — older
/// wire consumers that pre-date this field read it as 0 via
/// `#[serde(default)]`.
#[serde(default)]
pub purged_deleted_files: u32,
/// `None` ↔ wire `items: null` (`--summary-only`).
pub items: Option<Vec<IngestItem>>,
}
@@ -136,6 +142,7 @@ mod tests {
builtin_blacklist: vec!["node_modules/x.js".into()],
gitignore: vec![],
},
purged_deleted_files: 0,
items: None,
};
let v = serde_json::to_value(&r).unwrap();

View File

@@ -183,6 +183,16 @@ pub trait DocumentStore {
&self,
path: &WorkspacePath,
) -> anyhow::Result<Option<CanonicalDocument>>;
/// Return every `workspace_path` stored in the `documents` table.
///
/// Used by the post-walker sweep in `kebab-app::ingest` to detect
/// documents whose source file has been deleted from the filesystem.
/// The set difference `(stored - scanned)` yields orphan candidates;
/// each candidate is then existence-checked on disk so that
/// out-of-scope files (config narrowing) are NOT purged — only truly
/// absent files trigger the purge.
fn all_workspace_paths(&self) -> anyhow::Result<Vec<WorkspacePath>>;
}
pub trait VectorStore {

View File

@@ -56,5 +56,6 @@
"skipped_kebabignore": 0,
"skipped_size_exceeded": 0,
"unchanged": 0,
"purged_deleted_files": 0,
"updated": 1
}

View File

@@ -352,6 +352,22 @@ impl kebab_core::DocumentStore for SqliteStore {
}))
}
/// Return the `workspace_path` of every row in the `documents` table,
/// in whatever order SQLite yields them.
fn all_workspace_paths(&self) -> Result<Vec<kebab_core::WorkspacePath>> {
    let conn = self.lock_conn();
    let mut select_paths = conn
        .prepare("SELECT workspace_path FROM documents")
        .map_err(StoreError::from)?;

    // Wrap each column value in the newtype while mapping the row; the
    // first row-level error aborts the collect and is returned.
    let paths = select_paths
        .query_map([], |row| {
            row.get::<_, String>(0).map(kebab_core::WorkspacePath)
        })
        .map_err(StoreError::from)?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(StoreError::from)?;

    Ok(paths)
}
fn list_documents(
&self,
filter: &kebab_core::DocFilter,

View File

@@ -35,4 +35,4 @@ pub use error::StoreError;
pub use eval::{EvalQueryResultRecord, EvalRunRecord, EvalRunRow};
pub use fts::rebuild_chunks_fts;
pub use jobs::IngestRunRow;
pub use store::{CountSummary, NotIndexed, SqliteStore};
pub use store::{CountSummary, NotIndexed, SqliteStore, purge_deleted_workspace_path};

View File

@@ -540,6 +540,114 @@ pub(crate) fn purge_orphan_at_workspace_path(
Ok(())
}
/// Purge all stored data for a document whose on-disk file has been
/// deleted (as opposed to content-changed, which is handled by
/// `purge_orphan_at_workspace_path`).
///
/// Returns the `chunk_id`s that were associated with the document so
/// the caller can issue a matching `VectorStore::delete_by_chunk_ids`
/// on the LanceDB side.
///
/// All SQLite work runs inside a single transaction, so a failure
/// part-way through rolls everything back. Without that, the document
/// (and, via CASCADE, its chunks) could be deleted while the function
/// still returned `Err`, leaving the caller with no chunk ids to clean
/// up the vector store and nothing left to retry on the next run.
///
/// Steps:
/// 1. Look up the document and its `asset_id` by `workspace_path`.
/// 2. Collect chunk_ids (before cascade removes them).
/// 3. DELETE the `documents` row → CASCADE clears `blocks`, `chunks`,
///    `embedding_records`.
/// 4. DELETE the `assets` row **only if no other document still
///    references it** (twin-file protection — `assets` can be shared
///    across identical-content files via the blake3 PK).
/// 5. Commit.
/// 6. If the deleted asset was `storage_kind = 'copied'`, best-effort
///    delete the on-disk byte file at `storage_path`. This happens only
///    after the commit so a rolled-back purge never removes the file.
///
/// Returns `Ok(vec![])` when no document exists at `workspace_path`
/// (idempotent — caller doesn't need to pre-check).
///
/// # Errors
///
/// Returns an error if any SQLite statement or the commit fails; in
/// that case no rows have been removed. A failure to delete the copied
/// asset file is logged, not returned.
pub fn purge_deleted_workspace_path(
    store: &SqliteStore,
    workspace_path: &kebab_core::WorkspacePath,
) -> anyhow::Result<Vec<kebab_core::ChunkId>> {
    let conn = store.lock_conn();
    // Dropping `tx` without `commit()` rolls back, so every early `?`
    // return below leaves the store untouched.
    let tx = conn.unchecked_transaction().map_err(StoreError::from)?;

    // 1. Look up the document + its asset_id.
    let doc_row: Option<(String, String)> = tx
        .query_row(
            "SELECT doc_id, asset_id FROM documents WHERE workspace_path = ?",
            rusqlite::params![workspace_path.0],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .optional()
        .map_err(StoreError::from)?;
    let Some((doc_id, asset_id)) = doc_row else {
        return Ok(Vec::new());
    };

    // 2. Collect chunk_ids before CASCADE removes them.
    let mut stmt = tx
        .prepare("SELECT chunk_id FROM chunks WHERE doc_id = ?")
        .map_err(StoreError::from)?;
    let rows = stmt
        .query_map(rusqlite::params![doc_id], |r| r.get::<_, String>(0))
        .map_err(StoreError::from)?;
    let chunk_ids: Vec<kebab_core::ChunkId> = rows
        .map(|r| r.map(kebab_core::ChunkId))
        .collect::<rusqlite::Result<Vec<_>>>()
        .map_err(StoreError::from)?;
    // Release the statement's borrow of `tx` so it can be committed.
    drop(stmt);

    // 3. DELETE the document row (CASCADE clears blocks / chunks /
    //    embedding_records via the FK constraints in V001).
    tx.execute(
        "DELETE FROM documents WHERE doc_id = ?",
        rusqlite::params![doc_id],
    )
    .map_err(StoreError::from)?;

    // 4. Delete the asset row only when no other document still
    //    references it (twin-file safety: two files with identical
    //    bytes share a single asset row via the blake3 PK).
    let remaining_refs: i64 = tx
        .query_row(
            "SELECT COUNT(*) FROM documents WHERE asset_id = ?",
            rusqlite::params![asset_id],
            |r| r.get(0),
        )
        .map_err(StoreError::from)?;

    // Path of the copied asset file to remove once the commit succeeds.
    let copied_file: Option<String> = if remaining_refs == 0 {
        // Capture storage details before deleting the row.
        // `storage_path` is read as nullable so an asset without a
        // stored path cannot fail the purge.
        // NOTE(review): the column's nullability is not visible from
        // here — confirm against the schema.
        let asset_storage: Option<(String, Option<String>)> = tx
            .query_row(
                "SELECT storage_kind, storage_path FROM assets WHERE asset_id = ?",
                rusqlite::params![asset_id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .optional()
            .map_err(StoreError::from)?;
        tx.execute(
            "DELETE FROM assets WHERE asset_id = ?",
            rusqlite::params![asset_id],
        )
        .map_err(StoreError::from)?;

        match asset_storage {
            Some((storage_kind, Some(storage_path))) if storage_kind == "copied" => {
                Some(storage_path)
            }
            _ => None,
        }
    } else {
        None
    };

    // 5. Make the row deletions durable before touching the filesystem.
    tx.commit().map_err(StoreError::from)?;

    // 6. Best-effort: remove the on-disk copied asset file. A file that
    //    is already gone is fine; anything else is worth a log line but
    //    must not fail the purge, which has already been committed.
    if let Some(storage_path) = copied_file {
        if let Err(e) = std::fs::remove_file(&storage_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                tracing::warn!(
                    target: "kebab-store-sqlite",
                    storage_path = %storage_path,
                    error = %e,
                    "failed to remove copied asset file after purge"
                );
            }
        }
    }

    tracing::debug!(
        target: "kebab-store-sqlite",
        workspace_path = %workspace_path.0,
        doc_id = %doc_id,
        chunk_count = chunk_ids.len(),
        "purged deleted-file document from store"
    );
    Ok(chunk_ids)
}
/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes`
/// path (which has bytes + computed `storage_kind/path`) and the
/// `DocumentStore::put_asset` path (which only has the `RawAsset` and

View File

@@ -41,6 +41,7 @@ fn fixture_report() -> IngestReport {
skipped_generated: 0,
skipped_size_exceeded: 0,
skip_examples: kebab_core::SkipExamples::default(),
purged_deleted_files: 0,
items: Some(vec![
IngestItem {
kind: IngestItemKind::New,