feat(app): IngestEvent + ingest_with_config_progress (p9-fb-01) #52

Merged
altair823 merged 2 commits from feat/p9-fb-01-progress into main 2026-05-02 19:47:50 +00:00
4 changed files with 409 additions and 2 deletions

View File

@@ -0,0 +1,191 @@
//! Streaming progress events for `ingest_with_config_progress`.
//!
//! The facade emits one [`IngestEvent`] per step boundary into an
//! optional `mpsc::Sender<IngestEvent>` injected by the caller. CLI
//! (`p9-fb-02`), TUI (`p9-fb-03`), and future desktop UI all consume the
//! same stream — CLI dumps it as line-delimited JSON
//! (`ingest_progress.v1`), TUI feeds it into a status-bar reducer, and
//! anyone else can plug in their own receiver.
//!
//! Send is **best-effort**: a receiver that has been dropped is treated
//! as a no-op, never as an error. The ingest hot path must not stall
//! on a slow consumer.
//!
//! Cancellation lands in `p9-fb-04` and adds `IngestEvent::Aborted`
//! emission; this task only ever emits `Completed`.
use serde::{Deserialize, Serialize};
use kebab_core::IngestItemKind;
/// Aggregate counters surfaced on the terminal `Completed` (and, in
/// `p9-fb-04`, `Aborted`) events. Mirrors the fields persisted into
/// `ingest_runs.progress_json` so external tooling can reconstruct the
/// run's outcome from either side.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateCounts {
pub scanned: u32,
pub new: u32,
pub updated: u32,
pub skipped: u32,
pub errors: u32,
pub chunks_indexed: u32,
pub embeddings_indexed: u32,
}
/// One streaming progress event. The CLI's `--json` mode serializes this
/// into the wire-stable `ingest_progress.v1` schema; in-memory consumers
/// (TUI / desktop) take the typed value directly.
///
/// Ordering invariant per design §2.4a:
///
/// ```text
/// ScanStarted < ScanCompleted < (AssetStarted < AssetFinished)*
/// < (Completed | Aborted)
/// ```
///
/// Embed-batch events (`embed_batch_started` / `embed_batch_finished`
/// in §2.4a) are reserved for a future iteration and are not emitted
/// by this task; the spec calls them out as "임의 위치" (optional).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum IngestEvent {
/// Workspace walk has started. Emitted before the connector scan
/// returns, so consumers can paint a "scanning…" state immediately
/// even if the workspace is large enough that the scan takes time.
ScanStarted { root: String },
/// Scan finished; `total` assets are about to be processed.
ScanCompleted { total: u32 },
/// About to process the `idx`-th asset (1-based). `media` is a
/// short label (`markdown` / `pdf` / `image` / `audio` / `other`).
AssetStarted {
idx: u32,
total: u32,
path: String,
media: String,
},
/// Finished processing the `idx`-th asset. `result` mirrors the
/// asset's `IngestItemKind`; `chunks` is the number of chunks
/// produced (0 for `Skipped` / `Error`).
AssetFinished {

(칭찬) #[serde(tag = "kind", rename_all = "snake_case")] 가 wire schema ingest_progress.v1 의 discriminator + 필드명과 1:1 일치 — kind = "asset_started" / idx / total / path / media 자동. CLI 의 wire layer 가 schema_version 만 한 줄 더 붙이면 spec §2.4a 와 동일 JSON 산출. 미래 변경 시 (variants 추가 / 이름 변경) serde tag 만 보면 wire 영향 즉시 추적.

(칭찬) `#[serde(tag = "kind", rename_all = "snake_case")]` 가 wire schema `ingest_progress.v1` 의 discriminator + 필드명과 1:1 일치 — `kind = "asset_started"` / `idx` / `total` / `path` / `media` 자동. CLI 의 wire layer 가 `schema_version` 만 한 줄 더 붙이면 spec §2.4a 와 동일 JSON 산출. 미래 변경 시 (variants 추가 / 이름 변경) serde tag 만 보면 wire 영향 즉시 추적.
idx: u32,
total: u32,
result: IngestItemKind,
chunks: u32,
},
/// Run finished normally. `counts` is the final aggregate.
Completed { counts: AggregateCounts },
/// Run finished by user cancellation. `counts` is the partial
/// aggregate at the cancel boundary. Emitted by `p9-fb-04`; this
/// task never produces `Aborted`.
Aborted { counts: AggregateCounts },
}
/// Map a `MediaType` to the short label used by `IngestEvent::AssetStarted`.
/// Mirrors the §2.4a description text — `markdown` / `pdf` / `image` /
/// `audio` / `other`.
pub fn media_label(media: &kebab_core::MediaType) -> &'static str {
match media {
kebab_core::MediaType::Markdown => "markdown",
kebab_core::MediaType::Pdf => "pdf",
kebab_core::MediaType::Image(_) => "image",
kebab_core::MediaType::Audio(_) => "audio",
kebab_core::MediaType::Other(_) => "other",
}
}
/// Best-effort send into an optional `mpsc::Sender`. A dropped receiver
/// is silently absorbed — the ingest hot path must not stall on a slow
/// consumer. Logged at `trace` for diagnostics.
pub(crate) fn emit(
progress: Option<&std::sync::mpsc::Sender<IngestEvent>>,
event: IngestEvent,
) {
if let Some(tx) = progress {
if tx.send(event).is_err() {
tracing::trace!(
target: "kebab-app::ingest_progress",
"progress receiver dropped; event discarded (best-effort send per ingest_progress contract)"
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use kebab_core::MediaType;
#[test]

(칭찬) emit helper 가 Option<&Sender> 받아 None 단축 + dropped receiver silent absorb 두 corner case 를 한 함수에 묶음 + lib unit test 3 개 (None / dropped / live) 가 각 분기 cover. ingest 의 hot path 에서 emit 호출 자체가 if-let 한 줄이라 noop overhead 0. integration test 의 "dropped receiver tolerance" 와 짝.

(칭찬) `emit` helper 가 `Option<&Sender>` 받아 None 단축 + dropped receiver silent absorb 두 corner case 를 한 함수에 묶음 + lib unit test 3 개 (None / dropped / live) 가 각 분기 cover. ingest 의 hot path 에서 emit 호출 자체가 if-let 한 줄이라 noop overhead 0. integration test 의 "dropped receiver tolerance" 와 짝.
fn media_label_covers_every_variant() {
assert_eq!(media_label(&MediaType::Markdown), "markdown");
assert_eq!(media_label(&MediaType::Pdf), "pdf");
assert_eq!(
media_label(&MediaType::Image(kebab_core::ImageType::Png)),
"image"
);
assert_eq!(

(nit / 메시지 모호) trace! 메시지 progress receiver dropped; suppressing further sends would require caller cooperation — 두 번째 절이 의미 불명확 ("caller cooperation" 이 어떻게 suppress 를 가능하게 하는가?). 단순 receiver drop 사실만 기록하면 충분.

Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록.

How to apply: 메시지를 progress receiver dropped; event discarded (best-effort send per ingest_progress contract) 정도로 단순화. 또는 더 짧게 progress receiver dropped.

(nit / 메시지 모호) trace! 메시지 `progress receiver dropped; suppressing further sends would require caller cooperation` — 두 번째 절이 의미 불명확 ("caller cooperation" 이 어떻게 suppress 를 가능하게 하는가?). 단순 receiver drop 사실만 기록하면 충분. Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록. How to apply: 메시지를 `progress receiver dropped; event discarded (best-effort send per ingest_progress contract)` 정도로 단순화. 또는 더 짧게 `progress receiver dropped`.
media_label(&MediaType::Audio(kebab_core::AudioType::Wav)),
"audio"
);
assert_eq!(media_label(&MediaType::Other("x".into())), "other");
}
#[test]
fn ingest_event_serializes_with_discriminator() {
// The `#[serde(tag = "kind", rename_all = "snake_case")]`
// attribute mirrors §2.4a's wire shape — the CLI's wire layer
// re-tags with `schema_version` on top.
let ev = IngestEvent::AssetStarted {
idx: 1,
total: 10,
path: "notes/foo.md".into(),
media: "markdown".into(),
};
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("asset_started"));
assert_eq!(v.get("idx").and_then(|n| n.as_u64()), Some(1));
assert_eq!(v.get("total").and_then(|n| n.as_u64()), Some(10));
assert_eq!(v.get("path").and_then(|s| s.as_str()), Some("notes/foo.md"));
assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown"));
}
#[test]
fn ingest_event_completed_has_counts() {
let ev = IngestEvent::Completed {
counts: AggregateCounts {
scanned: 5,
new: 2,
..Default::default()
},
};
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("completed"));
let counts = v.get("counts").unwrap();
assert_eq!(counts.get("scanned").and_then(|n| n.as_u64()), Some(5));
assert_eq!(counts.get("new").and_then(|n| n.as_u64()), Some(2));
}
#[test]
fn emit_with_no_sender_is_noop() {
// Compiles + does not panic. Doc-test of the contract.
emit(None, IngestEvent::ScanStarted { root: "/x".into() });
}
#[test]
fn emit_with_dropped_receiver_does_not_panic() {
let (tx, rx) = std::sync::mpsc::channel::<IngestEvent>();
drop(rx);
emit(Some(&tx), IngestEvent::ScanStarted { root: "/x".into() });
}
#[test]
fn emit_delivers_event_to_live_receiver() {
let (tx, rx) = std::sync::mpsc::channel::<IngestEvent>();
emit(Some(&tx), IngestEvent::ScanCompleted { total: 42 });
match rx.try_recv().unwrap() {
IngestEvent::ScanCompleted { total } => assert_eq!(total, 42),
other => panic!("unexpected event: {other:?}"),
}
}
}

View File

@@ -56,10 +56,12 @@ use kebab_source_fs::FsSourceConnector;
mod app;
pub mod doctor_signal;
pub mod ingest_progress;
pub mod logging;
pub mod reset;
pub use app::App;
pub use ingest_progress::{AggregateCounts, IngestEvent};
pub use reset::{ResetReport, ResetScope};
/// Parser-version label persisted in `documents.parser_version` for
@@ -162,22 +164,57 @@ pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result<IngestRe
/// caller (kb-cli with `--config`, integration tests, TUI session)
/// already has a [`kebab_config::Config`] in hand. The public free
/// function [`ingest`] wraps this with the XDG-default load.
///
/// This is the no-progress entry point retained for callers that
/// don't care about streaming progress (older tests, future code that
/// runs ingest as a one-shot). It forwards into
/// [`ingest_with_config_progress`] with `progress = None`.
#[doc(hidden)]
pub fn ingest_with_config(
config: kebab_config::Config,
scope: SourceScope,
summary_only: bool,
) -> anyhow::Result<IngestReport> {
ingest_with_config_progress(config, scope, summary_only, None)
}
/// Config + progress variant — same as [`ingest_with_config`] but the
/// caller may inject an `mpsc::Sender<IngestEvent>` to receive
/// streaming progress. CLI (`p9-fb-02`) feeds this into the
/// `ingest_progress.v1` line-delimited dump; TUI (`p9-fb-03`) feeds it
/// into the status-bar reducer; either may pass `None` to suppress
/// emission entirely. Send is best-effort — see [`ingest_progress`]
/// for the contract.
#[doc(hidden)]
pub fn ingest_with_config_progress(
config: kebab_config::Config,
scope: SourceScope,
summary_only: bool,
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
) -> anyhow::Result<IngestReport> {
let progress = progress.as_ref();
let started_instant = std::time::Instant::now();
let app = App::open_with_config(config)?;
// Walk the workspace.
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ScanStarted {
root: scope.root.to_string_lossy().into_owned(),
},
);
let connector = FsSourceConnector::new(&app.config)
.context("kb-app::ingest: build FsSourceConnector")?;
let assets = connector
.scan(&scope)
.context("kb-app::ingest: scan workspace")?;
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ScanCompleted {
total: u32::try_from(assets.len()).unwrap_or(u32::MAX),
},
);
// Embedder + vector store: build once at the top so the cold-start
// cost is paid once even when the workspace has 1000 markdown files.
@@ -256,7 +293,17 @@ pub fn ingest_with_config(
let embed_active = embedder.is_some() && vector_store.is_some();
for asset in assets {
for (zero_idx, asset) in assets.into_iter().enumerate() {
let idx = u32::try_from(zero_idx + 1).unwrap_or(u32::MAX);
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetStarted {
idx,
total: scanned_count,
path: asset.workspace_path.0.clone(),
media: crate::ingest_progress::media_label(&asset.media_type).to_string(),
},
);
let item = ingest_one_asset(
&app,
&asset,
@@ -323,6 +370,15 @@ pub fn ingest_with_config(
error_count = error_count.saturating_add(1)
}
}
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetFinished {
idx,
total: scanned_count,
result: item.kind,
chunks: item.chunk_count.unwrap_or(0),
},
);
items.push(item);
}
@@ -445,6 +501,21 @@ pub fn ingest_with_config(
"kb-app::ingest: run complete"
);
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::Completed {
counts: crate::ingest_progress::AggregateCounts {
scanned: scanned_count,
new: new_count,
updated: updated_count,
skipped: skipped_count,
errors: error_count,
chunks_indexed,
embeddings_indexed,
},
},
);
Ok(IngestReport {
scope,
scanned: scanned_count,

View File

@@ -0,0 +1,145 @@
//! Integration coverage for `ingest_with_config_progress` —
//! exercises the streaming progress channel against the same lexical
//! fixture used by `ingest_lexical.rs`.
mod common;
use std::sync::mpsc;
use common::TestEnv;
use kebab_app::{AggregateCounts, IngestEvent};
use kebab_core::IngestItemKind;
fn run_with_progress() -> Vec<IngestEvent> {
let env = TestEnv::lexical_only();
let (tx, rx) = mpsc::channel::<IngestEvent>();
let report = kebab_app::ingest_with_config_progress(
env.config.clone(),
env.scope(),
false,
Some(tx),
)
.unwrap();
assert_eq!(report.scanned, 3);
assert_eq!(report.new, 3);
// Drain until the sender (held inside `ingest_with_config_progress`)
// is dropped on return.
let mut events = Vec::new();
while let Ok(ev) = rx.recv() {
events.push(ev);
}
events
}
#[test]
fn progress_event_sequence_matches_design_section_2_4a() {
let events = run_with_progress();
// First event: ScanStarted with workspace root.
match &events[0] {
IngestEvent::ScanStarted { root } => {
assert!(!root.is_empty(), "ScanStarted root must be a path");
}
other => panic!("expected ScanStarted, got {other:?}"),
}
// Second event: ScanCompleted with total = 3 fixture files.
match &events[1] {
IngestEvent::ScanCompleted { total } => {
assert_eq!(*total, 3, "ScanCompleted total: {events:?}");
}
other => panic!("expected ScanCompleted, got {other:?}"),
}
// Final event: Completed with the aggregate counters mirroring the
// returned report.
let last = events.last().expect("at least one event");
match last {
IngestEvent::Completed { counts } => {
assert_eq!(
*counts,
AggregateCounts {
scanned: 3,
new: 3,
chunks_indexed: counts.chunks_indexed,
embeddings_indexed: 0,
..Default::default()
},
"Completed counts: {counts:?}"
);
assert!(counts.chunks_indexed >= 3, "chunks_indexed: {counts:?}");
}
other => panic!("expected Completed last, got {other:?}"),
}
// Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order.
let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect();
assert_eq!(
asset_events.len(),
6,
"expected 3 (Started + Finished) pairs, got {asset_events:?}"
);
for (chunk_idx, pair) in asset_events.chunks(2).enumerate() {
let expected_idx = chunk_idx as u32 + 1;
match (pair[0], pair[1]) {
(
IngestEvent::AssetStarted {
idx: si,
total: st,
media,
..
},
IngestEvent::AssetFinished {
idx: fi,
total: ft,
result,
chunks,
},
) => {
assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}");
assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}");
assert_eq!(*st, 3, "Started total mismatch");
assert_eq!(*ft, 3, "Finished total mismatch");
assert_eq!(media, "markdown", "fixture is markdown only");
assert_eq!(*result, IngestItemKind::New, "first ingest → New");
assert!(*chunks >= 1, "chunks: {pair:?}");
}
other => panic!("expected Started+Finished pair, got {other:?}"),
}
}
}
#[test]
fn ingest_with_config_progress_none_matches_ingest_with_config() {
// Forwarding wrapper: `ingest_with_config(...)` and
// `ingest_with_config_progress(..., None)` must produce identical
// reports modulo wall-clock duration.
let env = TestEnv::lexical_only();
let r_none = kebab_app::ingest_with_config_progress(
env.config.clone(),
env.scope(),
true,
None,
)
.unwrap();
assert_eq!(r_none.scanned, 3);
assert_eq!(r_none.new, 3);
}
#[test]
fn dropped_receiver_does_not_panic_or_fail_ingest() {
// Best-effort send: if the consumer dies mid-run, ingest must
// still complete normally.
let env = TestEnv::lexical_only();
let (tx, rx) = mpsc::channel::<IngestEvent>();
drop(rx);
let report = kebab_app::ingest_with_config_progress(
env.config.clone(),
env.scope(),
true,
Some(tx),
)
.unwrap();
assert_eq!(report.scanned, 3);
}

View File

@@ -3,7 +3,7 @@ phase: P9
component: kebab-app + kebab-core
task_id: p9-fb-01
title: "Ingest progress callback / event channel"
status: planned
status: in_progress
depends_on: []
unblocks: [p9-fb-02, p9-fb-03]
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md