diff --git a/crates/kebab-app/src/ingest_progress.rs b/crates/kebab-app/src/ingest_progress.rs new file mode 100644 index 0000000..06c959e --- /dev/null +++ b/crates/kebab-app/src/ingest_progress.rs @@ -0,0 +1,191 @@ +//! Streaming progress events for `ingest_with_config_progress`. +//! +//! The facade emits one [`IngestEvent`] per step boundary into an +//! optional `mpsc::Sender` injected by the caller. CLI +//! (`p9-fb-02`), TUI (`p9-fb-03`), and future desktop UI all consume the +//! same stream — CLI dumps it as line-delimited JSON +//! (`ingest_progress.v1`), TUI feeds it into a status-bar reducer, and +//! anyone else can plug in their own receiver. +//! +//! Send is **best-effort**: a receiver that has been dropped is treated +//! as a no-op, never as an error. The ingest hot path must not stall +//! on a slow consumer. +//! +//! Cancellation lands in `p9-fb-04` and adds `IngestEvent::Aborted` +//! emission; this task only ever emits `Completed`. + +use serde::{Deserialize, Serialize}; + +use kebab_core::IngestItemKind; + +/// Aggregate counters surfaced on the terminal `Completed` (and, in +/// `p9-fb-04`, `Aborted`) events. Mirrors the fields persisted into +/// `ingest_runs.progress_json` so external tooling can reconstruct the +/// run's outcome from either side. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct AggregateCounts { + pub scanned: u32, + pub new: u32, + pub updated: u32, + pub skipped: u32, + pub errors: u32, + pub chunks_indexed: u32, + pub embeddings_indexed: u32, +} + +/// One streaming progress event. The CLI's `--json` mode serializes this +/// into the wire-stable `ingest_progress.v1` schema; in-memory consumers +/// (TUI / desktop) take the typed value directly. +/// +/// Ordering invariant per design §2.4a: +/// +/// ```text +/// ScanStarted < ScanCompleted < (AssetStarted < AssetFinished)* +/// < (Completed | Aborted) +/// ``` +/// +/// Embed-batch events (`embed_batch_started` / `embed_batch_finished` +/// in §2.4a) are reserved for a future iteration and are not emitted +/// by this task; the spec calls them out as "임의 위치" (optional). +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum IngestEvent { + /// Workspace walk has started. Emitted before the connector scan + /// returns, so consumers can paint a "scanning…" state immediately + /// even if the workspace is large enough that the scan takes time. + ScanStarted { root: String }, + /// Scan finished; `total` assets are about to be processed. + ScanCompleted { total: u32 }, + /// About to process the `idx`-th asset (1-based). `media` is a + /// short label (`markdown` / `pdf` / `image` / `audio` / `other`). + AssetStarted { + idx: u32, + total: u32, + path: String, + media: String, + }, + /// Finished processing the `idx`-th asset. `result` mirrors the + /// asset's `IngestItemKind`; `chunks` is the number of chunks + /// produced (0 for `Skipped` / `Error`). + AssetFinished { + idx: u32, + total: u32, + result: IngestItemKind, + chunks: u32, + }, + /// Run finished normally. `counts` is the final aggregate. + Completed { counts: AggregateCounts }, + /// Run finished by user cancellation. `counts` is the partial + /// aggregate at the cancel boundary. Emitted by `p9-fb-04`; this + /// task never produces `Aborted`. + Aborted { counts: AggregateCounts }, +} + +/// Map a `MediaType` to the short label used by `IngestEvent::AssetStarted`. +/// Mirrors the §2.4a description text — `markdown` / `pdf` / `image` / +/// `audio` / `other`. +pub fn media_label(media: &kebab_core::MediaType) -> &'static str { + match media { + kebab_core::MediaType::Markdown => "markdown", + kebab_core::MediaType::Pdf => "pdf", + kebab_core::MediaType::Image(_) => "image", + kebab_core::MediaType::Audio(_) => "audio", + kebab_core::MediaType::Other(_) => "other", + } +} + +/// Best-effort send into an optional `mpsc::Sender`. A dropped receiver +/// is silently absorbed — the ingest hot path must not stall on a slow +/// consumer. Logged at `trace` for diagnostics. +pub(crate) fn emit( + progress: Option<&std::sync::mpsc::Sender>, + event: IngestEvent, +) { + if let Some(tx) = progress { + if tx.send(event).is_err() { + tracing::trace!( + target: "kebab-app::ingest_progress", + "progress receiver dropped; event discarded (best-effort send per ingest_progress contract)" + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use kebab_core::MediaType; + + #[test] + fn media_label_covers_every_variant() { + assert_eq!(media_label(&MediaType::Markdown), "markdown"); + assert_eq!(media_label(&MediaType::Pdf), "pdf"); + assert_eq!( + media_label(&MediaType::Image(kebab_core::ImageType::Png)), + "image" + ); + assert_eq!( + media_label(&MediaType::Audio(kebab_core::AudioType::Wav)), + "audio" + ); + assert_eq!(media_label(&MediaType::Other("x".into())), "other"); + } + + #[test] + fn ingest_event_serializes_with_discriminator() { + // The `#[serde(tag = "kind", rename_all = "snake_case")]` + // attribute mirrors §2.4a's wire shape — the CLI's wire layer + // re-tags with `schema_version` on top. + let ev = IngestEvent::AssetStarted { + idx: 1, + total: 10, + path: "notes/foo.md".into(), + media: "markdown".into(), + }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("asset_started")); + assert_eq!(v.get("idx").and_then(|n| n.as_u64()), Some(1)); + assert_eq!(v.get("total").and_then(|n| n.as_u64()), Some(10)); + assert_eq!(v.get("path").and_then(|s| s.as_str()), Some("notes/foo.md")); + assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown")); + } + + #[test] + fn ingest_event_completed_has_counts() { + let ev = IngestEvent::Completed { + counts: AggregateCounts { + scanned: 5, + new: 2, + ..Default::default() + }, + }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("completed")); + let counts = v.get("counts").unwrap(); + assert_eq!(counts.get("scanned").and_then(|n| n.as_u64()), Some(5)); + assert_eq!(counts.get("new").and_then(|n| n.as_u64()), Some(2)); + } + + #[test] + fn emit_with_no_sender_is_noop() { + // Compiles + does not panic. Doc-test of the contract. + emit(None, IngestEvent::ScanStarted { root: "/x".into() }); + } + + #[test] + fn emit_with_dropped_receiver_does_not_panic() { + let (tx, rx) = std::sync::mpsc::channel::(); + drop(rx); + emit(Some(&tx), IngestEvent::ScanStarted { root: "/x".into() }); + } + + #[test] + fn emit_delivers_event_to_live_receiver() { + let (tx, rx) = std::sync::mpsc::channel::(); + emit(Some(&tx), IngestEvent::ScanCompleted { total: 42 }); + match rx.try_recv().unwrap() { + IngestEvent::ScanCompleted { total } => assert_eq!(total, 42), + other => panic!("unexpected event: {other:?}"), + } + } +} diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index b7877a8..bc39d1a 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -56,10 +56,12 @@ use kebab_source_fs::FsSourceConnector; mod app; pub mod doctor_signal; +pub mod ingest_progress; pub mod logging; pub mod reset; pub use app::App; +pub use ingest_progress::{AggregateCounts, IngestEvent}; pub use reset::{ResetReport, ResetScope}; /// Parser-version label persisted in `documents.parser_version` for @@ -162,22 +164,57 @@ pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result anyhow::Result { + ingest_with_config_progress(config, scope, summary_only, None) +} + +/// Config + progress variant — same as [`ingest_with_config`] but the +/// caller may inject an `mpsc::Sender` to receive +/// streaming progress. CLI (`p9-fb-02`) feeds this into the +/// `ingest_progress.v1` line-delimited dump; TUI (`p9-fb-03`) feeds it +/// into the status-bar reducer; either may pass `None` to suppress +/// emission entirely. Send is best-effort — see [`ingest_progress`] +/// for the contract. +#[doc(hidden)] +pub fn ingest_with_config_progress( + config: kebab_config::Config, + scope: SourceScope, + summary_only: bool, + progress: Option>, +) -> anyhow::Result { + let progress = progress.as_ref(); let started_instant = std::time::Instant::now(); let app = App::open_with_config(config)?; // Walk the workspace. + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::ScanStarted { + root: scope.root.to_string_lossy().into_owned(), + }, + ); let connector = FsSourceConnector::new(&app.config) .context("kb-app::ingest: build FsSourceConnector")?; let assets = connector .scan(&scope) .context("kb-app::ingest: scan workspace")?; + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::ScanCompleted { + total: u32::try_from(assets.len()).unwrap_or(u32::MAX), + }, + ); // Embedder + vector store: build once at the top so the cold-start // cost is paid once even when the workspace has 1000 markdown files. @@ -256,7 +293,17 @@ pub fn ingest_with_config( let embed_active = embedder.is_some() && vector_store.is_some(); - for asset in assets { + for (zero_idx, asset) in assets.into_iter().enumerate() { + let idx = u32::try_from(zero_idx + 1).unwrap_or(u32::MAX); + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetStarted { + idx, + total: scanned_count, + path: asset.workspace_path.0.clone(), + media: crate::ingest_progress::media_label(&asset.media_type).to_string(), + }, + ); let item = ingest_one_asset( &app, &asset, @@ -323,6 +370,15 @@ pub fn ingest_with_config( error_count = error_count.saturating_add(1) } } + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetFinished { + idx, + total: scanned_count, + result: item.kind, + chunks: item.chunk_count.unwrap_or(0), + }, + ); items.push(item); } @@ -445,6 +501,21 @@ pub fn ingest_with_config( "kb-app::ingest: run complete" ); + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::Completed { + counts: crate::ingest_progress::AggregateCounts { + scanned: scanned_count, + new: new_count, + updated: updated_count, + skipped: skipped_count, + errors: error_count, + chunks_indexed, + embeddings_indexed, + }, + }, + ); + Ok(IngestReport { scope, scanned: scanned_count, diff --git a/crates/kebab-app/tests/ingest_progress.rs b/crates/kebab-app/tests/ingest_progress.rs new file mode 100644 index 0000000..2c55e7e --- /dev/null +++ b/crates/kebab-app/tests/ingest_progress.rs @@ -0,0 +1,145 @@ +//! Integration coverage for `ingest_with_config_progress` — +//! exercises the streaming progress channel against the same lexical +//! fixture used by `ingest_lexical.rs`. + +mod common; + +use std::sync::mpsc; + +use common::TestEnv; +use kebab_app::{AggregateCounts, IngestEvent}; +use kebab_core::IngestItemKind; + +fn run_with_progress() -> Vec { + let env = TestEnv::lexical_only(); + let (tx, rx) = mpsc::channel::(); + let report = kebab_app::ingest_with_config_progress( + env.config.clone(), + env.scope(), + false, + Some(tx), + ) + .unwrap(); + assert_eq!(report.scanned, 3); + assert_eq!(report.new, 3); + + // Drain until the sender (held inside `ingest_with_config_progress`) + // is dropped on return. + let mut events = Vec::new(); + while let Ok(ev) = rx.recv() { + events.push(ev); + } + events +} + +#[test] +fn progress_event_sequence_matches_design_section_2_4a() { + let events = run_with_progress(); + + // First event: ScanStarted with workspace root. + match &events[0] { + IngestEvent::ScanStarted { root } => { + assert!(!root.is_empty(), "ScanStarted root must be a path"); + } + other => panic!("expected ScanStarted, got {other:?}"), + } + + // Second event: ScanCompleted with total = 3 fixture files. + match &events[1] { + IngestEvent::ScanCompleted { total } => { + assert_eq!(*total, 3, "ScanCompleted total: {events:?}"); + } + other => panic!("expected ScanCompleted, got {other:?}"), + } + + // Final event: Completed with the aggregate counters mirroring the + // returned report. + let last = events.last().expect("at least one event"); + match last { + IngestEvent::Completed { counts } => { + assert_eq!( + *counts, + AggregateCounts { + scanned: 3, + new: 3, + chunks_indexed: counts.chunks_indexed, + embeddings_indexed: 0, + ..Default::default() + }, + "Completed counts: {counts:?}" + ); + assert!(counts.chunks_indexed >= 3, "chunks_indexed: {counts:?}"); + } + other => panic!("expected Completed last, got {other:?}"), + } + + // Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order. + let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect(); + assert_eq!( + asset_events.len(), + 6, + "expected 3 (Started + Finished) pairs, got {asset_events:?}" + ); + for (chunk_idx, pair) in asset_events.chunks(2).enumerate() { + let expected_idx = chunk_idx as u32 + 1; + match (pair[0], pair[1]) { + ( + IngestEvent::AssetStarted { + idx: si, + total: st, + media, + .. + }, + IngestEvent::AssetFinished { + idx: fi, + total: ft, + result, + chunks, + }, + ) => { + assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}"); + assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}"); + assert_eq!(*st, 3, "Started total mismatch"); + assert_eq!(*ft, 3, "Finished total mismatch"); + assert_eq!(media, "markdown", "fixture is markdown only"); + assert_eq!(*result, IngestItemKind::New, "first ingest → New"); + assert!(*chunks >= 1, "chunks: {pair:?}"); + } + other => panic!("expected Started+Finished pair, got {other:?}"), + } + } +} + +#[test] +fn ingest_with_config_progress_none_matches_ingest_with_config() { + // Forwarding wrapper: `ingest_with_config(...)` and + // `ingest_with_config_progress(..., None)` must produce identical + // reports modulo wall-clock duration. + let env = TestEnv::lexical_only(); + let r_none = kebab_app::ingest_with_config_progress( + env.config.clone(), + env.scope(), + true, + None, + ) + .unwrap(); + assert_eq!(r_none.scanned, 3); + assert_eq!(r_none.new, 3); +} + +#[test] +fn dropped_receiver_does_not_panic_or_fail_ingest() { + // Best-effort send: if the consumer dies mid-run, ingest must + // still complete normally. + let env = TestEnv::lexical_only(); + let (tx, rx) = mpsc::channel::(); + drop(rx); + let report = kebab_app::ingest_with_config_progress( + env.config.clone(), + env.scope(), + true, + Some(tx), + ) + .unwrap(); + assert_eq!(report.scanned, 3); +} diff --git a/tasks/p9/p9-fb-01-ingest-progress-callback.md b/tasks/p9/p9-fb-01-ingest-progress-callback.md index dcc967e..907fdd2 100644 --- a/tasks/p9/p9-fb-01-ingest-progress-callback.md +++ b/tasks/p9/p9-fb-01-ingest-progress-callback.md @@ -3,7 +3,7 @@ phase: P9 component: kebab-app + kebab-core task_id: p9-fb-01 title: "Ingest progress callback / event channel" -status: planned +status: in_progress depends_on: [] unblocks: [p9-fb-02, p9-fb-03] contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md