diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index e627b06..8995149 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -375,6 +375,28 @@ pub fn ingest_with_config_opts( .map(|d| d.doc_id.0) .collect(); + // Dogfood: post-walker sweep to remove stored docs whose source + // file has been deleted from the filesystem. Must run BEFORE the + // per-asset loop so the loop's New/Updated labelling is based on + // the post-purge store state (the purged doc_ids won't be in + // `existing_doc_ids` above — they were already removed, OR the + // sweep here removes them before we start counting). + // + // Critical design invariant: only purge when the file is TRULY + // absent from disk. A file that is still on disk but outside the + // current walker scope (config narrowing / include-glob change) is + // NOT purged — we leave it in place to protect against accidental + // data loss via config edits. + let scanned_paths: std::collections::HashSet = assets + .iter() + .map(|a| a.workspace_path.clone()) + .collect(); + let purged_deleted_files = sweep_deleted_files( + &app, + &scanned_paths, + vector_store.as_ref().map(|v| v.as_ref()), + )?; + let started_at = time::OffsetDateTime::now_utc(); let mut items: Vec = Vec::new(); @@ -647,11 +669,11 @@ pub fn ingest_with_config_opts( crate::ingest_progress::emit(progress, terminal_event); // p9-fb-19: bump the persistent corpus_revision counter when a - // commit landed (any new / updated). This invalidates every + // commit landed (any new / updated / purged). This invalidates every // entry in any in-process LRU search cache (in this process or // a sibling) on the next lookup. No-op when nothing changed // (skipped-only run) — the cache stays valid. - if new_count > 0 || updated_count > 0 { + if new_count > 0 || updated_count > 0 || purged_deleted_files > 0 { match app.sqlite.bump_corpus_revision() { Ok(rev) => tracing::debug!( target: "kebab-app", @@ -682,6 +704,7 @@ pub fn ingest_with_config_opts( skipped_generated: fs_skips.skipped_generated, skipped_size_exceeded: fs_skips.skipped_size_exceeded, skip_examples: fs_skips.skip_examples, + purged_deleted_files, items: if summary_only { None } else { Some(items) }, }) } @@ -1453,6 +1476,120 @@ fn purge_vector_orphans_for_workspace_path( Ok(()) } +/// Dogfood: post-walker sweep that purges stored documents whose source +/// file has been physically deleted from the filesystem. +/// +/// Algorithm: +/// 1. Query `documents` for every `workspace_path` currently stored. +/// 2. Compute `orphan_candidates = stored_paths - scanned_paths`. +/// 3. For each candidate: resolve to an absolute path and call +/// `Path::try_exists().unwrap_or(true)` — transient FS errors +/// (EACCES, NFS hiccup, ownership change) conservatively count as +/// "still present" so we never purge on uncertain signal. If the +/// file still exists on disk it was merely out-of-scope this run +/// (config narrowing / include-glob change) — leave it alone. Only +/// files that are truly absent trigger a purge. +/// 4. For absent files: call `purge_deleted_workspace_path` (SQLite +/// cascade delete + optional copied-asset file removal) and, if a +/// vector store is present, delete the associated vectors. +/// +/// Returns the number of documents purged. +/// +/// Non-fatal design: individual purge failures are logged and counted +/// as errors on the per-file level but do NOT abort the sweep — a +/// partial failure is preferable to blocking the rest of ingest. The +/// return value only counts successful purges. +fn sweep_deleted_files( + app: &App, + scanned_paths: &std::collections::HashSet, + vector_store: Option<&kebab_store_vector::LanceVectorStore>, +) -> anyhow::Result { + use kebab_core::DocumentStore as _; + + let stored_paths = app + .sqlite + .all_workspace_paths() + .context("sweep_deleted_files: all_workspace_paths")?; + + if stored_paths.is_empty() { + return Ok(0); + } + + let workspace_root = app.config.resolve_workspace_root(); + let mut purged: u32 = 0; + + for stored_path in stored_paths { + if scanned_paths.contains(&stored_path) { + continue; // still in scope — skip + } + + // Resolve to an absolute path and check existence on disk. + // Use `try_exists` + `unwrap_or(true)` so transient FS errors + // (EACCES on a path we lack read on, NFS hiccups, ownership + // change) are CONSERVATIVELY treated as "file still present" — + // never purge on uncertain signal (data-safety: PR #148 review). + // `exists()` would return false on Err and trigger a wrongful + // purge. Files whose path cannot be joined (theoretically + // impossible for non-empty workspace_path strings, but + // defense-in-depth) are likewise treated as still present. + let abs = workspace_root.join(&stored_path.0); + if abs.try_exists().unwrap_or(true) { + // File is on disk but not in this scan's scope (config + // narrowing). DO NOT purge — critical design constraint. + tracing::debug!( + target: "kebab-app", + path = %stored_path.0, + "sweep_deleted_files: file on disk but out of scope — leaving in store" + ); + continue; + } + + // File is truly absent → purge. + let chunk_ids = match kebab_store_sqlite::purge_deleted_workspace_path( + &app.sqlite, + &stored_path, + ) { + Ok(ids) => ids, + Err(e) => { + tracing::warn!( + target: "kebab-app", + path = %stored_path.0, + error = %e, + "sweep_deleted_files: purge failed; skipping this path" + ); + continue; + } + }; + + // Purge associated vectors (best-effort; partial failure + // acceptable — orphan vectors get cleaned by `kebab reset + // --vector-only` if they accumulate). + if let Some(vec) = vector_store { + if !chunk_ids.is_empty() { + use kebab_core::VectorStore as _; + if let Err(e) = vec.delete_by_chunk_ids(&chunk_ids) { + tracing::warn!( + target: "kebab-app", + path = %stored_path.0, + count = chunk_ids.len(), + error = %e, + "sweep_deleted_files: vector delete failed; SQLite side already cleaned" + ); + } + } + } + + tracing::info!( + target: "kebab-app", + path = %stored_path.0, + "sweep_deleted_files: purged document for deleted file" + ); + purged = purged.saturating_add(1); + } + + Ok(purged) +} + /// P7-3: process one `MediaType::Pdf` asset end-to-end. /// /// - Reads bytes from disk. diff --git a/crates/kebab-app/tests/file_deletion_auto_purge.rs b/crates/kebab-app/tests/file_deletion_auto_purge.rs new file mode 100644 index 0000000..5e16456 --- /dev/null +++ b/crates/kebab-app/tests/file_deletion_auto_purge.rs @@ -0,0 +1,178 @@ +//! Dogfood: auto-purge stored docs for filesystem-deleted files. +//! +//! Two tests: +//! +//! 1. `file_deletion_auto_purge` — ingest 2 files, delete one, re-ingest. +//! The re-ingest must report `purged_deleted_files = 1`, the deleted +//! file must no longer appear in `list_docs`, and lexical search for +//! its unique content must return no hits. +//! +//! 2. `include_scope_narrowing_does_not_purge` — ingest 2 files under a +//! wide glob, narrow the walker scope to only one file, re-ingest. +//! The narrowed ingest must NOT purge the out-of-scope file because +//! the file is still on disk (just excluded from this run). Protects +//! users against accidental data loss via config edits. + +mod common; + +use common::TestEnv; +use kebab_app::ingest_with_config_opts; +use kebab_app::IngestOpts; +use kebab_core::{DocFilter, DocumentStore, SearchMode, SearchQuery, SourceScope}; + +/// Helper: open the store via `TestEnv` and run `list_documents`. +fn list_doc_paths(env: &TestEnv) -> Vec { + use kebab_store_sqlite::SqliteStore; + let store = SqliteStore::open(&env.config).unwrap(); + store.run_migrations().unwrap(); + store + .list_documents(&DocFilter::default()) + .unwrap() + .into_iter() + .map(|d| d.doc_path.0) + .collect() +} + +#[test] +fn file_deletion_auto_purge() { + let env = TestEnv::lexical_only(); + + // Write two .rs files into the workspace. + let a_path = env.workspace_root.join("a.rs"); + let b_path = env.workspace_root.join("b.rs"); + std::fs::write(&a_path, "// file a\nfn alpha() {}\n").unwrap(); + std::fs::write(&b_path, "// file b\nfn bravo() {}\n").unwrap(); + + // First ingest — both must be New. + let first = ingest_with_config_opts( + env.config.clone(), + env.scope(), + false, + IngestOpts::default(), + ) + .expect("first ingest must succeed"); + // Only count the .rs files we added (there may be fixture files too). + let first_new = first.new; + assert!(first_new >= 2, "expected at least 2 new docs: {first:?}"); + assert_eq!( + first.purged_deleted_files, 0, + "no purges on first ingest: {first:?}" + ); + assert_eq!(first.errors, 0, "no errors on first ingest: {first:?}"); + + // Delete one file from the filesystem. + std::fs::remove_file(&b_path).expect("remove b.rs"); + + // Second ingest — scanned count drops by 1; b.rs should be purged. + let second = ingest_with_config_opts( + env.config.clone(), + env.scope(), + false, + IngestOpts::default(), + ) + .expect("second ingest must succeed"); + + assert_eq!( + second.purged_deleted_files, 1, + "exactly 1 file should be purged: {second:?}" + ); + assert_eq!(second.new, 0, "no new docs after deletion: {second:?}"); + assert_eq!(second.updated, 0, "no updated docs: {second:?}"); + assert_eq!(second.errors, 0, "no errors: {second:?}"); + + // b.rs must no longer appear in list_docs. + let doc_paths = list_doc_paths(&env); + let b_ws_path = "b.rs"; + assert!( + !doc_paths.iter().any(|p| p == b_ws_path), + "b.rs must be gone from list_docs; got: {doc_paths:?}" + ); + // a.rs must still be present. + let a_ws_path = "a.rs"; + assert!( + doc_paths.iter().any(|p| p == a_ws_path), + "a.rs must still be in list_docs; got: {doc_paths:?}" + ); + + // Lexical search for b.rs's unique content returns no hits. + let app = env.app(); + let query = SearchQuery { + text: "bravo".to_string(), + mode: SearchMode::Lexical, + k: 10, + filters: kebab_core::SearchFilters::default(), + }; + let hits = app.search(query).expect("search must not error"); + assert!( + hits.is_empty(), + "search for deleted file's content must return no hits; got: {hits:?}" + ); +} + +#[test] +fn include_scope_narrowing_does_not_purge() { + let env = TestEnv::lexical_only(); + + // Write two .rs files. + let a_path = env.workspace_root.join("a_narrow.rs"); + let b_path = env.workspace_root.join("b_narrow.rs"); + std::fs::write(&a_path, "// narrow a\nfn alpha_narrow() {}\n").unwrap(); + std::fs::write(&b_path, "// narrow b\nfn bravo_narrow() {}\n").unwrap(); + + // Wide scope: first ingest — both must be New. + let wide_scope = SourceScope { + root: env.workspace_root.clone(), + include: vec!["**/*.rs".to_string()], + exclude: env.config.workspace.exclude.clone(), + }; + let first = ingest_with_config_opts( + env.config.clone(), + wide_scope, + false, + IngestOpts::default(), + ) + .expect("first ingest (wide) must succeed"); + assert!( + first.new >= 2, + "expected at least 2 new docs: {first:?}" + ); + assert_eq!( + first.purged_deleted_files, 0, + "no purges on first ingest: {first:?}" + ); + + // Narrow scope: only a_narrow.rs in include — b_narrow.rs is still + // on disk but excluded from the walker scope. + let narrow_scope = SourceScope { + root: env.workspace_root.clone(), + include: vec!["a_narrow.rs".to_string()], + exclude: env.config.workspace.exclude.clone(), + }; + let second = ingest_with_config_opts( + env.config.clone(), + narrow_scope, + false, + IngestOpts::default(), + ) + .expect("second ingest (narrow) must succeed"); + + // CRITICAL: b_narrow.rs is still on disk — must NOT be purged. + assert_eq!( + second.purged_deleted_files, 0, + "scope-narrowing must NOT purge on-disk files; got: {second:?}" + ); + assert_eq!(second.errors, 0, "no errors: {second:?}"); + + // b_narrow.rs must still exist in the store. + let doc_paths = list_doc_paths(&env); + let b_ws_path = "b_narrow.rs"; + assert!( + doc_paths.iter().any(|p| p == b_ws_path), + "b_narrow.rs must still be in list_docs after scope narrowing; got: {doc_paths:?}" + ); + // And the file must still be on disk. + assert!( + b_path.exists(), + "b_narrow.rs must still be on disk (we didn't delete it)" + ); +} diff --git a/crates/kebab-cli/src/main.rs b/crates/kebab-cli/src/main.rs index ad06916..54a313e 100644 --- a/crates/kebab-cli/src/main.rs +++ b/crates/kebab-cli/src/main.rs @@ -595,14 +595,20 @@ fn run(cli: &Cli) -> anyhow::Result<()> { println!("{}", serde_json::to_string(&wire::wire_ingest(&report))?); } else { let skipped_breakdown = kebab_app::render_skipped_breakdown(&report.skipped_by_extension); + let purged_suffix = if report.purged_deleted_files > 0 { + format!(" purged {}", report.purged_deleted_files) + } else { + String::new() + }; println!( - "scanned {} new {} updated {} skipped {}{} errors {} ({} ms)", + "scanned {} new {} updated {} skipped {}{} errors {}{} ({} ms)", report.scanned, report.new, report.updated, report.skipped, skipped_breakdown, report.errors, + purged_suffix, report.duration_ms ); } diff --git a/crates/kebab-cli/src/wire.rs b/crates/kebab-cli/src/wire.rs index 6a2fce0..14a3623 100644 --- a/crates/kebab-cli/src/wire.rs +++ b/crates/kebab-cli/src/wire.rs @@ -260,6 +260,7 @@ mod tests { skipped_generated: 0, skipped_size_exceeded: 0, skip_examples: SkipExamples::default(), + purged_deleted_files: 0, items: None, }; let v = wire_ingest(&r); diff --git a/crates/kebab-core/src/ingest.rs b/crates/kebab-core/src/ingest.rs index a3a9916..8fff25b 100644 --- a/crates/kebab-core/src/ingest.rs +++ b/crates/kebab-core/src/ingest.rs @@ -47,6 +47,12 @@ pub struct IngestReport { /// p10-1A-1: sample file paths per skip category (≤ 5 each). #[serde(default)] pub skip_examples: SkipExamples, + /// Dogfood: docs whose on-disk file was deleted since the last ingest + /// and were therefore removed from the store. Additive field — older + /// wire consumers that pre-date this field read it as 0 via + /// `#[serde(default)]`. + #[serde(default)] + pub purged_deleted_files: u32, /// `None` ↔ wire `items: null` (`--summary-only`). pub items: Option>, } @@ -136,6 +142,7 @@ mod tests { builtin_blacklist: vec!["node_modules/x.js".into()], gitignore: vec![], }, + purged_deleted_files: 0, items: None, }; let v = serde_json::to_value(&r).unwrap(); diff --git a/crates/kebab-core/src/traits.rs b/crates/kebab-core/src/traits.rs index 74ed71d..fee79ad 100644 --- a/crates/kebab-core/src/traits.rs +++ b/crates/kebab-core/src/traits.rs @@ -183,6 +183,16 @@ pub trait DocumentStore { &self, path: &WorkspacePath, ) -> anyhow::Result>; + + /// Return every `workspace_path` stored in the `documents` table. + /// + /// Used by the post-walker sweep in `kebab-app::ingest` to detect + /// documents whose source file has been deleted from the filesystem. + /// The set difference `(stored - scanned)` yields orphan candidates; + /// each candidate is then existence-checked on disk so that + /// out-of-scope files (config narrowing) are NOT purged — only truly + /// absent files trigger the purge. + fn all_workspace_paths(&self) -> anyhow::Result>; } pub trait VectorStore { diff --git a/crates/kebab-store-sqlite/snapshots/ingest_report.snapshot.json b/crates/kebab-store-sqlite/snapshots/ingest_report.snapshot.json index f637f94..3fb065f 100644 --- a/crates/kebab-store-sqlite/snapshots/ingest_report.snapshot.json +++ b/crates/kebab-store-sqlite/snapshots/ingest_report.snapshot.json @@ -56,5 +56,6 @@ "skipped_kebabignore": 0, "skipped_size_exceeded": 0, "unchanged": 0, + "purged_deleted_files": 0, "updated": 1 } diff --git a/crates/kebab-store-sqlite/src/documents.rs b/crates/kebab-store-sqlite/src/documents.rs index d7d2a0a..fe0921a 100644 --- a/crates/kebab-store-sqlite/src/documents.rs +++ b/crates/kebab-store-sqlite/src/documents.rs @@ -352,6 +352,22 @@ impl kebab_core::DocumentStore for SqliteStore { })) } + fn all_workspace_paths(&self) -> Result> { + let conn = self.lock_conn(); + let mut stmt = conn + .prepare("SELECT workspace_path FROM documents") + .map_err(StoreError::from)?; + let rows = stmt + .query_map([], |r| r.get::<_, String>(0)) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for row in rows { + let path = row.map_err(StoreError::from)?; + out.push(kebab_core::WorkspacePath(path)); + } + Ok(out) + } + fn list_documents( &self, filter: &kebab_core::DocFilter, diff --git a/crates/kebab-store-sqlite/src/lib.rs b/crates/kebab-store-sqlite/src/lib.rs index 89c0fa3..e1b9fb9 100644 --- a/crates/kebab-store-sqlite/src/lib.rs +++ b/crates/kebab-store-sqlite/src/lib.rs @@ -35,4 +35,4 @@ pub use error::StoreError; pub use eval::{EvalQueryResultRecord, EvalRunRecord, EvalRunRow}; pub use fts::rebuild_chunks_fts; pub use jobs::IngestRunRow; -pub use store::{CountSummary, NotIndexed, SqliteStore}; +pub use store::{CountSummary, NotIndexed, SqliteStore, purge_deleted_workspace_path}; diff --git a/crates/kebab-store-sqlite/src/store.rs b/crates/kebab-store-sqlite/src/store.rs index 8db087b..d6621b4 100644 --- a/crates/kebab-store-sqlite/src/store.rs +++ b/crates/kebab-store-sqlite/src/store.rs @@ -540,6 +540,114 @@ pub(crate) fn purge_orphan_at_workspace_path( Ok(()) } +/// Purge all stored data for a document whose on-disk file has been +/// deleted (as opposed to content-changed, which is handled by +/// `purge_orphan_at_workspace_path`). +/// +/// Returns the `chunk_id`s that were associated with the document so +/// the caller can issue a matching `VectorStore::delete_by_chunk_ids` +/// on the LanceDB side. +/// +/// Deletion order: +/// 1. Collect chunk_ids (before cascade removes them). +/// 2. DELETE the `documents` row → CASCADE clears `blocks`, `chunks`, +/// `embedding_records`. +/// 3. DELETE the `assets` row **only if no other document still +/// references it** (twin-file protection — `assets` can be shared +/// across identical-content files via the blake3 PK). +/// 4. If the asset was `storage_kind = 'copied'`, best-effort delete +/// the on-disk byte file at `storage_path`. +/// +/// Returns `Ok(vec![])` when no document exists at `workspace_path` +/// (idempotent — caller doesn't need to pre-check). +pub fn purge_deleted_workspace_path( + store: &SqliteStore, + workspace_path: &kebab_core::WorkspacePath, +) -> anyhow::Result> { + let conn = store.lock_conn(); + + // Look up the document + its asset_id. + let doc_row: Option<(String, String)> = conn + .query_row( + "SELECT doc_id, asset_id FROM documents WHERE workspace_path = ?", + rusqlite::params![workspace_path.0], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .optional() + .map_err(StoreError::from)?; + + let Some((doc_id, asset_id)) = doc_row else { + return Ok(Vec::new()); + }; + + // 1. Collect chunk_ids before CASCADE removes them. + let mut stmt = conn + .prepare("SELECT chunk_id FROM chunks WHERE doc_id = ?") + .map_err(StoreError::from)?; + let rows = stmt + .query_map(rusqlite::params![doc_id], |r| r.get::<_, String>(0)) + .map_err(StoreError::from)?; + let chunk_ids: Vec = rows + .map(|r| r.map(kebab_core::ChunkId)) + .collect::>>() + .map_err(StoreError::from)?; + drop(stmt); + + // 2. DELETE the document row (CASCADE clears blocks / chunks / + // embedding_records via the FK constraints in V001). + conn.execute( + "DELETE FROM documents WHERE doc_id = ?", + rusqlite::params![doc_id], + ) + .map_err(StoreError::from)?; + + // 3. Delete the asset row only when no other document still + // references it (twin-file safety: two files with identical + // bytes share a single asset row via the blake3 PK). + let remaining_refs: i64 = conn + .query_row( + "SELECT COUNT(*) FROM documents WHERE asset_id = ?", + rusqlite::params![asset_id], + |r| r.get(0), + ) + .map_err(StoreError::from)?; + + if remaining_refs == 0 { + // 4. Capture storage details before deleting the row. + let asset_storage: Option<(String, String)> = conn + .query_row( + "SELECT storage_kind, storage_path FROM assets WHERE asset_id = ?", + rusqlite::params![asset_id], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .optional() + .map_err(StoreError::from)?; + + conn.execute( + "DELETE FROM assets WHERE asset_id = ?", + rusqlite::params![asset_id], + ) + .map_err(StoreError::from)?; + + // 5. Best-effort: remove the on-disk copied asset file. + if let Some((storage_kind, storage_path)) = asset_storage { + if storage_kind == "copied" { + let _ = std::fs::remove_file(&storage_path); + } + } + } + + tracing::debug!( + target: "kebab-store-sqlite", + workspace_path = %workspace_path.0, + doc_id = %doc_id, + chunk_count = chunk_ids.len(), + "purged deleted-file document from store" + ); + + Ok(chunk_ids) +} + /// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes` /// path (which has bytes + computed `storage_kind/path`) and the /// `DocumentStore::put_asset` path (which only has the `RawAsset` and diff --git a/crates/kebab-store-sqlite/tests/ingest_report_snapshot.rs b/crates/kebab-store-sqlite/tests/ingest_report_snapshot.rs index 5caf9dc..6b15bc0 100644 --- a/crates/kebab-store-sqlite/tests/ingest_report_snapshot.rs +++ b/crates/kebab-store-sqlite/tests/ingest_report_snapshot.rs @@ -41,6 +41,7 @@ fn fixture_report() -> IngestReport { skipped_generated: 0, skipped_size_exceeded: 0, skip_examples: kebab_core::SkipExamples::default(), + purged_deleted_files: 0, items: Some(vec![ IngestItem { kind: IngestItemKind::New,