diff --git a/Cargo.lock b/Cargo.lock index aae0e90..73f09f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4724,7 +4724,7 @@ dependencies = [ [[package]] name = "kebab-app" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -4771,7 +4771,7 @@ dependencies = [ [[package]] name = "kebab-chunk" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -4789,7 +4789,7 @@ dependencies = [ [[package]] name = "kebab-cli" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "clap", @@ -4810,7 +4810,7 @@ dependencies = [ [[package]] name = "kebab-config" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "dirs 5.0.1", @@ -4826,7 +4826,7 @@ dependencies = [ [[package]] name = "kebab-core" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -4840,7 +4840,7 @@ dependencies = [ [[package]] name = "kebab-embed" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -4854,7 +4854,7 @@ dependencies = [ [[package]] name = "kebab-embed-candle" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "candle-core", @@ -4873,7 +4873,7 @@ dependencies = [ [[package]] name = "kebab-embed-local" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "fastembed", @@ -4886,7 +4886,7 @@ dependencies = [ [[package]] name = "kebab-eval" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "kebab-app", @@ -4905,7 +4905,7 @@ dependencies = [ [[package]] name = "kebab-llm" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "kebab-core", @@ -4914,7 +4914,7 @@ dependencies = [ [[package]] name = "kebab-llm-local" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "kebab-config", @@ -4931,7 +4931,7 @@ dependencies = [ [[package]] name = "kebab-mcp" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "kebab-app", @@ -4949,7 +4949,7 @@ dependencies = [ [[package]] name = "kebab-nli" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "hf-hub", @@ -4964,7 +4964,7 @@ dependencies = [ [[package]] name = "kebab-parse-code" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "gix", @@ -4987,7 +4987,7 @@ dependencies = [ [[package]] name = "kebab-parse-image" -version = "0.23.1" +version = "0.24.0" dependencies = [ "ab_glyph", "anyhow", @@ -5011,7 +5011,7 @@ dependencies = [ [[package]] name = "kebab-parse-md" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "kebab-core", @@ -5028,7 +5028,7 @@ dependencies = [ [[package]] name = "kebab-parse-pdf" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -5043,7 +5043,7 @@ dependencies = [ [[package]] name = "kebab-rag" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -5065,7 +5065,7 @@ dependencies = [ [[package]] name = "kebab-search" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "globset", @@ -5084,7 +5084,7 @@ dependencies = [ [[package]] name = "kebab-source-fs" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -5102,7 +5102,7 @@ dependencies = [ [[package]] name = "kebab-store-sqlite" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "blake3", @@ -5122,7 +5122,7 @@ dependencies = [ [[package]] name = "kebab-store-vector" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "arrow", @@ -5146,7 +5146,7 @@ dependencies = [ [[package]] name = "kebab-tui" -version = "0.23.1" +version = "0.24.0" dependencies = [ "anyhow", "crossterm", diff --git a/Cargo.toml b/Cargo.toml index 02fc277..6f8021e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ edition = "2024" rust-version = "1.85" license = "MIT OR Apache-2.0" repository = "https://github.com/altair823/kebab" -version = "0.23.1" # v0.23.1 — ingest 시작 시 임베딩 백엔드/디바이스 한 줄 표시(터미널, --json/--quiet 존중) + README 에 KB 이전(어떤 파일/어느 config 키) 설명. 동작·schema 변경 없음. — CLAUDE.md §Release +version = "0.24.0" # v0.24.0 — 상세 ingest 진행 로깅: 신규 wire 이벤트 asset_chunked / expansion_progress / asset_timings (ingest_progress.v1 additive), CLI 진행바 sub-message + phase timing 한 줄. asset 내부 parse/chunk/expansion/embed/store 가시화. wire v1 backward-compat. — CLAUDE.md §Release # pre-v0.18 workspace-wide cleanup: enable clippy::pedantic group with # intentional allow-list. The allowed lints are either cosmetic (doc style), diff --git a/README.md b/README.md index 10f6d0e..9ef472a 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,7 @@ Markdown · PDF · 이미지(OCR + caption) · 소스코드(Rust/Python/TS/JS/Go | 명령 | 동작 | |------|------| | `kebab init` | XDG 경로에 데이터 디렉토리 + config.toml 생성 | -| `kebab ingest []` | 워크스페이스 스캔 후 새/변경 문서 색인 (idempotent · incremental, `--force-reingest` 로 강제 재처리). 미지원 확장자는 자동 skip | +| `kebab ingest []` | 워크스페이스 스캔 후 새/변경 문서 색인 (idempotent · incremental, `--force-reingest` 로 강제 재처리). 미지원 확장자는 자동 skip. 진행바는 문서별 청크 수 · 별칭 확장 라이브 카운터 · 문서 종료 시 phase별 소요시간(parse/chunk/expand/embed/store)을 표시 (`--json` 은 `asset_chunked`/`expansion_progress`/`asset_timings` 이벤트로) | | `kebab ingest-file ` | 단일 파일 ingest (workspace 외부 가능 — `_external/` 로 deterministic copy) | | `kebab ingest-stdin --title ` | stdin 의 markdown 본문 ingest | | `kebab search --mode {lexical,vector,hybrid} "" [flags]` | 검색 (default hybrid = RRF fusion, citation 포함). 필터/budget flag 는 `--help` | diff --git a/crates/kebab-app/src/ingest_progress.rs b/crates/kebab-app/src/ingest_progress.rs index b877a37..a797547 100644 --- a/crates/kebab-app/src/ingest_progress.rs +++ b/crates/kebab-app/src/ingest_progress.rs @@ -47,11 +47,21 @@ pub struct AggregateCounts { /// /// ```text /// ScanStarted < ScanCompleted -/// < (AssetStarted [< (PdfOcrStarted < PdfOcrFinished)*] < AssetFinished)* +/// < ( AssetStarted +/// [< (PdfOcrStarted < PdfOcrFinished)*] +/// [< AssetChunked] +/// [< ExpansionProgress*] +/// [< AssetTimings] +/// < AssetFinished )* /// < (Completed | Aborted) /// ``` /// -/// `[]` = optional, per-PDF asset only (v0.20.0 sub-item 1). +/// `[]` = optional. `PdfOcr*` is per-PDF asset only (v0.20.0 sub-item 1). +/// `AssetChunked` / `ExpansionProgress` / `AssetTimings` are the v0.24.0 +/// asset-internal phase events: `AssetChunked` fires once right after +/// chunking (markdown / image / PDF); `ExpansionProgress` is a throttled +/// counter through the alias-expansion loop (markdown, expansion enabled +/// only); `AssetTimings` reports per-phase wall-clock once (markdown only). /// /// Embed-batch events (`embed_batch_started` / `embed_batch_finished` /// in §2.4a) are reserved for a future iteration and are not emitted @@ -82,6 +92,41 @@ pub enum IngestEvent { result: IngestItemKind, chunks: u32, }, + /// v0.24.0 (additive): emitted right after an asset is chunked, before + /// expansion / embed / store. Surfaces "this document is N chunks" + /// immediately so a single large document no longer looks frozen at + /// `idx/total` while its per-chunk phases churn. `chunks` is the chunk + /// count for asset `idx`. + AssetChunked { idx: u32, total: u32, chunks: u32 }, + /// v0.24.0 (additive): throttled progress through the per-chunk + /// expansion (alias-LLM) loop — the slowest inner phase for large + /// documents (~1–4s per chunk against a remote GPU Ollama). `done` is + /// the number of chunks processed so far (cache hits included, so the + /// counter still advances on a warm re-run); `chunks` is the asset's + /// total chunk count. Emitted at most every 25 chunks or once per + /// second (see the loop in `ingest_one_asset`), plus a final + /// `done == chunks` frame. + ExpansionProgress { + idx: u32, + total: u32, + done: u32, + chunks: u32, + }, + /// v0.24.0 (additive): per-phase wall-clock (milliseconds) for asset + /// `idx`, emitted once the asset's markdown pipeline finishes. Lets a + /// user see *where* the time went (parse / chunk / expansion / embed / + /// store) without parsing logs. Only the markdown path emits this; the + /// image / PDF paths surface `AssetChunked` but skip phase timing (their + /// phase shapes differ — OCR / caption rather than expansion). + AssetTimings { + idx: u32, + total: u32, + parse_ms: u64, + chunk_ms: u64, + expansion_ms: u64, + embed_ms: u64, + store_ms: u64, + }, /// Run finished normally. `counts` is the final aggregate. Completed { counts: AggregateCounts }, /// Run finished by user cancellation. `counts` is the partial @@ -199,6 +244,79 @@ mod tests { assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown")); } + #[test] + fn asset_chunked_serializes_with_discriminator() { + // v0.24.0 additive variant — `kind` must be snake_case + // `asset_chunked` so wire v1 consumers branch on it cleanly. + let ev = IngestEvent::AssetChunked { + idx: 3, + total: 10, + chunks: 142, + }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!( + v.get("kind").and_then(|s| s.as_str()), + Some("asset_chunked") + ); + assert_eq!(v.get("idx").and_then(serde_json::Value::as_u64), Some(3)); + assert_eq!( + v.get("chunks").and_then(serde_json::Value::as_u64), + Some(142) + ); + } + + #[test] + fn expansion_progress_serializes_with_discriminator() { + let ev = IngestEvent::ExpansionProgress { + idx: 1, + total: 5, + done: 25, + chunks: 200, + }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!( + v.get("kind").and_then(|s| s.as_str()), + Some("expansion_progress") + ); + assert_eq!(v.get("done").and_then(serde_json::Value::as_u64), Some(25)); + assert_eq!( + v.get("chunks").and_then(serde_json::Value::as_u64), + Some(200) + ); + } + + #[test] + fn asset_timings_serializes_all_phase_fields() { + let ev = IngestEvent::AssetTimings { + idx: 2, + total: 7, + parse_ms: 12, + chunk_ms: 3, + expansion_ms: 45_000, + embed_ms: 800, + store_ms: 20, + }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!( + v.get("kind").and_then(|s| s.as_str()), + Some("asset_timings") + ); + // All five phase fields are present (plain u64, always serialized). + for (field, want) in [ + ("parse_ms", 12u64), + ("chunk_ms", 3), + ("expansion_ms", 45_000), + ("embed_ms", 800), + ("store_ms", 20), + ] { + assert_eq!( + v.get(field).and_then(serde_json::Value::as_u64), + Some(want), + "field {field}" + ); + } + } + #[test] fn ingest_event_completed_has_counts() { let ev = IngestEvent::Completed { diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 018ddb9..3939ddf 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -480,6 +480,8 @@ pub fn ingest_with_config_opts( let item = ingest_one_asset( &app, &asset, + idx, + scanned_count, &parser_version, &chunk_policy, embedder.as_ref(), @@ -1100,6 +1102,8 @@ fn embed_with_cache( fn ingest_one_asset( app: &App, asset: &RawAsset, + idx: u32, + total: u32, parser_version: &ParserVersion, chunk_policy: &ChunkPolicy, embedder: Option<&Arc>, @@ -1132,18 +1136,23 @@ fn ingest_one_asset( return ingest_one_image_asset( app, asset, + idx, + total, chunk_policy, embedder, vector_store, existing_doc_ids, image_pipeline, force_reingest, + progress, ); } MediaType::Pdf => { return ingest_one_pdf_asset( app, asset, + idx, + total, chunk_policy, embedder, vector_store, @@ -1252,6 +1261,10 @@ fn ingest_one_asset( return Ok(item); } + // v0.24.0 phase timing: parse spans from here (byte read) through + // `build_canonical_document`, i.e. everything before the chunker runs. + let t_parse = std::time::Instant::now(); + let bytes = std::fs::read(&path) .with_context(|| format!("read asset bytes from {}", path.display()))?; @@ -1286,9 +1299,26 @@ fn ingest_one_asset( build_canonical_document(asset, metadata, parsed_blocks, parser_version, all_warnings) .context("kb-parse-md::build_canonical_document")?; + let parse_ms = u64::try_from(t_parse.elapsed().as_millis()).unwrap_or(u64::MAX); + + let t_chunk = std::time::Instant::now(); let mut chunks = MdHeadingV1Chunker .chunk(&canonical, chunk_policy) .context("kb-chunk::MdHeadingV1Chunker::chunk")?; + let chunk_ms = u64::try_from(t_chunk.elapsed().as_millis()).unwrap_or(u64::MAX); + + // v0.24.0: surface the chunk count immediately, before the (potentially + // very slow) expansion / embed phases — so a single large document no + // longer looks frozen at `idx/total` while its chunks churn. + let total_chunks = u32::try_from(chunks.len()).unwrap_or(u32::MAX); + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetChunked { + idx, + total, + chunks: total_chunks, + }, + ); // Phase 2 doc-side expansion: flag on 이면 청크당 별칭 생성 (fail-soft). // derivation cache(§3.4): 같은 청크 text + 같은 alias version_key 면 LLM @@ -1296,6 +1326,7 @@ fn ingest_one_asset( let mut alias_cache_hit = 0_usize; let mut alias_cache_miss = 0_usize; let mut alias_touch_keys: Vec = Vec::new(); + let t_expansion = std::time::Instant::now(); if app.config.ingest.expansion.enabled { let exp = &app.config.ingest.expansion; let alias_version_key = format!( @@ -1313,6 +1344,12 @@ fn ingest_one_asset( Ok(llm) => { let generator = crate::expansion::ExpansionGenerator::new(&llm, exp.max_aliases_per_chunk); + // v0.24.0: throttled live counter through the per-chunk + // expansion loop. Emit at most every 25 chunks or once per + // second — never per chunk (would flood the mpsc channel). + let mut done: u32 = 0; + let mut last_emit = std::time::Instant::now(); + let mut last_done: u32 = 0; for chunk in &mut chunks { let key = kebab_core::derivation_cache_key( "alias", @@ -1345,6 +1382,40 @@ fn ingest_one_asset( .derivation_cache_put(&key, "alias", a.as_bytes())?; } } + // Cache hits count toward `done` too (the brief: show the + // warm-run fast-forward). Throttle: every 25 chunks or + // ≥1s since the last emit. + done += 1; + if done % 25 == 0 + || last_emit.elapsed() >= std::time::Duration::from_secs(1) + { + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::ExpansionProgress { + idx, + total, + done, + chunks: total_chunks, + }, + ); + last_emit = std::time::Instant::now(); + last_done = done; + } + } + // Final frame so the counter lands on done == total — but only + // if the last in-loop emit didn't already report this `done` + // (avoids a duplicate frame when chunks is a multiple of the + // throttle, and skips a 0/0 frame when there are no chunks). + if done != last_done { + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::ExpansionProgress { + idx, + total, + done, + chunks: total_chunks, + }, + ); } } Err(e) => { @@ -1355,6 +1426,7 @@ fn ingest_one_asset( } } } + let expansion_ms = u64::try_from(t_expansion.elapsed().as_millis()).unwrap_or(u64::MAX); // Stamp chunker + embedding versions so Task 7's skip detection has // data on the second run. @@ -1367,7 +1439,7 @@ fn ingest_one_asset( // (per-document tx semantics per design §5.8); composing them is // the kb-app job. A failure mid-way leaves the DB in a state the // next ingest run can re-converge (UPSERT + DELETE-then-INSERT). - purge_vector_orphans_for_workspace_path(app, asset, vector_store)?; + let t_store = std::time::Instant::now(); app.sqlite .put_asset_with_bytes(asset, &bytes) .context("DocumentStore::put_asset_with_bytes")?; @@ -1380,8 +1452,16 @@ fn ingest_one_asset( app.sqlite .put_chunks(&canonical.doc_id, &chunks) .context("DocumentStore::put_chunks")?; + let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX); // Embed + vector upsert (only when both sides are configured). + let t_embed = std::time::Instant::now(); + // Stale-vector purge is LanceDB I/O, so it belongs to the embed/vector + // phase — not the SQLite `store` phase. Keeping it here makes `store_ms` + // mean "SQLite persist only" and `embed_ms` cover all vector-store work + // (purge + upsert), so per-phase timings attribute the bottleneck + // correctly (review fix). Runs before any new upsert, as before. + purge_vector_orphans_for_workspace_path(app, asset, vector_store)?; let mut emb_cache_hit = 0_usize; let mut emb_cache_miss = 0_usize; if let (Some(emb), Some(vec_store)) = (embedder, vector_store) { @@ -1511,6 +1591,22 @@ fn ingest_one_asset( } } + let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX); + + // v0.24.0: phase-timing breakdown for this asset (markdown path only). + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetTimings { + idx, + total, + parse_ms, + chunk_ms, + expansion_ms, + embed_ms, + store_ms, + }, + ); + // 히트한 alias 키들의 last_used_at 갱신(LRU 보존, §3.5). app.sqlite.derivation_cache_touch(&alias_touch_keys)?; @@ -1564,12 +1660,15 @@ fn ingest_one_asset( fn ingest_one_image_asset( app: &App, asset: &RawAsset, + idx: u32, + total: u32, chunk_policy: &ChunkPolicy, embedder: Option<&Arc>, vector_store: Option<&Arc>, existing_doc_ids: &std::collections::HashSet, image_pipeline: &ImagePipeline<'_>, force_reingest: bool, + progress: Option<&std::sync::mpsc::Sender>, ) -> anyhow::Result { let ocr_engine = image_pipeline.ocr_engine; let caption_llm = image_pipeline.caption_llm; @@ -1722,6 +1821,17 @@ fn ingest_one_image_asset( .chunk(&canonical, chunk_policy) .context("kb-chunk::MdHeadingV1Chunker::chunk (image)")?; + // v0.24.0: surface chunk count for the image path too (phase timing is + // markdown-only, but AssetChunked is consistent across media). + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetChunked { + idx, + total, + chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX), + }, + ); + // 5. Persist + embed — identical sequence to markdown. // Stamp chunker + embedding versions (image uses MdHeadingV1Chunker // for its single-block doc, so we record that version). @@ -2127,6 +2237,8 @@ fn sweep_deleted_files( fn ingest_one_pdf_asset( app: &App, asset: &RawAsset, + idx: u32, + total: u32, chunk_policy: &ChunkPolicy, embedder: Option<&Arc>, vector_store: Option<&Arc>, @@ -2330,6 +2442,16 @@ fn ingest_one_pdf_asset( .chunk(&canonical, chunk_policy) .context("kb-chunk::PdfPageV1Chunker::chunk")?; + // v0.24.0: surface chunk count for the PDF path too. + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::AssetChunked { + idx, + total, + chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX), + }, + ); + // Stamp chunker + embedding versions so Task 7's skip detection has // data on the second run. canonical.last_chunker_version = Some(chunker.chunker_version()); diff --git a/crates/kebab-app/tests/ingest_progress.rs b/crates/kebab-app/tests/ingest_progress.rs index 0ceb00a..515ce2e 100644 --- a/crates/kebab-app/tests/ingest_progress.rs +++ b/crates/kebab-app/tests/ingest_progress.rs @@ -69,40 +69,74 @@ fn progress_event_sequence_matches_design_section_2_4a() { other => panic!("expected Completed last, got {other:?}"), } - // Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order. - let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect(); - assert_eq!( - asset_events.len(), - 6, - "expected 3 (Started + Finished) pairs, got {asset_events:?}" - ); - for (chunk_idx, pair) in asset_events.chunks(2).enumerate() { - let expected_idx = chunk_idx as u32 + 1; - match (pair[0], pair[1]) { - ( - IngestEvent::AssetStarted { - idx: si, - total: st, - media, - .. - }, - IngestEvent::AssetFinished { - idx: fi, - total: ft, - result, - chunks, - }, - ) => { - assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}"); - assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}"); - assert_eq!(*st, 3, "Started total mismatch"); - assert_eq!(*ft, 3, "Finished total mismatch"); - assert_eq!(media, "markdown", "fixture is markdown only"); - assert_eq!(*result, IngestItemKind::New, "first ingest → New"); - assert!(*chunks >= 1, "chunks: {pair:?}"); + // Middle (v0.24.0 ordering invariant §2.4a): per asset the stream is + // AssetStarted < AssetChunked < [ExpansionProgress*] < AssetTimings + // < AssetFinished + // Expansion is disabled in the lexical fixture, so no ExpansionProgress + // frames appear here — but AssetChunked + AssetTimings are emitted for + // every markdown asset. + let middle = &events[2..events.len() - 1]; + + // 3 AssetStarted events, monotonic idx 1..=3, all markdown, total = 3. + let started: Vec = middle + .iter() + .filter_map(|e| match e { + IngestEvent::AssetStarted { + idx, total, media, .. + } => { + assert_eq!(*total, 3, "Started total mismatch: {e:?}"); + assert_eq!(media, "markdown", "fixture is markdown only: {e:?}"); + Some(*idx) } - other => panic!("expected Started+Finished pair, got {other:?}"), - } + _ => None, + }) + .collect(); + assert_eq!(started, vec![1, 2, 3], "AssetStarted idx order: {middle:?}"); + + // 3 AssetFinished events, monotonic idx 1..=3, each New with ≥1 chunk. + let finished: Vec = middle + .iter() + .filter_map(|e| match e { + IngestEvent::AssetFinished { + idx, + total, + result, + chunks, + } => { + assert_eq!(*total, 3, "Finished total mismatch: {e:?}"); + assert_eq!(*result, IngestItemKind::New, "first ingest → New: {e:?}"); + assert!(*chunks >= 1, "chunks: {e:?}"); + Some(*idx) + } + _ => None, + }) + .collect(); + assert_eq!(finished, vec![1, 2, 3], "AssetFinished idx order: {middle:?}"); + + // v0.24.0 additive events: exactly one AssetChunked + one AssetTimings + // per asset, each strictly bracketed by that asset's Started / Finished. + for target in 1u32..=3 { + let started_at = middle + .iter() + .position(|e| matches!(e, IngestEvent::AssetStarted { idx, .. } if *idx == target)) + .unwrap_or_else(|| panic!("missing AssetStarted for idx {target}: {middle:?}")); + let finished_at = middle + .iter() + .position(|e| matches!(e, IngestEvent::AssetFinished { idx, .. } if *idx == target)) + .unwrap_or_else(|| panic!("missing AssetFinished for idx {target}: {middle:?}")); + let chunked_at = middle + .iter() + .position(|e| matches!(e, IngestEvent::AssetChunked { idx, chunks, .. } if *idx == target && *chunks >= 1)) + .unwrap_or_else(|| panic!("missing AssetChunked for idx {target}: {middle:?}")); + let timings_at = middle + .iter() + .position(|e| matches!(e, IngestEvent::AssetTimings { idx, .. } if *idx == target)) + .unwrap_or_else(|| panic!("missing AssetTimings for idx {target}: {middle:?}")); + assert!( + started_at < chunked_at && chunked_at < timings_at && timings_at < finished_at, + "idx {target} ordering: started={started_at} chunked={chunked_at} \ + timings={timings_at} finished={finished_at}: {middle:?}" + ); } } diff --git a/crates/kebab-cli/src/progress.rs b/crates/kebab-cli/src/progress.rs index 4e96ceb..9c1babd 100644 --- a/crates/kebab-cli/src/progress.rs +++ b/crates/kebab-cli/src/progress.rs @@ -157,6 +157,54 @@ impl ProgressDisplay { // in Completed handles the final state. No per-asset bar update // here avoids the duplicate-frame artifact in TTY scrollback. } + // v0.24.0: asset-internal phase visibility. AssetChunked / + // ExpansionProgress use the bar *message* (live sub-progress for + // the current asset) — distinct from the per-file position draw, + // so a single large document no longer looks frozen. AssetTimings + // prints a one-line breakdown when the asset finishes. + IngestEvent::AssetChunked { idx, total, chunks } => { + if let Some(bar) = self.bar.as_ref() { + bar.set_message(format!("→ {chunks} chunks")); + } + if !tty && !quiet { + let mut err = std::io::stderr().lock(); + let _ = writeln!(err, "ingest: {idx}/{total} → {chunks} chunks"); + } + } + IngestEvent::ExpansionProgress { + done, chunks, .. + } => { + if let Some(bar) = self.bar.as_ref() { + bar.set_message(format!("별칭 확장 {done}/{chunks}")); + } + // Non-TTY: suppressed by default — throttled though it is, one + // line per emit would still spam CI logs. The bar message + // covers the interactive case; --json carries every frame. + } + IngestEvent::AssetTimings { + parse_ms, + chunk_ms, + expansion_ms, + embed_ms, + store_ms, + .. + } => { + if let Some(bar) = self.bar.as_ref() { + bar.set_message(""); + } + if !quiet { + let mut err = std::io::stderr().lock(); + let _ = writeln!( + err, + " ⏱ parse {} · chunk {} · expand {} · embed {} · store {}", + fmt_ms(*parse_ms), + fmt_ms(*chunk_ms), + fmt_ms(*expansion_ms), + fmt_ms(*embed_ms), + fmt_ms(*store_ms), + ); + } + } IngestEvent::Completed { counts } => { if let Some(bar) = self.bar.take() { bar.finish_and_clear(); @@ -239,6 +287,17 @@ fn emit_json(event: &IngestEvent) -> anyhow::Result<()> { Ok(()) } +/// Render a phase duration (milliseconds) compactly for the human-mode +/// `AssetTimings` line: `< 1000ms` stays in `ms`, larger spans collapse to +/// one-decimal seconds so a 45-second expansion reads `45.0s`, not `45000ms`. +fn fmt_ms(ms: u64) -> String { + if ms >= 1000 { + format!("{:.1}s", ms as f64 / 1000.0) + } else { + format!("{ms}ms") + } +} + /// Format the current wall-clock as RFC 3339 — used by `wire_ingest_progress` /// so every emitted event carries an `ts` field per §2.4a / the wire schema. pub(crate) fn now_rfc3339() -> anyhow::Result { @@ -285,6 +344,15 @@ mod tests { } } + #[test] + fn fmt_ms_switches_unit_at_one_second() { + assert_eq!(fmt_ms(0), "0ms"); + assert_eq!(fmt_ms(999), "999ms"); + assert_eq!(fmt_ms(1000), "1.0s"); + assert_eq!(fmt_ms(45_000), "45.0s"); + assert_eq!(fmt_ms(1500), "1.5s"); + } + #[test] fn now_rfc3339_parses_back() { let s = now_rfc3339().unwrap(); diff --git a/crates/kebab-tui/src/ingest_progress.rs b/crates/kebab-tui/src/ingest_progress.rs index eab7f78..b94510c 100644 --- a/crates/kebab-tui/src/ingest_progress.rs +++ b/crates/kebab-tui/src/ingest_progress.rs @@ -154,7 +154,14 @@ fn apply_event(state: &mut IngestState, event: IngestEvent) { } // v0.20.0 sub-item 1: per-page PDF OCR events — TUI does not // surface per-page OCR progress in v1; no counter to update. - IngestEvent::PdfOcrStarted { .. } | IngestEvent::PdfOcrFinished { .. } => {} + IngestEvent::PdfOcrStarted { .. } + | IngestEvent::PdfOcrFinished { .. } + // v0.24.0 asset-internal phase events: the status-bar reducer tracks + // per-asset counters, not sub-asset phase progress, so these are + // no-ops here (the CLI / --json surfaces render them). + | IngestEvent::AssetChunked { .. } + | IngestEvent::ExpansionProgress { .. } + | IngestEvent::AssetTimings { .. } => {} } } diff --git a/docs/wire-schema/v1/ingest_progress.schema.json b/docs/wire-schema/v1/ingest_progress.schema.json index 833cfaa..ef2889a 100644 --- a/docs/wire-schema/v1/ingest_progress.schema.json +++ b/docs/wire-schema/v1/ingest_progress.schema.json @@ -14,6 +14,9 @@ "scan_completed", "asset_started", "asset_finished", + "asset_chunked", + "expansion_progress", + "asset_timings", "embed_batch_started", "embed_batch_finished", "pdf_ocr_started", @@ -33,7 +36,13 @@ "enum": ["new", "updated", "skipped", "error"], "description": "asset_finished: per-asset outcome (mirrors `ingest_report.v1.items[].kind`)." }, - "chunks": { "type": "integer", "minimum": 0, "description": "asset_finished: chunk count produced for this asset." }, + "chunks": { "type": "integer", "minimum": 0, "description": "asset_finished / asset_chunked / expansion_progress (v0.24.0): chunk count produced for this asset." }, + "done": { "type": "integer", "minimum": 0, "description": "expansion_progress (v0.24.0, additive): chunks processed so far in the per-chunk alias-expansion loop (cache hits included). Throttled: emitted at most every 25 chunks or once per second, plus a final frame where done == chunks." }, + "parse_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): parse phase wall-clock (ms). Markdown path only." }, + "chunk_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): chunk phase wall-clock (ms). Markdown path only." }, + "expansion_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): alias-expansion phase wall-clock (ms). Markdown path only; 0 when expansion is disabled." }, + "embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector phase wall-clock (ms) — embedding, vector upsert, and stale-vector purge. Markdown path only." }, + "store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms) — put_asset/document/blocks/chunks only. Markdown path only." }, "n_chunks": { "type": "integer", "minimum": 0, "description": "embed_batch_started / embed_batch_finished: chunks in this embedding batch." }, "ms": { "type": "integer", "minimum": 0, "description": "embed_batch_finished / pdf_ocr_finished: wall-clock duration (ms). pdf_ocr_finished skip path 의 의미는 mixed (DCTDecode 부재 시 0, engine 실패 시 latency-before-bail)." }, "chars": { "type": "integer", "minimum": 0, "description": "pdf_ocr_finished: char count of OCR result. Skip 시 0." }, diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md index f27f311..5425c0a 100644 --- a/tasks/HOTFIXES.md +++ b/tasks/HOTFIXES.md @@ -14,6 +14,61 @@ historical contract that was implemented; this file accumulates the deltas so phase 5+ readers can find the live behavior without diffing git history. +## 2026-06-02 — 상세 ingest 진행 로깅 (asset 내부 phase 가시화, v0.24.0) + +**무엇이 문제였나.** ingest 진행 이벤트가 asset(문서) 단위(`asset_started` / +`asset_finished`)뿐이라 한 문서 내부의 parse / chunk / **expansion(별칭 LLM, +청크당 순차 호출)** / embed / store 가 깜깜했다. expansion 은 청크당 ~1~4s +(원격 GPU Ollama)이고 큰 문서는 청크 수백~천 개 → 그 한 문서에서 수십 분이 +걸리는데, 진행바는 `1/5150` 에 멈춘 듯 보여 사용자가 병목을 못 봤다. + +**무엇을 추가했나 (wire `ingest_progress.v1` additive, 호환 유지).** +`IngestEvent` 에 세 변이 추가 — `#[serde(tag="kind")]` 라 신규 `kind` 추가는 +wire v1 호환: + +- `asset_chunked { idx, total, chunks }` — 청킹 직후(expansion/embed 전) 즉시 + "이 문서가 N청크" 노출. markdown / image / pdf 세 경로 모두 emit. +- `expansion_progress { idx, total, done, chunks }` — expansion 루프 중 + **스로틀** 발신(매 25청크 또는 ≥1s, 종료 시 `done == chunks` 1프레임 더). + 캐시 히트 청크도 `done` 에 포함(warm 재색인 fast-forward 가시화). 채널 폭주 + 방지 — 매 청크 emit 금지. +- `asset_timings { idx, total, parse_ms, chunk_ms, expansion_ms, embed_ms, + store_ms }` — asset 처리 phase 별 소요시간. **markdown 경로만** emit + (image/pdf 는 phase shape 가 달라 생략; AssetChunked 만 emit). + +**설계 결정 — AssetTimings 이벤트 vs AssetFinished 필드.** IMPL_BRIEF §1 은 +`AssetFinished` 에 optional phase-timing 필드를, §2 는 대안으로 신규 +`AssetTimings` 이벤트를 제시(권장). 후자를 택함 — `AssetFinished` 는 호출부 +(`ingest_with_config_progress` 루프)에서 만들어지는데 timing 데이터는 +`ingest_one_asset` 내부에만 있어, 필드를 채우려면 `kebab_core::IngestItem` +(wire-stable struct) 변경 또는 별도 plumbing 이 필요. `ingest_one_asset` 가 +`progress` 핸들을 이미 들고 있으므로 새 이벤트를 직접 emit 하는 쪽이 crate +경계(kebab-core 불변)도 지키고 더 깔끔. `AssetFinished` 는 손대지 않음. + +**CLI 렌더(`kebab-cli` progress.rs).** `asset_chunked` → 진행바 message `→ N +chunks`. `expansion_progress` → message `별칭 확장 {done}/{chunks}` (라이브). +`asset_timings` → asset 종료 시 `⏱ parse Xs · chunk Ys · expand Zs · embed Ws +· store Vs` 한 줄(`fmt_ms`: <1s 는 ms, ≥1s 는 1-decimal 초). `--json` 은 +`emit_json` 이 임의 이벤트를 직렬화하므로 자동 처리. `--quiet` 억제, 비-TTY +expansion_progress 는 로그 폭주 방지로 기본 억제(진행바 message 로 커버). + +**검증.** `cargo clippy --workspace --all-targets -- -D warnings` exit 0, +`cargo test -p kebab-app -p kebab-cli` exit 0. 단위 테스트: ingest_progress.rs +(3 신규 변이 직렬화 `kind` 판별 + 순서 불변식 재작성), progress.rs(`fmt_ms` 단위 +전환), 통합(`--json`/human stderr 에 새 이벤트 흐름). 실동작 smoke: 2-문서 ingest +의 `--json` 에 `asset_chunked`/`asset_timings` 출현 + human `⏱ parse…·store…` 라인 +확인. expansion 라이브 카운터는 원격 LLM 필요라 단위/통합으로 커버. + +**리뷰 반영.** (1) `store_ms` 경계 정정 — stale-vector orphan purge(LanceDB I/O)를 +`store_ms`(SQLite persist 전용)에서 빼 `embed_ms`(vector phase)로 이동. 진단 +정확도: store_ms 가 이제 SQLite put_* 만 의미(편집 재색인 시 920ms 가 실은 벡터 +삭제였던 오귀속 제거). purge 는 여전히 unconditional + 새 upsert 이전 실행 — +기능 동등. (2) 최종 `expansion_progress` 프레임을 `done != last_done` 로 가드 — +chunks 가 throttle 배수일 때의 중복 프레임 + chunks==0 시 0/0 프레임 제거. + +**알려진 한계.** image/pdf 경로는 phase timing 없음(AssetChunked 만). +expansion_progress 비-TTY 억제는 의도적(필요 시 `--json` 으로 전량 관측). + ## 2026-06-02 — ingest 백엔드/디바이스 표시 + KB 이전 문서 (v0.23.1) **동기.** Metal 빌드가 실제로 GPU 를 쓰는지 사용자가 터미널에서 못 봐서 Activity