feat(ingest): asset 내부 phase 진행 로깅 (asset_chunked/expansion_progress/asset_timings) + v0.24.0

asset(문서) 단위뿐이던 ingest 진행 이벤트에 문서 내부 phase 가시성을 추가.
큰 문서가 expansion(별칭 LLM, 청크당 순차)으로 수십 분 걸려도 진행바가
1/N 에 멈춘 듯 보이던 문제 해결.

wire ingest_progress.v1 additive (backward-compat):
- asset_chunked {idx,total,chunks} — 청킹 직후, markdown/image/pdf 전 경로
- expansion_progress {idx,total,done,chunks} — expansion 루프 스로틀
  (25청크 또는 1s, 종료 시 done==chunks). 캐시 히트도 done 에 포함
- asset_timings {idx,total,parse_ms,chunk_ms,expansion_ms,embed_ms,store_ms}
  — markdown 경로 phase별 wall-clock

설계: timing 은 kebab_core::IngestItem(wire-stable) 변경을 피해 신규
AssetTimings 이벤트로 ingest_one_asset 가 직접 emit (AssetFinished 무변경).

CLI(progress.rs): 진행바 sub-message(→ N chunks / 별칭 확장 done/chunks) +
asset 종료 시 phase timing 한 줄(fmt_ms). TUI reducer no-op arm.

검증: clippy -D warnings exit 0; cargo test -p kebab-app -p kebab-cli
312 passed/0 failed. ordering-invariant 테스트 재작성 + 신규 직렬화 테스트.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-02 13:58:27 +00:00
parent 581e1d5d55
commit a48b055358
10 changed files with 454 additions and 62 deletions

View File

@@ -47,11 +47,21 @@ pub struct AggregateCounts {
///
/// ```text
/// ScanStarted < ScanCompleted
/// < (AssetStarted [< (PdfOcrStarted < PdfOcrFinished)*] < AssetFinished)*
/// < ( AssetStarted
/// [< (PdfOcrStarted < PdfOcrFinished)*]
/// [< AssetChunked]
/// [< ExpansionProgress*]
/// [< AssetTimings]
/// < AssetFinished )*
/// < (Completed | Aborted)
/// ```
///
/// `[]` = optional, per-PDF asset only (v0.20.0 sub-item 1).
/// `[]` = optional. `PdfOcr*` is per-PDF asset only (v0.20.0 sub-item 1).
/// `AssetChunked` / `ExpansionProgress` / `AssetTimings` are the v0.24.0
/// asset-internal phase events: `AssetChunked` fires once right after
/// chunking (markdown / image / PDF); `ExpansionProgress` is a throttled
/// counter through the alias-expansion loop (markdown, expansion enabled
/// only); `AssetTimings` reports per-phase wall-clock once (markdown only).
///
/// Embed-batch events (`embed_batch_started` / `embed_batch_finished`
/// in §2.4a) are reserved for a future iteration and are not emitted
@@ -82,6 +92,41 @@ pub enum IngestEvent {
result: IngestItemKind,
chunks: u32,
},
/// v0.24.0 (additive): emitted right after an asset is chunked, before
/// expansion / embed / store. Surfaces "this document is N chunks"
/// immediately so a single large document no longer looks frozen at
/// `idx/total` while its per-chunk phases churn. `chunks` is the chunk
/// count for asset `idx`.
AssetChunked { idx: u32, total: u32, chunks: u32 },
/// v0.24.0 (additive): throttled progress through the per-chunk
/// expansion (alias-LLM) loop — the slowest inner phase for large
/// documents (~14s per chunk against a remote GPU Ollama). `done` is
/// the number of chunks processed so far (cache hits included, so the
/// counter still advances on a warm re-run); `chunks` is the asset's
/// total chunk count. Emitted at most every 25 chunks or once per
/// second (see the loop in `ingest_one_asset`), plus a final
/// `done == chunks` frame.
ExpansionProgress {
idx: u32,
total: u32,
done: u32,
chunks: u32,
},
/// v0.24.0 (additive): per-phase wall-clock (milliseconds) for asset
/// `idx`, emitted once the asset's markdown pipeline finishes. Lets a
/// user see *where* the time went (parse / chunk / expansion / embed /
/// store) without parsing logs. Only the markdown path emits this; the
/// image / PDF paths surface `AssetChunked` but skip phase timing (their
/// phase shapes differ — OCR / caption rather than expansion).
AssetTimings {
idx: u32,
total: u32,
parse_ms: u64,
chunk_ms: u64,
expansion_ms: u64,
embed_ms: u64,
store_ms: u64,
},
/// Run finished normally. `counts` is the final aggregate.
Completed { counts: AggregateCounts },
/// Run finished by user cancellation. `counts` is the partial
@@ -199,6 +244,79 @@ mod tests {
assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown"));
}
#[test]
fn asset_chunked_serializes_with_discriminator() {
// v0.24.0 additive variant — `kind` must be snake_case
// `asset_chunked` so wire v1 consumers branch on it cleanly.
let ev = IngestEvent::AssetChunked {
idx: 3,
total: 10,
chunks: 142,
};
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(
v.get("kind").and_then(|s| s.as_str()),
Some("asset_chunked")
);
assert_eq!(v.get("idx").and_then(serde_json::Value::as_u64), Some(3));
assert_eq!(
v.get("chunks").and_then(serde_json::Value::as_u64),
Some(142)
);
}
#[test]
fn expansion_progress_serializes_with_discriminator() {
let ev = IngestEvent::ExpansionProgress {
idx: 1,
total: 5,
done: 25,
chunks: 200,
};
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(
v.get("kind").and_then(|s| s.as_str()),
Some("expansion_progress")
);
assert_eq!(v.get("done").and_then(serde_json::Value::as_u64), Some(25));
assert_eq!(
v.get("chunks").and_then(serde_json::Value::as_u64),
Some(200)
);
}
#[test]
fn asset_timings_serializes_all_phase_fields() {
let ev = IngestEvent::AssetTimings {
idx: 2,
total: 7,
parse_ms: 12,
chunk_ms: 3,
expansion_ms: 45_000,
embed_ms: 800,
store_ms: 20,
};
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(
v.get("kind").and_then(|s| s.as_str()),
Some("asset_timings")
);
// All five phase fields are present (plain u64, always serialized).
for (field, want) in [
("parse_ms", 12u64),
("chunk_ms", 3),
("expansion_ms", 45_000),
("embed_ms", 800),
("store_ms", 20),
] {
assert_eq!(
v.get(field).and_then(serde_json::Value::as_u64),
Some(want),
"field {field}"
);
}
}
#[test]
fn ingest_event_completed_has_counts() {
let ev = IngestEvent::Completed {

View File

@@ -480,6 +480,8 @@ pub fn ingest_with_config_opts(
let item = ingest_one_asset(
&app,
&asset,
idx,
scanned_count,
&parser_version,
&chunk_policy,
embedder.as_ref(),
@@ -1100,6 +1102,8 @@ fn embed_with_cache(
fn ingest_one_asset(
app: &App,
asset: &RawAsset,
idx: u32,
total: u32,
parser_version: &ParserVersion,
chunk_policy: &ChunkPolicy,
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
@@ -1132,18 +1136,23 @@ fn ingest_one_asset(
return ingest_one_image_asset(
app,
asset,
idx,
total,
chunk_policy,
embedder,
vector_store,
existing_doc_ids,
image_pipeline,
force_reingest,
progress,
);
}
MediaType::Pdf => {
return ingest_one_pdf_asset(
app,
asset,
idx,
total,
chunk_policy,
embedder,
vector_store,
@@ -1252,6 +1261,10 @@ fn ingest_one_asset(
return Ok(item);
}
// v0.24.0 phase timing: parse spans from here (byte read) through
// `build_canonical_document`, i.e. everything before the chunker runs.
let t_parse = std::time::Instant::now();
let bytes = std::fs::read(&path)
.with_context(|| format!("read asset bytes from {}", path.display()))?;
@@ -1286,9 +1299,26 @@ fn ingest_one_asset(
build_canonical_document(asset, metadata, parsed_blocks, parser_version, all_warnings)
.context("kb-parse-md::build_canonical_document")?;
let parse_ms = u64::try_from(t_parse.elapsed().as_millis()).unwrap_or(u64::MAX);
let t_chunk = std::time::Instant::now();
let mut chunks = MdHeadingV1Chunker
.chunk(&canonical, chunk_policy)
.context("kb-chunk::MdHeadingV1Chunker::chunk")?;
let chunk_ms = u64::try_from(t_chunk.elapsed().as_millis()).unwrap_or(u64::MAX);
// v0.24.0: surface the chunk count immediately, before the (potentially
// very slow) expansion / embed phases — so a single large document no
// longer looks frozen at `idx/total` while its chunks churn.
let total_chunks = u32::try_from(chunks.len()).unwrap_or(u32::MAX);
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetChunked {
idx,
total,
chunks: total_chunks,
},
);
// Phase 2 doc-side expansion: flag on 이면 청크당 별칭 생성 (fail-soft).
// derivation cache(§3.4): 같은 청크 text + 같은 alias version_key 면 LLM
@@ -1296,6 +1326,7 @@ fn ingest_one_asset(
let mut alias_cache_hit = 0_usize;
let mut alias_cache_miss = 0_usize;
let mut alias_touch_keys: Vec<String> = Vec::new();
let t_expansion = std::time::Instant::now();
if app.config.ingest.expansion.enabled {
let exp = &app.config.ingest.expansion;
let alias_version_key = format!(
@@ -1313,6 +1344,11 @@ fn ingest_one_asset(
Ok(llm) => {
let generator =
crate::expansion::ExpansionGenerator::new(&llm, exp.max_aliases_per_chunk);
// v0.24.0: throttled live counter through the per-chunk
// expansion loop. Emit at most every 25 chunks or once per
// second — never per chunk (would flood the mpsc channel).
let mut done: u32 = 0;
let mut last_emit = std::time::Instant::now();
for chunk in &mut chunks {
let key = kebab_core::derivation_cache_key(
"alias",
@@ -1345,7 +1381,35 @@ fn ingest_one_asset(
.derivation_cache_put(&key, "alias", a.as_bytes())?;
}
}
// Cache hits count toward `done` too (the brief: show the
// warm-run fast-forward). Throttle: every 25 chunks or
// ≥1s since the last emit.
done += 1;
if done % 25 == 0
|| last_emit.elapsed() >= std::time::Duration::from_secs(1)
{
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ExpansionProgress {
idx,
total,
done,
chunks: total_chunks,
},
);
last_emit = std::time::Instant::now();
}
}
// Final frame so the counter always lands on done == total.
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ExpansionProgress {
idx,
total,
done,
chunks: total_chunks,
},
);
}
Err(e) => {
tracing::warn!(
@@ -1355,6 +1419,7 @@ fn ingest_one_asset(
}
}
}
let expansion_ms = u64::try_from(t_expansion.elapsed().as_millis()).unwrap_or(u64::MAX);
// Stamp chunker + embedding versions so Task 7's skip detection has
// data on the second run.
@@ -1367,6 +1432,7 @@ fn ingest_one_asset(
// (per-document tx semantics per design §5.8); composing them is
// the kb-app job. A failure mid-way leaves the DB in a state the
// next ingest run can re-converge (UPSERT + DELETE-then-INSERT).
let t_store = std::time::Instant::now();
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
app.sqlite
.put_asset_with_bytes(asset, &bytes)
@@ -1380,8 +1446,10 @@ fn ingest_one_asset(
app.sqlite
.put_chunks(&canonical.doc_id, &chunks)
.context("DocumentStore::put_chunks")?;
let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX);
// Embed + vector upsert (only when both sides are configured).
let t_embed = std::time::Instant::now();
let mut emb_cache_hit = 0_usize;
let mut emb_cache_miss = 0_usize;
if let (Some(emb), Some(vec_store)) = (embedder, vector_store) {
@@ -1511,6 +1579,22 @@ fn ingest_one_asset(
}
}
let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX);
// v0.24.0: phase-timing breakdown for this asset (markdown path only).
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetTimings {
idx,
total,
parse_ms,
chunk_ms,
expansion_ms,
embed_ms,
store_ms,
},
);
// 히트한 alias 키들의 last_used_at 갱신(LRU 보존, §3.5).
app.sqlite.derivation_cache_touch(&alias_touch_keys)?;
@@ -1564,12 +1648,15 @@ fn ingest_one_asset(
fn ingest_one_image_asset(
app: &App,
asset: &RawAsset,
idx: u32,
total: u32,
chunk_policy: &ChunkPolicy,
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
existing_doc_ids: &std::collections::HashSet<String>,
image_pipeline: &ImagePipeline<'_>,
force_reingest: bool,
progress: Option<&std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
) -> anyhow::Result<kebab_core::IngestItem> {
let ocr_engine = image_pipeline.ocr_engine;
let caption_llm = image_pipeline.caption_llm;
@@ -1722,6 +1809,17 @@ fn ingest_one_image_asset(
.chunk(&canonical, chunk_policy)
.context("kb-chunk::MdHeadingV1Chunker::chunk (image)")?;
// v0.24.0: surface chunk count for the image path too (phase timing is
// markdown-only, but AssetChunked is consistent across media).
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetChunked {
idx,
total,
chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX),
},
);
// 5. Persist + embed — identical sequence to markdown.
// Stamp chunker + embedding versions (image uses MdHeadingV1Chunker
// for its single-block doc, so we record that version).
@@ -2127,6 +2225,8 @@ fn sweep_deleted_files(
fn ingest_one_pdf_asset(
app: &App,
asset: &RawAsset,
idx: u32,
total: u32,
chunk_policy: &ChunkPolicy,
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
@@ -2330,6 +2430,16 @@ fn ingest_one_pdf_asset(
.chunk(&canonical, chunk_policy)
.context("kb-chunk::PdfPageV1Chunker::chunk")?;
// v0.24.0: surface chunk count for the PDF path too.
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::AssetChunked {
idx,
total,
chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX),
},
);
// Stamp chunker + embedding versions so Task 7's skip detection has
// data on the second run.
canonical.last_chunker_version = Some(chunker.chunker_version());

View File

@@ -69,40 +69,74 @@ fn progress_event_sequence_matches_design_section_2_4a() {
other => panic!("expected Completed last, got {other:?}"),
}
// Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order.
let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect();
assert_eq!(
asset_events.len(),
6,
"expected 3 (Started + Finished) pairs, got {asset_events:?}"
);
for (chunk_idx, pair) in asset_events.chunks(2).enumerate() {
let expected_idx = chunk_idx as u32 + 1;
match (pair[0], pair[1]) {
(
IngestEvent::AssetStarted {
idx: si,
total: st,
media,
..
},
IngestEvent::AssetFinished {
idx: fi,
total: ft,
result,
chunks,
},
) => {
assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}");
assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}");
assert_eq!(*st, 3, "Started total mismatch");
assert_eq!(*ft, 3, "Finished total mismatch");
assert_eq!(media, "markdown", "fixture is markdown only");
assert_eq!(*result, IngestItemKind::New, "first ingest → New");
assert!(*chunks >= 1, "chunks: {pair:?}");
// Middle (v0.24.0 ordering invariant §2.4a): per asset the stream is
// AssetStarted < AssetChunked < [ExpansionProgress*] < AssetTimings
// < AssetFinished
// Expansion is disabled in the lexical fixture, so no ExpansionProgress
// frames appear here — but AssetChunked + AssetTimings are emitted for
// every markdown asset.
let middle = &events[2..events.len() - 1];
// 3 AssetStarted events, monotonic idx 1..=3, all markdown, total = 3.
let started: Vec<u32> = middle
.iter()
.filter_map(|e| match e {
IngestEvent::AssetStarted {
idx, total, media, ..
} => {
assert_eq!(*total, 3, "Started total mismatch: {e:?}");
assert_eq!(media, "markdown", "fixture is markdown only: {e:?}");
Some(*idx)
}
other => panic!("expected Started+Finished pair, got {other:?}"),
}
_ => None,
})
.collect();
assert_eq!(started, vec![1, 2, 3], "AssetStarted idx order: {middle:?}");
// 3 AssetFinished events, monotonic idx 1..=3, each New with ≥1 chunk.
let finished: Vec<u32> = middle
.iter()
.filter_map(|e| match e {
IngestEvent::AssetFinished {
idx,
total,
result,
chunks,
} => {
assert_eq!(*total, 3, "Finished total mismatch: {e:?}");
assert_eq!(*result, IngestItemKind::New, "first ingest → New: {e:?}");
assert!(*chunks >= 1, "chunks: {e:?}");
Some(*idx)
}
_ => None,
})
.collect();
assert_eq!(finished, vec![1, 2, 3], "AssetFinished idx order: {middle:?}");
// v0.24.0 additive events: exactly one AssetChunked + one AssetTimings
// per asset, each strictly bracketed by that asset's Started / Finished.
for target in 1u32..=3 {
let started_at = middle
.iter()
.position(|e| matches!(e, IngestEvent::AssetStarted { idx, .. } if *idx == target))
.unwrap_or_else(|| panic!("missing AssetStarted for idx {target}: {middle:?}"));
let finished_at = middle
.iter()
.position(|e| matches!(e, IngestEvent::AssetFinished { idx, .. } if *idx == target))
.unwrap_or_else(|| panic!("missing AssetFinished for idx {target}: {middle:?}"));
let chunked_at = middle
.iter()
.position(|e| matches!(e, IngestEvent::AssetChunked { idx, chunks, .. } if *idx == target && *chunks >= 1))
.unwrap_or_else(|| panic!("missing AssetChunked for idx {target}: {middle:?}"));
let timings_at = middle
.iter()
.position(|e| matches!(e, IngestEvent::AssetTimings { idx, .. } if *idx == target))
.unwrap_or_else(|| panic!("missing AssetTimings for idx {target}: {middle:?}"));
assert!(
started_at < chunked_at && chunked_at < timings_at && timings_at < finished_at,
"idx {target} ordering: started={started_at} chunked={chunked_at} \
timings={timings_at} finished={finished_at}: {middle:?}"
);
}
}

View File

@@ -157,6 +157,54 @@ impl ProgressDisplay {
// in Completed handles the final state. No per-asset bar update
// here avoids the duplicate-frame artifact in TTY scrollback.
}
// v0.24.0: asset-internal phase visibility. AssetChunked /
// ExpansionProgress use the bar *message* (live sub-progress for
// the current asset) — distinct from the per-file position draw,
// so a single large document no longer looks frozen. AssetTimings
// prints a one-line breakdown when the asset finishes.
IngestEvent::AssetChunked { idx, total, chunks } => {
if let Some(bar) = self.bar.as_ref() {
bar.set_message(format!("{chunks} chunks"));
}
if !tty && !quiet {
let mut err = std::io::stderr().lock();
let _ = writeln!(err, "ingest: {idx}/{total} → {chunks} chunks");
}
}
IngestEvent::ExpansionProgress {
done, chunks, ..
} => {
if let Some(bar) = self.bar.as_ref() {
bar.set_message(format!("별칭 확장 {done}/{chunks}"));
}
// Non-TTY: suppressed by default — throttled though it is, one
// line per emit would still spam CI logs. The bar message
// covers the interactive case; --json carries every frame.
}
IngestEvent::AssetTimings {
parse_ms,
chunk_ms,
expansion_ms,
embed_ms,
store_ms,
..
} => {
if let Some(bar) = self.bar.as_ref() {
bar.set_message("");
}
if !quiet {
let mut err = std::io::stderr().lock();
let _ = writeln!(
err,
" ⏱ parse {} · chunk {} · expand {} · embed {} · store {}",
fmt_ms(*parse_ms),
fmt_ms(*chunk_ms),
fmt_ms(*expansion_ms),
fmt_ms(*embed_ms),
fmt_ms(*store_ms),
);
}
}
IngestEvent::Completed { counts } => {
if let Some(bar) = self.bar.take() {
bar.finish_and_clear();
@@ -239,6 +287,17 @@ fn emit_json(event: &IngestEvent) -> anyhow::Result<()> {
Ok(())
}
/// Render a phase duration (milliseconds) compactly for the human-mode
/// `AssetTimings` line: `< 1000ms` stays in `ms`, larger spans collapse to
/// one-decimal seconds so a 45-second expansion reads `45.0s`, not `45000ms`.
fn fmt_ms(ms: u64) -> String {
if ms >= 1000 {
format!("{:.1}s", ms as f64 / 1000.0)
} else {
format!("{ms}ms")
}
}
/// Format the current wall-clock as RFC 3339 — used by `wire_ingest_progress`
/// so every emitted event carries an `ts` field per §2.4a / the wire schema.
pub(crate) fn now_rfc3339() -> anyhow::Result<String> {
@@ -285,6 +344,15 @@ mod tests {
}
}
#[test]
fn fmt_ms_switches_unit_at_one_second() {
assert_eq!(fmt_ms(0), "0ms");
assert_eq!(fmt_ms(999), "999ms");
assert_eq!(fmt_ms(1000), "1.0s");
assert_eq!(fmt_ms(45_000), "45.0s");
assert_eq!(fmt_ms(1500), "1.5s");
}
#[test]
fn now_rfc3339_parses_back() {
let s = now_rfc3339().unwrap();

View File

@@ -154,7 +154,14 @@ fn apply_event(state: &mut IngestState, event: IngestEvent) {
}
// v0.20.0 sub-item 1: per-page PDF OCR events — TUI does not
// surface per-page OCR progress in v1; no counter to update.
IngestEvent::PdfOcrStarted { .. } | IngestEvent::PdfOcrFinished { .. } => {}
IngestEvent::PdfOcrStarted { .. }
| IngestEvent::PdfOcrFinished { .. }
// v0.24.0 asset-internal phase events: the status-bar reducer tracks
// per-asset counters, not sub-asset phase progress, so these are
// no-ops here (the CLI / --json surfaces render them).
| IngestEvent::AssetChunked { .. }
| IngestEvent::ExpansionProgress { .. }
| IngestEvent::AssetTimings { .. } => {}
}
}