From 8bfa4ba76edea60cfd18c94fdf709cf8cdf96824 Mon Sep 17 00:00:00 2001 From: altair823 Date: Tue, 2 Jun 2026 14:49:02 +0000 Subject: [PATCH] =?UTF-8?q?fix(ingest-progress):=20=EB=A6=AC=EB=B7=B0=20?= =?UTF-8?q?=EB=B0=98=EC=98=81=20=E2=80=94=20store=5Fms=20=EA=B2=BD?= =?UTF-8?q?=EA=B3=84=20=EC=A0=95=EC=A0=95=20+=20=EC=A4=91=EB=B3=B5=20expan?= =?UTF-8?q?sion=20=ED=94=84=EB=A0=88=EC=9E=84=20=EA=B0=80=EB=93=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - store_ms 에서 stale-vector orphan purge(LanceDB I/O) 제거 → embed/vector phase (embed_ms)로 이동. store_ms 가 이제 SQLite put_* 만 의미(진단 정확도; 편집 재색인 시 920ms 오귀속 제거). purge 는 여전히 unconditional + upsert 이전. - 최종 expansion_progress 프레임을 done != last_done 로 가드 (throttle 배수 시 중복 프레임 + chunks==0 시 0/0 프레임 제거). - schema/HOTFIXES: store_ms/embed_ms 설명 정정 + dangling IMPL_REPORT 참조 제거. clippy -D warnings 0, test 312 passed. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/kebab-app/src/lib.rs | 34 +++++++++++++------ .../v1/ingest_progress.schema.json | 4 +-- tasks/HOTFIXES.md | 17 +++++++--- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 354da3e..3939ddf 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -1349,6 +1349,7 @@ fn ingest_one_asset( // second — never per chunk (would flood the mpsc channel). let mut done: u32 = 0; let mut last_emit = std::time::Instant::now(); + let mut last_done: u32 = 0; for chunk in &mut chunks { let key = kebab_core::derivation_cache_key( "alias", @@ -1398,18 +1399,24 @@ fn ingest_one_asset( }, ); last_emit = std::time::Instant::now(); + last_done = done; } } - // Final frame so the counter always lands on done == total. - crate::ingest_progress::emit( - progress, - crate::ingest_progress::IngestEvent::ExpansionProgress { - idx, - total, - done, - chunks: total_chunks, - }, - ); + // Final frame so the counter lands on done == total — but only + // if the last in-loop emit didn't already report this `done` + // (avoids a duplicate frame when chunks is a multiple of the + // throttle, and skips a 0/0 frame when there are no chunks). + if done != last_done { + crate::ingest_progress::emit( + progress, + crate::ingest_progress::IngestEvent::ExpansionProgress { + idx, + total, + done, + chunks: total_chunks, + }, + ); + } } Err(e) => { tracing::warn!( @@ -1433,7 +1440,6 @@ fn ingest_one_asset( // the kb-app job. A failure mid-way leaves the DB in a state the // next ingest run can re-converge (UPSERT + DELETE-then-INSERT). let t_store = std::time::Instant::now(); - purge_vector_orphans_for_workspace_path(app, asset, vector_store)?; app.sqlite .put_asset_with_bytes(asset, &bytes) .context("DocumentStore::put_asset_with_bytes")?; @@ -1450,6 +1456,12 @@ fn ingest_one_asset( // Embed + vector upsert (only when both sides are configured). let t_embed = std::time::Instant::now(); + // Stale-vector purge is LanceDB I/O, so it belongs to the embed/vector + // phase — not the SQLite `store` phase. Keeping it here makes `store_ms` + // mean "SQLite persist only" and `embed_ms` cover all vector-store work + // (purge + upsert), so per-phase timings attribute the bottleneck + // correctly (review fix). Runs before any new upsert, as before. + purge_vector_orphans_for_workspace_path(app, asset, vector_store)?; let mut emb_cache_hit = 0_usize; let mut emb_cache_miss = 0_usize; if let (Some(emb), Some(vec_store)) = (embedder, vector_store) { diff --git a/docs/wire-schema/v1/ingest_progress.schema.json b/docs/wire-schema/v1/ingest_progress.schema.json index 29e1998..ef2889a 100644 --- a/docs/wire-schema/v1/ingest_progress.schema.json +++ b/docs/wire-schema/v1/ingest_progress.schema.json @@ -41,8 +41,8 @@ "parse_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): parse phase wall-clock (ms). Markdown path only." }, "chunk_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): chunk phase wall-clock (ms). Markdown path only." }, "expansion_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): alias-expansion phase wall-clock (ms). Markdown path only; 0 when expansion is disabled." }, - "embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector-upsert phase wall-clock (ms). Markdown path only." }, - "store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms). Markdown path only." }, + "embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector phase wall-clock (ms) — embedding, vector upsert, and stale-vector purge. Markdown path only." }, + "store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms) — put_asset/document/blocks/chunks only. Markdown path only." }, "n_chunks": { "type": "integer", "minimum": 0, "description": "embed_batch_started / embed_batch_finished: chunks in this embedding batch." }, "ms": { "type": "integer", "minimum": 0, "description": "embed_batch_finished / pdf_ocr_finished: wall-clock duration (ms). pdf_ocr_finished skip path 의 의미는 mixed (DCTDecode 부재 시 0, engine 실패 시 latency-before-bail)." }, "chars": { "type": "integer", "minimum": 0, "description": "pdf_ocr_finished: char count of OCR result. Skip 시 0." }, diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md index 37ac1e2..5425c0a 100644 --- a/tasks/HOTFIXES.md +++ b/tasks/HOTFIXES.md @@ -52,10 +52,19 @@ chunks`. `expansion_progress` → message `별칭 확장 {done}/{chunks}` (라 `emit_json` 이 임의 이벤트를 직렬화하므로 자동 처리. `--quiet` 억제, 비-TTY expansion_progress 는 로그 폭주 방지로 기본 억제(진행바 message 로 커버). -**검증.** 단위 테스트: ingest_progress.rs(3 신규 변이 직렬화 `kind` 판별), -progress.rs(`fmt_ms` 단위 전환). clippy/test exit code 는 같은 PR 의 -IMPL_REPORT 참조. 실동작은 단위/통합으로 충분(expansion 라이브 카운터는 원격 -LLM 필요). +**검증.** `cargo clippy --workspace --all-targets -- -D warnings` exit 0, +`cargo test -p kebab-app -p kebab-cli` exit 0. 단위 테스트: ingest_progress.rs +(3 신규 변이 직렬화 `kind` 판별 + 순서 불변식 재작성), progress.rs(`fmt_ms` 단위 +전환), 통합(`--json`/human stderr 에 새 이벤트 흐름). 실동작 smoke: 2-문서 ingest +의 `--json` 에 `asset_chunked`/`asset_timings` 출현 + human `⏱ parse…·store…` 라인 +확인. expansion 라이브 카운터는 원격 LLM 필요라 단위/통합으로 커버. + +**리뷰 반영.** (1) `store_ms` 경계 정정 — stale-vector orphan purge(LanceDB I/O)를 +`store_ms`(SQLite persist 전용)에서 빼 `embed_ms`(vector phase)로 이동. 진단 +정확도: store_ms 가 이제 SQLite put_* 만 의미(편집 재색인 시 920ms 가 실은 벡터 +삭제였던 오귀속 제거). purge 는 여전히 unconditional + 새 upsert 이전 실행 — +기능 동등. (2) 최종 `expansion_progress` 프레임을 `done != last_done` 로 가드 — +chunks 가 throttle 배수일 때의 중복 프레임 + chunks==0 시 0/0 프레임 제거. **알려진 한계.** image/pdf 경로는 phase timing 없음(AssetChunked 만). expansion_progress 비-TTY 억제는 의도적(필요 시 `--json` 으로 전량 관측).