fix(ingest-progress): 리뷰 반영 — store_ms 경계 정정 + 중복 expansion 프레임 가드

- store_ms 에서 stale-vector orphan purge(LanceDB I/O) 제거 → embed/vector phase
  (embed_ms)로 이동. store_ms 가 이제 SQLite put_* 만 의미(진단 정확도; 편집
  재색인 시 920ms 오귀속 제거). purge 는 여전히 unconditional + upsert 이전.
- 최종 expansion_progress 프레임을 done != last_done 로 가드 (throttle 배수 시
  중복 프레임 + chunks==0 시 0/0 프레임 제거).
- schema/HOTFIXES: store_ms/embed_ms 설명 정정 + dangling IMPL_REPORT 참조 제거.

clippy -D warnings 0, test 312 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 14:49:02 +00:00
parent ad0ccf4ccf
commit 8bfa4ba76e
3 changed files with 38 additions and 17 deletions

View File

@@ -1349,6 +1349,7 @@ fn ingest_one_asset(
// second — never per chunk (would flood the mpsc channel).
let mut done: u32 = 0;
let mut last_emit = std::time::Instant::now();
let mut last_done: u32 = 0;
for chunk in &mut chunks {
let key = kebab_core::derivation_cache_key(
"alias",
@@ -1398,18 +1399,24 @@ fn ingest_one_asset(
},
);
last_emit = std::time::Instant::now();
last_done = done;
}
}
// Final frame so the counter always lands on done == total.
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ExpansionProgress {
idx,
total,
done,
chunks: total_chunks,
},
);
// Final frame so the counter lands on done == total — but only
// if the last in-loop emit didn't already report this `done`
// (avoids a duplicate frame when chunks is a multiple of the
// throttle, and skips a 0/0 frame when there are no chunks).
if done != last_done {
crate::ingest_progress::emit(
progress,
crate::ingest_progress::IngestEvent::ExpansionProgress {
idx,
total,
done,
chunks: total_chunks,
},
);
}
}
Err(e) => {
tracing::warn!(
@@ -1433,7 +1440,6 @@ fn ingest_one_asset(
// the kb-app job. A failure mid-way leaves the DB in a state the
// next ingest run can re-converge (UPSERT + DELETE-then-INSERT).
let t_store = std::time::Instant::now();
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
app.sqlite
.put_asset_with_bytes(asset, &bytes)
.context("DocumentStore::put_asset_with_bytes")?;
@@ -1450,6 +1456,12 @@ fn ingest_one_asset(
// Embed + vector upsert (only when both sides are configured).
let t_embed = std::time::Instant::now();
// Stale-vector purge is LanceDB I/O, so it belongs to the embed/vector
// phase — not the SQLite `store` phase. Keeping it here makes `store_ms`
// mean "SQLite persist only" and `embed_ms` cover all vector-store work
// (purge + upsert), so per-phase timings attribute the bottleneck
// correctly (review fix). Runs before any new upsert, as before.
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
let mut emb_cache_hit = 0_usize;
let mut emb_cache_miss = 0_usize;
if let (Some(emb), Some(vec_store)) = (embedder, vector_store) {

View File

@@ -41,8 +41,8 @@
"parse_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): parse phase wall-clock (ms). Markdown path only." },
"chunk_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): chunk phase wall-clock (ms). Markdown path only." },
"expansion_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): alias-expansion phase wall-clock (ms). Markdown path only; 0 when expansion is disabled." },
"embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector-upsert phase wall-clock (ms). Markdown path only." },
"store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms). Markdown path only." },
"embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector phase wall-clock (ms) — embedding, vector upsert, and stale-vector purge. Markdown path only." },
"store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms) — put_asset/document/blocks/chunks only. Markdown path only." },
"n_chunks": { "type": "integer", "minimum": 0, "description": "embed_batch_started / embed_batch_finished: chunks in this embedding batch." },
"ms": { "type": "integer", "minimum": 0, "description": "embed_batch_finished / pdf_ocr_finished: wall-clock duration (ms). pdf_ocr_finished skip path 의 의미는 mixed (DCTDecode 부재 시 0, engine 실패 시 latency-before-bail)." },
"chars": { "type": "integer", "minimum": 0, "description": "pdf_ocr_finished: char count of OCR result. Skip 시 0." },

View File

@@ -52,10 +52,19 @@ chunks`. `expansion_progress` → message `별칭 확장 {done}/{chunks}` (라
`emit_json` 이 임의 이벤트를 직렬화하므로 자동 처리. `--quiet` 억제, 비-TTY
expansion_progress 는 로그 폭주 방지로 기본 억제(진행바 message 로 커버).
**검증.** 단위 테스트: ingest_progress.rs(3 신규 변이 직렬화 `kind` 판별),
progress.rs(`fmt_ms` 단위 전환). clippy/test exit code 는 같은 PR 의
IMPL_REPORT 참조. 실동작은 단위/통합으로 충분(expansion 라이브 카운터는 원격
LLM 필요).
**검증.** `cargo clippy --workspace --all-targets -- -D warnings` exit 0,
`cargo test -p kebab-app -p kebab-cli` exit 0. 단위 테스트: ingest_progress.rs
(3 신규 변이 직렬화 `kind` 판별 + 순서 불변식 재작성), progress.rs(`fmt_ms` 단위
전환), 통합(`--json`/human stderr 에 새 이벤트 흐름). 실동작 smoke: 2-문서 ingest
의 `--json` 에 `asset_chunked`/`asset_timings` 출현 + human `⏱ parse…·store…` 라인
확인. expansion 라이브 카운터는 원격 LLM 필요라 단위/통합으로 커버.
**리뷰 반영.** (1) `store_ms` 경계 정정 — stale-vector orphan purge(LanceDB I/O)를
`store_ms`(SQLite persist 전용)에서 빼 `embed_ms`(vector phase)로 이동. 진단
정확도: store_ms 가 이제 SQLite put_* 만 의미(편집 재색인 시 920ms 가 실은 벡터
삭제였던 오귀속 제거). purge 는 여전히 unconditional + 새 upsert 이전 실행 —
기능 동등. (2) 최종 `expansion_progress` 프레임을 `done != last_done` 로 가드 —
chunks 가 throttle 배수일 때의 중복 프레임 + chunks==0 시 0/0 프레임 제거.
**알려진 한계.** image/pdf 경로는 phase timing 없음(AssetChunked 만).
expansion_progress 비-TTY 억제는 의도적(필요 시 `--json` 으로 전량 관측).