feat(ingest): 진행 로그 개선 — 파일명/phase/heartbeat/slowest 요약
OCR/caption 켜진 볼트 ingest 가 중간부터 느릴 때 TTY 진행바가 파일명·phase·
모델·경과시간을 안 보여 "멈춤"처럼 보이던 문제 해결.
- 신규 wire AssetPhase{idx,total,phase,model} + AssetTimings.ocr_ms/caption_ms
(additive, ingest_progress.v1 유지)
- app: apply_ocr/apply_caption/embed 진입 시 AssetPhase emit + ocr/caption 시간 측정
- cli: TTY 진행바에 현재 파일명 + phase(model) + asset 경과초(heartbeat),
종료 시 최장 소요 파일 top-5 요약(quiet 여도 출력, --json 미출력)
- wire schema / README / HANDOFF / HOTFIXES 동기화, version 0.26.0 → 0.27.0
검증(리더): clippy 0, kebab-app/cli 61그룹·parse-image/tui 14그룹 0실패(-j8).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -96,14 +96,33 @@ pub enum IngestEvent {
|
||||
/// `idx/total` while its per-chunk phases churn. `chunks` is the chunk
|
||||
/// count for asset `idx`.
|
||||
AssetChunked { idx: u32, total: u32, chunks: u32 },
|
||||
/// v0.26.0 (additive): emitted when an asset enters a *slow* internal
|
||||
/// phase, so the interactive progress bar can show **which** phase
|
||||
/// (and which model) is currently running instead of looking frozen.
|
||||
/// `phase` ∈ {`"ocr"`, `"caption"`, `"embed"`}; short phases
|
||||
/// (parse / chunk / store) are intentionally *not* emitted to avoid
|
||||
/// noise. `model` is the model performing the phase — the vision LLM
|
||||
/// id for `ocr` / `caption`, the embedder `model_id` for `embed`
|
||||
/// (`None` when the phase runs without a configured model, e.g. embed
|
||||
/// with no embedder wired). Emitted once per (asset, phase); no
|
||||
/// throttle needed (low frequency). Wire v1 consumers that predate
|
||||
/// this variant simply ignore the unknown `asset_phase` kind.
|
||||
AssetPhase {
|
||||
idx: u32,
|
||||
total: u32,
|
||||
phase: String,
|
||||
model: Option<String>,
|
||||
},
|
||||
/// v0.24.0 (additive): per-phase wall-clock (milliseconds) for asset
|
||||
/// `idx`, emitted once the asset's markdown pipeline finishes. Lets a
|
||||
/// user see *where* the time went (parse / chunk / embed / store)
|
||||
/// without parsing logs. Only the markdown path emits this; the
|
||||
/// image / PDF paths surface `AssetChunked` but skip phase timing (their
|
||||
/// phase shapes differ — OCR / caption). `expansion_ms` is retained for
|
||||
/// wire compatibility but is always 0 since doc-side expansion was
|
||||
/// removed (HOTFIXES 2026-06-03).
|
||||
/// `idx`, emitted once the asset's pipeline finishes. Lets a user see
|
||||
/// *where* the time went (parse / chunk / ocr / caption / embed /
|
||||
/// store) without parsing logs. The markdown path leaves `ocr_ms` /
|
||||
/// `caption_ms` at 0 (no image analysis); the image / PDF paths fill
|
||||
/// them so the slowest-asset summary attributes vision-model time
|
||||
/// correctly. `expansion_ms` is retained for wire compatibility but is
|
||||
/// always 0 since doc-side expansion was removed (HOTFIXES 2026-06-03).
|
||||
/// `ocr_ms` / `caption_ms` (v0.26.0) are additive with serde default 0
|
||||
/// so pre-v0.26.0 consumers deserialize cleanly.
|
||||
AssetTimings {
|
||||
idx: u32,
|
||||
total: u32,
|
||||
@@ -112,6 +131,10 @@ pub enum IngestEvent {
|
||||
expansion_ms: u64,
|
||||
embed_ms: u64,
|
||||
store_ms: u64,
|
||||
#[serde(default)]
|
||||
ocr_ms: u64,
|
||||
#[serde(default)]
|
||||
caption_ms: u64,
|
||||
},
|
||||
/// Run finished normally. `counts` is the final aggregate.
|
||||
Completed { counts: AggregateCounts },
|
||||
@@ -261,19 +284,23 @@ mod tests {
|
||||
expansion_ms: 45_000,
|
||||
embed_ms: 800,
|
||||
store_ms: 20,
|
||||
ocr_ms: 1_200,
|
||||
caption_ms: 3_400,
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(
|
||||
v.get("kind").and_then(|s| s.as_str()),
|
||||
Some("asset_timings")
|
||||
);
|
||||
// All five phase fields are present (plain u64, always serialized).
|
||||
// All phase fields are present (plain u64, always serialized).
|
||||
for (field, want) in [
|
||||
("parse_ms", 12u64),
|
||||
("chunk_ms", 3),
|
||||
("expansion_ms", 45_000),
|
||||
("embed_ms", 800),
|
||||
("store_ms", 20),
|
||||
("ocr_ms", 1_200),
|
||||
("caption_ms", 3_400),
|
||||
] {
|
||||
assert_eq!(
|
||||
v.get(field).and_then(serde_json::Value::as_u64),
|
||||
@@ -283,6 +310,64 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn asset_timings_ocr_caption_default_to_zero_for_legacy_wire() {
|
||||
// v0.26.0 additive: a pre-v0.26.0 wire payload omits ocr_ms /
|
||||
// caption_ms; serde `default` must fill 0 so old producers stay
|
||||
// compatible.
|
||||
let legacy = serde_json::json!({
|
||||
"kind": "asset_timings",
|
||||
"idx": 1, "total": 1,
|
||||
"parse_ms": 5, "chunk_ms": 2, "expansion_ms": 0,
|
||||
"embed_ms": 10, "store_ms": 3
|
||||
});
|
||||
let ev: IngestEvent = serde_json::from_value(legacy).unwrap();
|
||||
match ev {
|
||||
IngestEvent::AssetTimings {
|
||||
ocr_ms,
|
||||
caption_ms,
|
||||
embed_ms,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(ocr_ms, 0);
|
||||
assert_eq!(caption_ms, 0);
|
||||
assert_eq!(embed_ms, 10);
|
||||
}
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn asset_phase_serializes_with_discriminator() {
|
||||
// v0.26.0 additive variant — `kind` must be snake_case
|
||||
// `asset_phase`, `phase` is the slow-phase label, `model` the
|
||||
// model id (nullable).
|
||||
let ev = IngestEvent::AssetPhase {
|
||||
idx: 4,
|
||||
total: 12,
|
||||
phase: "ocr".into(),
|
||||
model: Some("gemma4:e4b".into()),
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("asset_phase"));
|
||||
assert_eq!(v.get("idx").and_then(serde_json::Value::as_u64), Some(4));
|
||||
assert_eq!(v.get("phase").and_then(|s| s.as_str()), Some("ocr"));
|
||||
assert_eq!(v.get("model").and_then(|s| s.as_str()), Some("gemma4:e4b"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn asset_phase_model_none_serializes_as_null() {
|
||||
let ev = IngestEvent::AssetPhase {
|
||||
idx: 1,
|
||||
total: 1,
|
||||
phase: "embed".into(),
|
||||
model: None,
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(v.get("phase").and_then(|s| s.as_str()), Some("embed"));
|
||||
assert!(v.get("model").is_some_and(serde_json::Value::is_null));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingest_event_completed_has_counts() {
|
||||
let ev = IngestEvent::Completed {
|
||||
|
||||
@@ -1350,6 +1350,17 @@ fn ingest_one_asset(
|
||||
let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// Embed + vector upsert (only when both sides are configured).
|
||||
// v0.26.0: surface the embed phase + model so a long embed run reads as
|
||||
// "embedding(<model>)…" rather than a frozen bar (markdown path too).
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetPhase {
|
||||
idx,
|
||||
total,
|
||||
phase: "embed".to_string(),
|
||||
model: embedder.map(|e| e.model_id().0),
|
||||
},
|
||||
);
|
||||
let t_embed = std::time::Instant::now();
|
||||
// Stale-vector purge is LanceDB I/O, so it belongs to the embed/vector
|
||||
// phase — not the SQLite `store` phase. Keeping it here makes `store_ms`
|
||||
@@ -1414,7 +1425,8 @@ fn ingest_one_asset(
|
||||
|
||||
let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.24.0: phase-timing breakdown for this asset (markdown path only).
|
||||
// v0.24.0: phase-timing breakdown for this asset (markdown path).
|
||||
// ocr_ms / caption_ms are 0 — markdown has no image-analysis phases.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetTimings {
|
||||
@@ -1425,6 +1437,8 @@ fn ingest_one_asset(
|
||||
expansion_ms,
|
||||
embed_ms,
|
||||
store_ms,
|
||||
ocr_ms: 0,
|
||||
caption_ms: 0,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1545,9 +1559,11 @@ fn ingest_one_image_asset(
|
||||
workspace_root: &workspace_root,
|
||||
config: &extract_config,
|
||||
};
|
||||
let t_parse = std::time::Instant::now();
|
||||
let mut canonical = app
|
||||
.extract_for(&asset.media_type, &ctx, &bytes)
|
||||
.context("kb-app::extract_for (image)")?;
|
||||
let parse_ms = u64::try_from(t_parse.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// 2 + 3. Apply OCR / caption when their adapters exist. Both are
|
||||
// Lenient — failure is captured into Provenance Warning,
|
||||
@@ -1562,44 +1578,74 @@ fn ingest_one_image_asset(
|
||||
let lang_hint = lang_hint_from_doc(&canonical);
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
let mut warning_notes: Vec<String> = Vec::new();
|
||||
// v0.26.0: vision phases (OCR / caption) are the usual bottleneck on an
|
||||
// image-heavy vault and emitted no progress before — so the bar looked
|
||||
// frozen. Surface each as an `AssetPhase` and measure its wall-clock for
|
||||
// the slowest-asset summary.
|
||||
let mut ocr_ms = 0_u64;
|
||||
let mut caption_ms = 0_u64;
|
||||
match canonical.blocks.first_mut() {
|
||||
Some(Block::ImageRef(block)) => {
|
||||
if let Some(engine) = ocr_engine
|
||||
&& let Err(e) = apply_ocr(
|
||||
if let Some(engine) = ocr_engine {
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetPhase {
|
||||
idx,
|
||||
total,
|
||||
phase: "ocr".to_string(),
|
||||
model: Some(engine.model().to_string()),
|
||||
},
|
||||
);
|
||||
let t_ocr = std::time::Instant::now();
|
||||
let res = apply_ocr(
|
||||
engine,
|
||||
&bytes,
|
||||
block,
|
||||
lang_hint.as_ref(),
|
||||
&mut canonical.provenance.events,
|
||||
)
|
||||
{
|
||||
record_image_analysis_failure(
|
||||
asset,
|
||||
&mut canonical.provenance.events,
|
||||
&mut warning_notes,
|
||||
"OcrFailed",
|
||||
e,
|
||||
now,
|
||||
);
|
||||
ocr_ms = u64::try_from(t_ocr.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
if let Err(e) = res {
|
||||
record_image_analysis_failure(
|
||||
asset,
|
||||
&mut canonical.provenance.events,
|
||||
&mut warning_notes,
|
||||
"OcrFailed",
|
||||
e,
|
||||
now,
|
||||
);
|
||||
}
|
||||
}
|
||||
if let Some(llm) = caption_llm
|
||||
&& let Err(e) = apply_caption(
|
||||
if let Some(llm) = caption_llm {
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetPhase {
|
||||
idx,
|
||||
total,
|
||||
phase: "caption".to_string(),
|
||||
model: Some(llm.model_ref().id),
|
||||
},
|
||||
);
|
||||
let t_caption = std::time::Instant::now();
|
||||
let res = apply_caption(
|
||||
llm,
|
||||
&bytes,
|
||||
block,
|
||||
lang_hint.as_ref(),
|
||||
&app.config,
|
||||
&mut canonical.provenance.events,
|
||||
)
|
||||
{
|
||||
record_image_analysis_failure(
|
||||
asset,
|
||||
&mut canonical.provenance.events,
|
||||
&mut warning_notes,
|
||||
"CaptionFailed",
|
||||
e,
|
||||
now,
|
||||
);
|
||||
caption_ms = u64::try_from(t_caption.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
if let Err(e) = res {
|
||||
record_image_analysis_failure(
|
||||
asset,
|
||||
&mut canonical.provenance.events,
|
||||
&mut warning_notes,
|
||||
"CaptionFailed",
|
||||
e,
|
||||
now,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// P6-1 contract: image documents always have exactly one
|
||||
@@ -1634,12 +1680,13 @@ fn ingest_one_image_asset(
|
||||
// `Block::ImageRef` arm already produces a single chunk per
|
||||
// image (P1-5). The chunk text now follows the (β) plain-concat
|
||||
// contract per the kebab-chunk render_block_text update.
|
||||
let t_chunk = std::time::Instant::now();
|
||||
let chunks = MdHeadingV1Chunker
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::MdHeadingV1Chunker::chunk (image)")?;
|
||||
let chunk_ms = u64::try_from(t_chunk.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.24.0: surface chunk count for the image path too (phase timing is
|
||||
// markdown-only, but AssetChunked is consistent across media).
|
||||
// v0.24.0: surface chunk count for the image path too.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetChunked {
|
||||
@@ -1656,6 +1703,7 @@ fn ingest_one_image_asset(
|
||||
if let Some(emb) = embedder {
|
||||
canonical.last_embedding_version = Some(emb.model_version());
|
||||
}
|
||||
let t_store = std::time::Instant::now();
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
app.sqlite
|
||||
.put_asset_with_bytes(asset, &bytes)
|
||||
@@ -1669,7 +1717,18 @@ fn ingest_one_image_asset(
|
||||
app.sqlite
|
||||
.put_chunks(&canonical.doc_id, &chunks)
|
||||
.context("DocumentStore::put_chunks (image)")?;
|
||||
let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetPhase {
|
||||
idx,
|
||||
total,
|
||||
phase: "embed".to_string(),
|
||||
model: embedder.map(|e| e.model_id().0),
|
||||
},
|
||||
);
|
||||
let t_embed = std::time::Instant::now();
|
||||
if let (Some(emb), Some(vec_store)) = (embedder, vector_store)
|
||||
&& !chunks.is_empty()
|
||||
{
|
||||
@@ -1710,6 +1769,25 @@ fn ingest_one_image_asset(
|
||||
.upsert(&records)
|
||||
.context("VectorStore::upsert (image)")?;
|
||||
}
|
||||
let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.26.0: per-phase timing for the image path — ocr_ms / caption_ms
|
||||
// carry the vision-model cost so the slowest-asset summary attributes
|
||||
// an image-heavy run's bottleneck correctly.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetTimings {
|
||||
idx,
|
||||
total,
|
||||
parse_ms,
|
||||
chunk_ms,
|
||||
expansion_ms: 0,
|
||||
embed_ms,
|
||||
store_ms,
|
||||
ocr_ms,
|
||||
caption_ms,
|
||||
},
|
||||
);
|
||||
|
||||
let kind = if existing_doc_ids.contains(&canonical.doc_id.0) {
|
||||
kebab_core::IngestItemKind::Updated
|
||||
@@ -2053,9 +2131,11 @@ fn ingest_one_pdf_asset(
|
||||
workspace_root: &workspace_root,
|
||||
config: &extract_config,
|
||||
};
|
||||
let t_parse = std::time::Instant::now();
|
||||
let mut canonical = app
|
||||
.extract_for(&asset.media_type, &ctx, &bytes)
|
||||
.context("kb-app::extract_for (pdf)")?;
|
||||
let parse_ms = u64::try_from(t_parse.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.20 sub-item 1: post-extract OCR enrichment (PR #187 registry
|
||||
// dispatch invariant 보존 — extract_for 가 normal entry).
|
||||
@@ -2191,9 +2271,11 @@ fn ingest_one_pdf_asset(
|
||||
// validates every block carries `SourceSpan::Page`; failure here
|
||||
// means the parser drifted from its contract.
|
||||
let chunker = PdfPageV1Chunker;
|
||||
let t_chunk = std::time::Instant::now();
|
||||
let chunks = chunker
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::PdfPageV1Chunker::chunk")?;
|
||||
let chunk_ms = u64::try_from(t_chunk.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.24.0: surface chunk count for the PDF path too.
|
||||
crate::ingest_progress::emit(
|
||||
@@ -2212,6 +2294,7 @@ fn ingest_one_pdf_asset(
|
||||
canonical.last_embedding_version = Some(emb.model_version());
|
||||
}
|
||||
|
||||
let t_store = std::time::Instant::now();
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
app.sqlite
|
||||
.put_asset_with_bytes(asset, &bytes)
|
||||
@@ -2225,7 +2308,18 @@ fn ingest_one_pdf_asset(
|
||||
app.sqlite
|
||||
.put_chunks(&canonical.doc_id, &chunks)
|
||||
.context("DocumentStore::put_chunks (pdf)")?;
|
||||
let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetPhase {
|
||||
idx,
|
||||
total,
|
||||
phase: "embed".to_string(),
|
||||
model: embedder.map(|e| e.model_id().0),
|
||||
},
|
||||
);
|
||||
let t_embed = std::time::Instant::now();
|
||||
if let (Some(emb), Some(vec_store)) = (embedder, vector_store)
|
||||
&& !chunks.is_empty()
|
||||
{
|
||||
@@ -2264,6 +2358,25 @@ fn ingest_one_pdf_asset(
|
||||
.upsert(&records)
|
||||
.context("VectorStore::upsert (pdf)")?;
|
||||
}
|
||||
let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.26.0: per-phase timing for the PDF path. `ocr_ms` reuses the
|
||||
// page-OCR total already computed above so a scanned-PDF run's OCR cost
|
||||
// shows up in the slowest-asset summary; caption is markdown/image-only.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetTimings {
|
||||
idx,
|
||||
total,
|
||||
parse_ms,
|
||||
chunk_ms,
|
||||
expansion_ms: 0,
|
||||
embed_ms,
|
||||
store_ms,
|
||||
ocr_ms: pdf_ocr_ms_total.unwrap_or(0),
|
||||
caption_ms: 0,
|
||||
},
|
||||
);
|
||||
|
||||
let kind = if existing_doc_ids.contains(&canonical.doc_id.0) {
|
||||
kebab_core::IngestItemKind::Updated
|
||||
|
||||
Reference in New Issue
Block a user