feat(app): IngestEvent + ingest_with_config_progress (p9-fb-01) #52
191
crates/kebab-app/src/ingest_progress.rs
Normal file
@@ -0,0 +1,191 @@
|
||||
//! Streaming progress events for `ingest_with_config_progress`.
|
||||
//!
|
||||
//! The facade emits one [`IngestEvent`] per step boundary into an
|
||||
//! optional `mpsc::Sender<IngestEvent>` injected by the caller. CLI
|
||||
//! (`p9-fb-02`), TUI (`p9-fb-03`), and future desktop UI all consume the
|
||||
//! same stream — CLI dumps it as line-delimited JSON
|
||||
//! (`ingest_progress.v1`), TUI feeds it into a status-bar reducer, and
|
||||
//! anyone else can plug in their own receiver.
|
||||
//!
|
||||
//! Send is **best-effort**: a receiver that has been dropped is treated
|
||||
//! as a no-op, never as an error. The ingest hot path must not stall
|
||||
//! on a slow consumer.
|
||||
//!
|
||||
//! Cancellation lands in `p9-fb-04` and adds `IngestEvent::Aborted`
|
||||
//! emission; this task only ever emits `Completed`.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use kebab_core::IngestItemKind;
|
||||
|
||||
/// Aggregate counters surfaced on the terminal `Completed` (and, in
|
||||
/// `p9-fb-04`, `Aborted`) events. Mirrors the fields persisted into
|
||||
/// `ingest_runs.progress_json` so external tooling can reconstruct the
|
||||
/// run's outcome from either side.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct AggregateCounts {
|
||||
pub scanned: u32,
|
||||
pub new: u32,
|
||||
pub updated: u32,
|
||||
pub skipped: u32,
|
||||
pub errors: u32,
|
||||
pub chunks_indexed: u32,
|
||||
pub embeddings_indexed: u32,
|
||||
}
|
||||
|
||||
/// One streaming progress event. The CLI's `--json` mode serializes this
|
||||
/// into the wire-stable `ingest_progress.v1` schema; in-memory consumers
|
||||
/// (TUI / desktop) take the typed value directly.
|
||||
///
|
||||
/// Ordering invariant per design §2.4a:
|
||||
///
|
||||
/// ```text
|
||||
/// ScanStarted < ScanCompleted < (AssetStarted < AssetFinished)*
|
||||
/// < (Completed | Aborted)
|
||||
/// ```
|
||||
///
|
||||
/// Embed-batch events (`embed_batch_started` / `embed_batch_finished`
|
||||
/// in §2.4a) are reserved for a future iteration and are not emitted
|
||||
/// by this task; the spec calls them out as "임의 위치" (optional).
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum IngestEvent {
|
||||
/// Workspace walk has started. Emitted before the connector scan
|
||||
/// returns, so consumers can paint a "scanning…" state immediately
|
||||
/// even if the workspace is large enough that the scan takes time.
|
||||
ScanStarted { root: String },
|
||||
/// Scan finished; `total` assets are about to be processed.
|
||||
ScanCompleted { total: u32 },
|
||||
/// About to process the `idx`-th asset (1-based). `media` is a
|
||||
/// short label (`markdown` / `pdf` / `image` / `audio` / `other`).
|
||||
AssetStarted {
|
||||
idx: u32,
|
||||
total: u32,
|
||||
path: String,
|
||||
media: String,
|
||||
},
|
||||
/// Finished processing the `idx`-th asset. `result` mirrors the
|
||||
/// asset's `IngestItemKind`; `chunks` is the number of chunks
|
||||
/// produced (0 for `Skipped` / `Error`).
|
||||
AssetFinished {
|
||||
|
|
||||
idx: u32,
|
||||
total: u32,
|
||||
result: IngestItemKind,
|
||||
chunks: u32,
|
||||
},
|
||||
/// Run finished normally. `counts` is the final aggregate.
|
||||
Completed { counts: AggregateCounts },
|
||||
/// Run finished by user cancellation. `counts` is the partial
|
||||
/// aggregate at the cancel boundary. Emitted by `p9-fb-04`; this
|
||||
/// task never produces `Aborted`.
|
||||
Aborted { counts: AggregateCounts },
|
||||
}
|
||||
|
||||
/// Map a `MediaType` to the short label used by `IngestEvent::AssetStarted`.
|
||||
/// Mirrors the §2.4a description text — `markdown` / `pdf` / `image` /
|
||||
/// `audio` / `other`.
|
||||
pub fn media_label(media: &kebab_core::MediaType) -> &'static str {
|
||||
match media {
|
||||
kebab_core::MediaType::Markdown => "markdown",
|
||||
kebab_core::MediaType::Pdf => "pdf",
|
||||
kebab_core::MediaType::Image(_) => "image",
|
||||
kebab_core::MediaType::Audio(_) => "audio",
|
||||
kebab_core::MediaType::Other(_) => "other",
|
||||
}
|
||||
}
|
||||
|
||||
/// Best-effort send into an optional `mpsc::Sender`. A dropped receiver
|
||||
/// is silently absorbed — the ingest hot path must not stall on a slow
|
||||
/// consumer. Logged at `trace` for diagnostics.
|
||||
pub(crate) fn emit(
|
||||
progress: Option<&std::sync::mpsc::Sender<IngestEvent>>,
|
||||
event: IngestEvent,
|
||||
) {
|
||||
if let Some(tx) = progress {
|
||||
if tx.send(event).is_err() {
|
||||
tracing::trace!(
|
||||
target: "kebab-app::ingest_progress",
|
||||
"progress receiver dropped; event discarded (best-effort send per ingest_progress contract)"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use kebab_core::MediaType;
|
||||
|
||||
#[test]
|
||||
|
claude-reviewer-01
commented
(칭찬) (칭찬) `emit` helper 가 `Option<&Sender>` 받아 None 단축 + dropped receiver silent absorb 두 corner case 를 한 함수에 묶음 + lib unit test 3 개 (None / dropped / live) 가 각 분기 cover. ingest 의 hot path 에서 emit 호출 자체가 if-let 한 줄이라 noop overhead 0. integration test 의 "dropped receiver tolerance" 와 짝.
|
||||
fn media_label_covers_every_variant() {
|
||||
assert_eq!(media_label(&MediaType::Markdown), "markdown");
|
||||
assert_eq!(media_label(&MediaType::Pdf), "pdf");
|
||||
assert_eq!(
|
||||
media_label(&MediaType::Image(kebab_core::ImageType::Png)),
|
||||
"image"
|
||||
);
|
||||
assert_eq!(
|
||||
|
claude-reviewer-01
commented
(nit / 메시지 모호) trace! 메시지 Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록. How to apply: 메시지를 (nit / 메시지 모호) trace! 메시지 `progress receiver dropped; suppressing further sends would require caller cooperation` — 두 번째 절이 의미 불명확 ("caller cooperation" 이 어떻게 suppress 를 가능하게 하는가?). 단순 receiver drop 사실만 기록하면 충분.
Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록.
How to apply: 메시지를 `progress receiver dropped; event discarded (best-effort send per ingest_progress contract)` 정도로 단순화. 또는 더 짧게 `progress receiver dropped`.
|
||||
media_label(&MediaType::Audio(kebab_core::AudioType::Wav)),
|
||||
"audio"
|
||||
);
|
||||
assert_eq!(media_label(&MediaType::Other("x".into())), "other");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingest_event_serializes_with_discriminator() {
|
||||
// The `#[serde(tag = "kind", rename_all = "snake_case")]`
|
||||
// attribute mirrors §2.4a's wire shape — the CLI's wire layer
|
||||
// re-tags with `schema_version` on top.
|
||||
let ev = IngestEvent::AssetStarted {
|
||||
idx: 1,
|
||||
total: 10,
|
||||
path: "notes/foo.md".into(),
|
||||
media: "markdown".into(),
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("asset_started"));
|
||||
assert_eq!(v.get("idx").and_then(|n| n.as_u64()), Some(1));
|
||||
assert_eq!(v.get("total").and_then(|n| n.as_u64()), Some(10));
|
||||
assert_eq!(v.get("path").and_then(|s| s.as_str()), Some("notes/foo.md"));
|
||||
assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingest_event_completed_has_counts() {
|
||||
let ev = IngestEvent::Completed {
|
||||
counts: AggregateCounts {
|
||||
scanned: 5,
|
||||
new: 2,
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("completed"));
|
||||
let counts = v.get("counts").unwrap();
|
||||
assert_eq!(counts.get("scanned").and_then(|n| n.as_u64()), Some(5));
|
||||
assert_eq!(counts.get("new").and_then(|n| n.as_u64()), Some(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_with_no_sender_is_noop() {
|
||||
// Compiles + does not panic. Doc-test of the contract.
|
||||
emit(None, IngestEvent::ScanStarted { root: "/x".into() });
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_with_dropped_receiver_does_not_panic() {
|
||||
let (tx, rx) = std::sync::mpsc::channel::<IngestEvent>();
|
||||
drop(rx);
|
||||
emit(Some(&tx), IngestEvent::ScanStarted { root: "/x".into() });
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_delivers_event_to_live_receiver() {
|
||||
let (tx, rx) = std::sync::mpsc::channel::<IngestEvent>();
|
||||
emit(Some(&tx), IngestEvent::ScanCompleted { total: 42 });
|
||||
match rx.try_recv().unwrap() {
|
||||
IngestEvent::ScanCompleted { total } => assert_eq!(total, 42),
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -56,10 +56,12 @@ use kebab_source_fs::FsSourceConnector;
|
||||
|
||||
mod app;
|
||||
pub mod doctor_signal;
|
||||
pub mod ingest_progress;
|
||||
pub mod logging;
|
||||
pub mod reset;
|
||||
|
||||
pub use app::App;
|
||||
pub use ingest_progress::{AggregateCounts, IngestEvent};
|
||||
pub use reset::{ResetReport, ResetScope};
|
||||
|
||||
/// Parser-version label persisted in `documents.parser_version` for
|
||||
@@ -162,22 +164,57 @@ pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result<IngestRe
|
||||
/// caller (kb-cli with `--config`, integration tests, TUI session)
|
||||
/// already has a [`kebab_config::Config`] in hand. The public free
|
||||
/// function [`ingest`] wraps this with the XDG-default load.
|
||||
///
|
||||
/// This is the no-progress entry point retained for callers that
|
||||
/// don't care about streaming progress (older tests, future code that
|
||||
/// runs ingest as a one-shot). It forwards into
|
||||
/// [`ingest_with_config_progress`] with `progress = None`.
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
ingest_with_config_progress(config, scope, summary_only, None)
|
||||
}
|
||||
|
||||
/// Config + progress variant — same as [`ingest_with_config`] but the
|
||||
/// caller may inject an `mpsc::Sender<IngestEvent>` to receive
|
||||
/// streaming progress. CLI (`p9-fb-02`) feeds this into the
|
||||
/// `ingest_progress.v1` line-delimited dump; TUI (`p9-fb-03`) feeds it
|
||||
/// into the status-bar reducer; either may pass `None` to suppress
|
||||
/// emission entirely. Send is best-effort — see [`ingest_progress`]
|
||||
/// for the contract.
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config_progress(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
let progress = progress.as_ref();
|
||||
let started_instant = std::time::Instant::now();
|
||||
|
||||
let app = App::open_with_config(config)?;
|
||||
|
||||
// Walk the workspace.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::ScanStarted {
|
||||
root: scope.root.to_string_lossy().into_owned(),
|
||||
},
|
||||
);
|
||||
let connector = FsSourceConnector::new(&app.config)
|
||||
.context("kb-app::ingest: build FsSourceConnector")?;
|
||||
let assets = connector
|
||||
.scan(&scope)
|
||||
.context("kb-app::ingest: scan workspace")?;
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::ScanCompleted {
|
||||
total: u32::try_from(assets.len()).unwrap_or(u32::MAX),
|
||||
},
|
||||
);
|
||||
|
||||
// Embedder + vector store: build once at the top so the cold-start
|
||||
// cost is paid once even when the workspace has 1000 markdown files.
|
||||
@@ -256,7 +293,17 @@ pub fn ingest_with_config(
|
||||
|
||||
let embed_active = embedder.is_some() && vector_store.is_some();
|
||||
|
||||
for asset in assets {
|
||||
for (zero_idx, asset) in assets.into_iter().enumerate() {
|
||||
let idx = u32::try_from(zero_idx + 1).unwrap_or(u32::MAX);
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetStarted {
|
||||
idx,
|
||||
total: scanned_count,
|
||||
path: asset.workspace_path.0.clone(),
|
||||
media: crate::ingest_progress::media_label(&asset.media_type).to_string(),
|
||||
},
|
||||
);
|
||||
let item = ingest_one_asset(
|
||||
&app,
|
||||
&asset,
|
||||
@@ -323,6 +370,15 @@ pub fn ingest_with_config(
|
||||
error_count = error_count.saturating_add(1)
|
||||
}
|
||||
}
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetFinished {
|
||||
idx,
|
||||
total: scanned_count,
|
||||
result: item.kind,
|
||||
chunks: item.chunk_count.unwrap_or(0),
|
||||
},
|
||||
);
|
||||
items.push(item);
|
||||
}
|
||||
|
||||
@@ -445,6 +501,21 @@ pub fn ingest_with_config(
|
||||
"kb-app::ingest: run complete"
|
||||
);
|
||||
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::Completed {
|
||||
counts: crate::ingest_progress::AggregateCounts {
|
||||
scanned: scanned_count,
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
Ok(IngestReport {
|
||||
scope,
|
||||
scanned: scanned_count,
|
||||
|
||||
145
crates/kebab-app/tests/ingest_progress.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
//! Integration coverage for `ingest_with_config_progress` —
|
||||
//! exercises the streaming progress channel against the same lexical
|
||||
//! fixture used by `ingest_lexical.rs`.
|
||||
|
||||
mod common;
|
||||
|
||||
use std::sync::mpsc;
|
||||
|
||||
use common::TestEnv;
|
||||
use kebab_app::{AggregateCounts, IngestEvent};
|
||||
use kebab_core::IngestItemKind;
|
||||
|
||||
fn run_with_progress() -> Vec<IngestEvent> {
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let report = kebab_app::ingest_with_config_progress(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
false,
|
||||
Some(tx),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(report.scanned, 3);
|
||||
assert_eq!(report.new, 3);
|
||||
|
||||
// Drain until the sender (held inside `ingest_with_config_progress`)
|
||||
// is dropped on return.
|
||||
let mut events = Vec::new();
|
||||
while let Ok(ev) = rx.recv() {
|
||||
events.push(ev);
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn progress_event_sequence_matches_design_section_2_4a() {
|
||||
let events = run_with_progress();
|
||||
|
||||
// First event: ScanStarted with workspace root.
|
||||
match &events[0] {
|
||||
IngestEvent::ScanStarted { root } => {
|
||||
assert!(!root.is_empty(), "ScanStarted root must be a path");
|
||||
}
|
||||
other => panic!("expected ScanStarted, got {other:?}"),
|
||||
}
|
||||
|
||||
// Second event: ScanCompleted with total = 3 fixture files.
|
||||
match &events[1] {
|
||||
IngestEvent::ScanCompleted { total } => {
|
||||
assert_eq!(*total, 3, "ScanCompleted total: {events:?}");
|
||||
}
|
||||
other => panic!("expected ScanCompleted, got {other:?}"),
|
||||
}
|
||||
|
||||
// Final event: Completed with the aggregate counters mirroring the
|
||||
// returned report.
|
||||
let last = events.last().expect("at least one event");
|
||||
match last {
|
||||
IngestEvent::Completed { counts } => {
|
||||
assert_eq!(
|
||||
*counts,
|
||||
AggregateCounts {
|
||||
scanned: 3,
|
||||
new: 3,
|
||||
chunks_indexed: counts.chunks_indexed,
|
||||
embeddings_indexed: 0,
|
||||
..Default::default()
|
||||
},
|
||||
"Completed counts: {counts:?}"
|
||||
);
|
||||
assert!(counts.chunks_indexed >= 3, "chunks_indexed: {counts:?}");
|
||||
}
|
||||
other => panic!("expected Completed last, got {other:?}"),
|
||||
}
|
||||
|
||||
// Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order.
|
||||
let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect();
|
||||
assert_eq!(
|
||||
asset_events.len(),
|
||||
6,
|
||||
"expected 3 (Started + Finished) pairs, got {asset_events:?}"
|
||||
);
|
||||
for (chunk_idx, pair) in asset_events.chunks(2).enumerate() {
|
||||
let expected_idx = chunk_idx as u32 + 1;
|
||||
match (pair[0], pair[1]) {
|
||||
(
|
||||
IngestEvent::AssetStarted {
|
||||
idx: si,
|
||||
total: st,
|
||||
media,
|
||||
..
|
||||
},
|
||||
IngestEvent::AssetFinished {
|
||||
idx: fi,
|
||||
total: ft,
|
||||
result,
|
||||
chunks,
|
||||
},
|
||||
) => {
|
||||
assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}");
|
||||
assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}");
|
||||
assert_eq!(*st, 3, "Started total mismatch");
|
||||
assert_eq!(*ft, 3, "Finished total mismatch");
|
||||
assert_eq!(media, "markdown", "fixture is markdown only");
|
||||
assert_eq!(*result, IngestItemKind::New, "first ingest → New");
|
||||
assert!(*chunks >= 1, "chunks: {pair:?}");
|
||||
}
|
||||
other => panic!("expected Started+Finished pair, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingest_with_config_progress_none_matches_ingest_with_config() {
|
||||
// Forwarding wrapper: `ingest_with_config(...)` and
|
||||
// `ingest_with_config_progress(..., None)` must produce identical
|
||||
// reports modulo wall-clock duration.
|
||||
let env = TestEnv::lexical_only();
|
||||
let r_none = kebab_app::ingest_with_config_progress(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(r_none.scanned, 3);
|
||||
assert_eq!(r_none.new, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dropped_receiver_does_not_panic_or_fail_ingest() {
|
||||
// Best-effort send: if the consumer dies mid-run, ingest must
|
||||
// still complete normally.
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
drop(rx);
|
||||
let report = kebab_app::ingest_with_config_progress(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
Some(tx),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(report.scanned, 3);
|
||||
}
|
||||
@@ -3,7 +3,7 @@ phase: P9
|
||||
component: kebab-app + kebab-core
|
||||
task_id: p9-fb-01
|
||||
title: "Ingest progress callback / event channel"
|
||||
status: planned
|
||||
status: in_progress
|
||||
depends_on: []
|
||||
unblocks: [p9-fb-02, p9-fb-03]
|
||||
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
|
||||
|
||||
(칭찬)
#[serde(tag = "kind", rename_all = "snake_case")]가 wire schemaingest_progress.v1의 discriminator + 필드명과 1:1 일치 —kind = "asset_started"/idx/total/path/media자동. CLI 의 wire layer 가schema_version만 한 줄 더 붙이면 spec §2.4a 와 동일 JSON 산출. 미래 변경 시 (variants 추가 / 이름 변경) serde tag 만 보면 wire 영향 즉시 추적.