Merge pull request 'feat(ingest): asset 내부 phase 진행 로깅 (asset_chunked/expansion_progress/asset_timings)' (#201) from feat/ingest-progress-detail into main
Reviewed-on: #201
This commit was merged in pull request #201.
This commit is contained in:
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -4724,7 +4724,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-app"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.22.1",
|
||||
@@ -4771,7 +4771,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-chunk"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -4789,7 +4789,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-cli"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
@@ -4810,7 +4810,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-config"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"dirs 5.0.1",
|
||||
@@ -4826,7 +4826,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-core"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -4840,7 +4840,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-embed"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -4854,7 +4854,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-embed-candle"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"candle-core",
|
||||
@@ -4873,7 +4873,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-embed-local"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"fastembed",
|
||||
@@ -4886,7 +4886,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-eval"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"kebab-app",
|
||||
@@ -4905,7 +4905,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-llm"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"kebab-core",
|
||||
@@ -4914,7 +4914,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-llm-local"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"kebab-config",
|
||||
@@ -4931,7 +4931,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-mcp"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"kebab-app",
|
||||
@@ -4949,7 +4949,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-nli"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"hf-hub",
|
||||
@@ -4964,7 +4964,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-parse-code"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"gix",
|
||||
@@ -4987,7 +4987,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-parse-image"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"ab_glyph",
|
||||
"anyhow",
|
||||
@@ -5011,7 +5011,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-parse-md"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"kebab-core",
|
||||
@@ -5028,7 +5028,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-parse-pdf"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -5043,7 +5043,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-rag"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -5065,7 +5065,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-search"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"globset",
|
||||
@@ -5084,7 +5084,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-source-fs"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -5102,7 +5102,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-store-sqlite"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake3",
|
||||
@@ -5122,7 +5122,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-store-vector"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrow",
|
||||
@@ -5146,7 +5146,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kebab-tui"
|
||||
version = "0.23.1"
|
||||
version = "0.24.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"crossterm",
|
||||
|
||||
@@ -31,7 +31,7 @@ edition = "2024"
|
||||
rust-version = "1.85"
|
||||
license = "MIT OR Apache-2.0"
|
||||
repository = "https://github.com/altair823/kebab"
|
||||
version = "0.23.1" # v0.23.1 — ingest 시작 시 임베딩 백엔드/디바이스 한 줄 표시(터미널, --json/--quiet 존중) + README 에 KB 이전(어떤 파일/어느 config 키) 설명. 동작·schema 변경 없음. — CLAUDE.md §Release
|
||||
version = "0.24.0" # v0.24.0 — 상세 ingest 진행 로깅: 신규 wire 이벤트 asset_chunked / expansion_progress / asset_timings (ingest_progress.v1 additive), CLI 진행바 sub-message + phase timing 한 줄. asset 내부 parse/chunk/expansion/embed/store 가시화. wire v1 backward-compat. — CLAUDE.md §Release
|
||||
|
||||
# pre-v0.18 workspace-wide cleanup: enable clippy::pedantic group with
|
||||
# intentional allow-list. The allowed lints are either cosmetic (doc style),
|
||||
|
||||
@@ -87,7 +87,7 @@ Markdown · PDF · 이미지(OCR + caption) · 소스코드(Rust/Python/TS/JS/Go
|
||||
| 명령 | 동작 |
|
||||
|------|------|
|
||||
| `kebab init` | XDG 경로에 데이터 디렉토리 + config.toml 생성 |
|
||||
| `kebab ingest [<path>]` | 워크스페이스 스캔 후 새/변경 문서 색인 (idempotent · incremental, `--force-reingest` 로 강제 재처리). 미지원 확장자는 자동 skip |
|
||||
| `kebab ingest [<path>]` | 워크스페이스 스캔 후 새/변경 문서 색인 (idempotent · incremental, `--force-reingest` 로 강제 재처리). 미지원 확장자는 자동 skip. 진행바는 문서별 청크 수 · 별칭 확장 라이브 카운터 · 문서 종료 시 phase별 소요시간(parse/chunk/expand/embed/store)을 표시 (`--json` 은 `asset_chunked`/`expansion_progress`/`asset_timings` 이벤트로) |
|
||||
| `kebab ingest-file <path>` | 단일 파일 ingest (workspace 외부 가능 — `_external/` 로 deterministic copy) |
|
||||
| `kebab ingest-stdin --title <T>` | stdin 의 markdown 본문 ingest |
|
||||
| `kebab search --mode {lexical,vector,hybrid} "<query>" [flags]` | 검색 (default hybrid = RRF fusion, citation 포함). 필터/budget flag 는 `--help` |
|
||||
|
||||
@@ -47,11 +47,21 @@ pub struct AggregateCounts {
|
||||
///
|
||||
/// ```text
|
||||
/// ScanStarted < ScanCompleted
|
||||
/// < (AssetStarted [< (PdfOcrStarted < PdfOcrFinished)*] < AssetFinished)*
|
||||
/// < ( AssetStarted
|
||||
/// [< (PdfOcrStarted < PdfOcrFinished)*]
|
||||
/// [< AssetChunked]
|
||||
/// [< ExpansionProgress*]
|
||||
/// [< AssetTimings]
|
||||
/// < AssetFinished )*
|
||||
/// < (Completed | Aborted)
|
||||
/// ```
|
||||
///
|
||||
/// `[]` = optional, per-PDF asset only (v0.20.0 sub-item 1).
|
||||
/// `[]` = optional. `PdfOcr*` is per-PDF asset only (v0.20.0 sub-item 1).
|
||||
/// `AssetChunked` / `ExpansionProgress` / `AssetTimings` are the v0.24.0
|
||||
/// asset-internal phase events: `AssetChunked` fires once right after
|
||||
/// chunking (markdown / image / PDF); `ExpansionProgress` is a throttled
|
||||
/// counter through the alias-expansion loop (markdown, expansion enabled
|
||||
/// only); `AssetTimings` reports per-phase wall-clock once (markdown only).
|
||||
///
|
||||
/// Embed-batch events (`embed_batch_started` / `embed_batch_finished`
|
||||
/// in §2.4a) are reserved for a future iteration and are not emitted
|
||||
@@ -82,6 +92,41 @@ pub enum IngestEvent {
|
||||
result: IngestItemKind,
|
||||
chunks: u32,
|
||||
},
|
||||
/// v0.24.0 (additive): emitted right after an asset is chunked, before
|
||||
/// expansion / embed / store. Surfaces "this document is N chunks"
|
||||
/// immediately so a single large document no longer looks frozen at
|
||||
/// `idx/total` while its per-chunk phases churn. `chunks` is the chunk
|
||||
/// count for asset `idx`.
|
||||
AssetChunked { idx: u32, total: u32, chunks: u32 },
|
||||
/// v0.24.0 (additive): throttled progress through the per-chunk
|
||||
/// expansion (alias-LLM) loop — the slowest inner phase for large
|
||||
/// documents (~1–4s per chunk against a remote GPU Ollama). `done` is
|
||||
/// the number of chunks processed so far (cache hits included, so the
|
||||
/// counter still advances on a warm re-run); `chunks` is the asset's
|
||||
/// total chunk count. Emitted at most every 25 chunks or once per
|
||||
/// second (see the loop in `ingest_one_asset`), plus a final
|
||||
/// `done == chunks` frame.
|
||||
ExpansionProgress {
|
||||
idx: u32,
|
||||
total: u32,
|
||||
done: u32,
|
||||
chunks: u32,
|
||||
},
|
||||
/// v0.24.0 (additive): per-phase wall-clock (milliseconds) for asset
|
||||
/// `idx`, emitted once the asset's markdown pipeline finishes. Lets a
|
||||
/// user see *where* the time went (parse / chunk / expansion / embed /
|
||||
/// store) without parsing logs. Only the markdown path emits this; the
|
||||
/// image / PDF paths surface `AssetChunked` but skip phase timing (their
|
||||
/// phase shapes differ — OCR / caption rather than expansion).
|
||||
AssetTimings {
|
||||
idx: u32,
|
||||
total: u32,
|
||||
parse_ms: u64,
|
||||
chunk_ms: u64,
|
||||
expansion_ms: u64,
|
||||
embed_ms: u64,
|
||||
store_ms: u64,
|
||||
},
|
||||
/// Run finished normally. `counts` is the final aggregate.
|
||||
Completed { counts: AggregateCounts },
|
||||
/// Run finished by user cancellation. `counts` is the partial
|
||||
@@ -199,6 +244,79 @@ mod tests {
|
||||
assert_eq!(v.get("media").and_then(|s| s.as_str()), Some("markdown"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn asset_chunked_serializes_with_discriminator() {
|
||||
// v0.24.0 additive variant — `kind` must be snake_case
|
||||
// `asset_chunked` so wire v1 consumers branch on it cleanly.
|
||||
let ev = IngestEvent::AssetChunked {
|
||||
idx: 3,
|
||||
total: 10,
|
||||
chunks: 142,
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(
|
||||
v.get("kind").and_then(|s| s.as_str()),
|
||||
Some("asset_chunked")
|
||||
);
|
||||
assert_eq!(v.get("idx").and_then(serde_json::Value::as_u64), Some(3));
|
||||
assert_eq!(
|
||||
v.get("chunks").and_then(serde_json::Value::as_u64),
|
||||
Some(142)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expansion_progress_serializes_with_discriminator() {
|
||||
let ev = IngestEvent::ExpansionProgress {
|
||||
idx: 1,
|
||||
total: 5,
|
||||
done: 25,
|
||||
chunks: 200,
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(
|
||||
v.get("kind").and_then(|s| s.as_str()),
|
||||
Some("expansion_progress")
|
||||
);
|
||||
assert_eq!(v.get("done").and_then(serde_json::Value::as_u64), Some(25));
|
||||
assert_eq!(
|
||||
v.get("chunks").and_then(serde_json::Value::as_u64),
|
||||
Some(200)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn asset_timings_serializes_all_phase_fields() {
|
||||
let ev = IngestEvent::AssetTimings {
|
||||
idx: 2,
|
||||
total: 7,
|
||||
parse_ms: 12,
|
||||
chunk_ms: 3,
|
||||
expansion_ms: 45_000,
|
||||
embed_ms: 800,
|
||||
store_ms: 20,
|
||||
};
|
||||
let v = serde_json::to_value(&ev).unwrap();
|
||||
assert_eq!(
|
||||
v.get("kind").and_then(|s| s.as_str()),
|
||||
Some("asset_timings")
|
||||
);
|
||||
// All five phase fields are present (plain u64, always serialized).
|
||||
for (field, want) in [
|
||||
("parse_ms", 12u64),
|
||||
("chunk_ms", 3),
|
||||
("expansion_ms", 45_000),
|
||||
("embed_ms", 800),
|
||||
("store_ms", 20),
|
||||
] {
|
||||
assert_eq!(
|
||||
v.get(field).and_then(serde_json::Value::as_u64),
|
||||
Some(want),
|
||||
"field {field}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingest_event_completed_has_counts() {
|
||||
let ev = IngestEvent::Completed {
|
||||
|
||||
@@ -480,6 +480,8 @@ pub fn ingest_with_config_opts(
|
||||
let item = ingest_one_asset(
|
||||
&app,
|
||||
&asset,
|
||||
idx,
|
||||
scanned_count,
|
||||
&parser_version,
|
||||
&chunk_policy,
|
||||
embedder.as_ref(),
|
||||
@@ -1100,6 +1102,8 @@ fn embed_with_cache(
|
||||
fn ingest_one_asset(
|
||||
app: &App,
|
||||
asset: &RawAsset,
|
||||
idx: u32,
|
||||
total: u32,
|
||||
parser_version: &ParserVersion,
|
||||
chunk_policy: &ChunkPolicy,
|
||||
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
|
||||
@@ -1132,18 +1136,23 @@ fn ingest_one_asset(
|
||||
return ingest_one_image_asset(
|
||||
app,
|
||||
asset,
|
||||
idx,
|
||||
total,
|
||||
chunk_policy,
|
||||
embedder,
|
||||
vector_store,
|
||||
existing_doc_ids,
|
||||
image_pipeline,
|
||||
force_reingest,
|
||||
progress,
|
||||
);
|
||||
}
|
||||
MediaType::Pdf => {
|
||||
return ingest_one_pdf_asset(
|
||||
app,
|
||||
asset,
|
||||
idx,
|
||||
total,
|
||||
chunk_policy,
|
||||
embedder,
|
||||
vector_store,
|
||||
@@ -1252,6 +1261,10 @@ fn ingest_one_asset(
|
||||
return Ok(item);
|
||||
}
|
||||
|
||||
// v0.24.0 phase timing: parse spans from here (byte read) through
|
||||
// `build_canonical_document`, i.e. everything before the chunker runs.
|
||||
let t_parse = std::time::Instant::now();
|
||||
|
||||
let bytes = std::fs::read(&path)
|
||||
.with_context(|| format!("read asset bytes from {}", path.display()))?;
|
||||
|
||||
@@ -1286,9 +1299,26 @@ fn ingest_one_asset(
|
||||
build_canonical_document(asset, metadata, parsed_blocks, parser_version, all_warnings)
|
||||
.context("kb-parse-md::build_canonical_document")?;
|
||||
|
||||
let parse_ms = u64::try_from(t_parse.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
let t_chunk = std::time::Instant::now();
|
||||
let mut chunks = MdHeadingV1Chunker
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::MdHeadingV1Chunker::chunk")?;
|
||||
let chunk_ms = u64::try_from(t_chunk.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.24.0: surface the chunk count immediately, before the (potentially
|
||||
// very slow) expansion / embed phases — so a single large document no
|
||||
// longer looks frozen at `idx/total` while its chunks churn.
|
||||
let total_chunks = u32::try_from(chunks.len()).unwrap_or(u32::MAX);
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetChunked {
|
||||
idx,
|
||||
total,
|
||||
chunks: total_chunks,
|
||||
},
|
||||
);
|
||||
|
||||
// Phase 2 doc-side expansion: flag on 이면 청크당 별칭 생성 (fail-soft).
|
||||
// derivation cache(§3.4): 같은 청크 text + 같은 alias version_key 면 LLM
|
||||
@@ -1296,6 +1326,7 @@ fn ingest_one_asset(
|
||||
let mut alias_cache_hit = 0_usize;
|
||||
let mut alias_cache_miss = 0_usize;
|
||||
let mut alias_touch_keys: Vec<String> = Vec::new();
|
||||
let t_expansion = std::time::Instant::now();
|
||||
if app.config.ingest.expansion.enabled {
|
||||
let exp = &app.config.ingest.expansion;
|
||||
let alias_version_key = format!(
|
||||
@@ -1313,6 +1344,12 @@ fn ingest_one_asset(
|
||||
Ok(llm) => {
|
||||
let generator =
|
||||
crate::expansion::ExpansionGenerator::new(&llm, exp.max_aliases_per_chunk);
|
||||
// v0.24.0: throttled live counter through the per-chunk
|
||||
// expansion loop. Emit at most every 25 chunks or once per
|
||||
// second — never per chunk (would flood the mpsc channel).
|
||||
let mut done: u32 = 0;
|
||||
let mut last_emit = std::time::Instant::now();
|
||||
let mut last_done: u32 = 0;
|
||||
for chunk in &mut chunks {
|
||||
let key = kebab_core::derivation_cache_key(
|
||||
"alias",
|
||||
@@ -1345,6 +1382,40 @@ fn ingest_one_asset(
|
||||
.derivation_cache_put(&key, "alias", a.as_bytes())?;
|
||||
}
|
||||
}
|
||||
// Cache hits count toward `done` too (the brief: show the
|
||||
// warm-run fast-forward). Throttle: every 25 chunks or
|
||||
// ≥1s since the last emit.
|
||||
done += 1;
|
||||
if done % 25 == 0
|
||||
|| last_emit.elapsed() >= std::time::Duration::from_secs(1)
|
||||
{
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::ExpansionProgress {
|
||||
idx,
|
||||
total,
|
||||
done,
|
||||
chunks: total_chunks,
|
||||
},
|
||||
);
|
||||
last_emit = std::time::Instant::now();
|
||||
last_done = done;
|
||||
}
|
||||
}
|
||||
// Final frame so the counter lands on done == total — but only
|
||||
// if the last in-loop emit didn't already report this `done`
|
||||
// (avoids a duplicate frame when chunks is a multiple of the
|
||||
// throttle, and skips a 0/0 frame when there are no chunks).
|
||||
if done != last_done {
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::ExpansionProgress {
|
||||
idx,
|
||||
total,
|
||||
done,
|
||||
chunks: total_chunks,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1355,6 +1426,7 @@ fn ingest_one_asset(
|
||||
}
|
||||
}
|
||||
}
|
||||
let expansion_ms = u64::try_from(t_expansion.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// Stamp chunker + embedding versions so Task 7's skip detection has
|
||||
// data on the second run.
|
||||
@@ -1367,7 +1439,7 @@ fn ingest_one_asset(
|
||||
// (per-document tx semantics per design §5.8); composing them is
|
||||
// the kb-app job. A failure mid-way leaves the DB in a state the
|
||||
// next ingest run can re-converge (UPSERT + DELETE-then-INSERT).
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
let t_store = std::time::Instant::now();
|
||||
app.sqlite
|
||||
.put_asset_with_bytes(asset, &bytes)
|
||||
.context("DocumentStore::put_asset_with_bytes")?;
|
||||
@@ -1380,8 +1452,16 @@ fn ingest_one_asset(
|
||||
app.sqlite
|
||||
.put_chunks(&canonical.doc_id, &chunks)
|
||||
.context("DocumentStore::put_chunks")?;
|
||||
let store_ms = u64::try_from(t_store.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// Embed + vector upsert (only when both sides are configured).
|
||||
let t_embed = std::time::Instant::now();
|
||||
// Stale-vector purge is LanceDB I/O, so it belongs to the embed/vector
|
||||
// phase — not the SQLite `store` phase. Keeping it here makes `store_ms`
|
||||
// mean "SQLite persist only" and `embed_ms` cover all vector-store work
|
||||
// (purge + upsert), so per-phase timings attribute the bottleneck
|
||||
// correctly (review fix). Runs before any new upsert, as before.
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
let mut emb_cache_hit = 0_usize;
|
||||
let mut emb_cache_miss = 0_usize;
|
||||
if let (Some(emb), Some(vec_store)) = (embedder, vector_store) {
|
||||
@@ -1511,6 +1591,22 @@ fn ingest_one_asset(
|
||||
}
|
||||
}
|
||||
|
||||
let embed_ms = u64::try_from(t_embed.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
|
||||
// v0.24.0: phase-timing breakdown for this asset (markdown path only).
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetTimings {
|
||||
idx,
|
||||
total,
|
||||
parse_ms,
|
||||
chunk_ms,
|
||||
expansion_ms,
|
||||
embed_ms,
|
||||
store_ms,
|
||||
},
|
||||
);
|
||||
|
||||
// 히트한 alias 키들의 last_used_at 갱신(LRU 보존, §3.5).
|
||||
app.sqlite.derivation_cache_touch(&alias_touch_keys)?;
|
||||
|
||||
@@ -1564,12 +1660,15 @@ fn ingest_one_asset(
|
||||
fn ingest_one_image_asset(
|
||||
app: &App,
|
||||
asset: &RawAsset,
|
||||
idx: u32,
|
||||
total: u32,
|
||||
chunk_policy: &ChunkPolicy,
|
||||
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
|
||||
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
|
||||
existing_doc_ids: &std::collections::HashSet<String>,
|
||||
image_pipeline: &ImagePipeline<'_>,
|
||||
force_reingest: bool,
|
||||
progress: Option<&std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
) -> anyhow::Result<kebab_core::IngestItem> {
|
||||
let ocr_engine = image_pipeline.ocr_engine;
|
||||
let caption_llm = image_pipeline.caption_llm;
|
||||
@@ -1722,6 +1821,17 @@ fn ingest_one_image_asset(
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::MdHeadingV1Chunker::chunk (image)")?;
|
||||
|
||||
// v0.24.0: surface chunk count for the image path too (phase timing is
|
||||
// markdown-only, but AssetChunked is consistent across media).
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetChunked {
|
||||
idx,
|
||||
total,
|
||||
chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX),
|
||||
},
|
||||
);
|
||||
|
||||
// 5. Persist + embed — identical sequence to markdown.
|
||||
// Stamp chunker + embedding versions (image uses MdHeadingV1Chunker
|
||||
// for its single-block doc, so we record that version).
|
||||
@@ -2127,6 +2237,8 @@ fn sweep_deleted_files(
|
||||
fn ingest_one_pdf_asset(
|
||||
app: &App,
|
||||
asset: &RawAsset,
|
||||
idx: u32,
|
||||
total: u32,
|
||||
chunk_policy: &ChunkPolicy,
|
||||
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
|
||||
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
|
||||
@@ -2330,6 +2442,16 @@ fn ingest_one_pdf_asset(
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::PdfPageV1Chunker::chunk")?;
|
||||
|
||||
// v0.24.0: surface chunk count for the PDF path too.
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
crate::ingest_progress::IngestEvent::AssetChunked {
|
||||
idx,
|
||||
total,
|
||||
chunks: u32::try_from(chunks.len()).unwrap_or(u32::MAX),
|
||||
},
|
||||
);
|
||||
|
||||
// Stamp chunker + embedding versions so Task 7's skip detection has
|
||||
// data on the second run.
|
||||
canonical.last_chunker_version = Some(chunker.chunker_version());
|
||||
|
||||
@@ -69,40 +69,74 @@ fn progress_event_sequence_matches_design_section_2_4a() {
|
||||
other => panic!("expected Completed last, got {other:?}"),
|
||||
}
|
||||
|
||||
// Middle: 3 AssetStarted/AssetFinished pairs in monotonic idx order.
|
||||
let asset_events: Vec<&IngestEvent> = events[2..events.len() - 1].iter().collect();
|
||||
assert_eq!(
|
||||
asset_events.len(),
|
||||
6,
|
||||
"expected 3 (Started + Finished) pairs, got {asset_events:?}"
|
||||
);
|
||||
for (chunk_idx, pair) in asset_events.chunks(2).enumerate() {
|
||||
let expected_idx = chunk_idx as u32 + 1;
|
||||
match (pair[0], pair[1]) {
|
||||
(
|
||||
IngestEvent::AssetStarted {
|
||||
idx: si,
|
||||
total: st,
|
||||
media,
|
||||
..
|
||||
},
|
||||
IngestEvent::AssetFinished {
|
||||
idx: fi,
|
||||
total: ft,
|
||||
result,
|
||||
chunks,
|
||||
},
|
||||
) => {
|
||||
assert_eq!(*si, expected_idx, "Started idx mismatch: {pair:?}");
|
||||
assert_eq!(*fi, expected_idx, "Finished idx mismatch: {pair:?}");
|
||||
assert_eq!(*st, 3, "Started total mismatch");
|
||||
assert_eq!(*ft, 3, "Finished total mismatch");
|
||||
assert_eq!(media, "markdown", "fixture is markdown only");
|
||||
assert_eq!(*result, IngestItemKind::New, "first ingest → New");
|
||||
assert!(*chunks >= 1, "chunks: {pair:?}");
|
||||
// Middle (v0.24.0 ordering invariant §2.4a): per asset the stream is
|
||||
// AssetStarted < AssetChunked < [ExpansionProgress*] < AssetTimings
|
||||
// < AssetFinished
|
||||
// Expansion is disabled in the lexical fixture, so no ExpansionProgress
|
||||
// frames appear here — but AssetChunked + AssetTimings are emitted for
|
||||
// every markdown asset.
|
||||
let middle = &events[2..events.len() - 1];
|
||||
|
||||
// 3 AssetStarted events, monotonic idx 1..=3, all markdown, total = 3.
|
||||
let started: Vec<u32> = middle
|
||||
.iter()
|
||||
.filter_map(|e| match e {
|
||||
IngestEvent::AssetStarted {
|
||||
idx, total, media, ..
|
||||
} => {
|
||||
assert_eq!(*total, 3, "Started total mismatch: {e:?}");
|
||||
assert_eq!(media, "markdown", "fixture is markdown only: {e:?}");
|
||||
Some(*idx)
|
||||
}
|
||||
other => panic!("expected Started+Finished pair, got {other:?}"),
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(started, vec![1, 2, 3], "AssetStarted idx order: {middle:?}");
|
||||
|
||||
// 3 AssetFinished events, monotonic idx 1..=3, each New with ≥1 chunk.
|
||||
let finished: Vec<u32> = middle
|
||||
.iter()
|
||||
.filter_map(|e| match e {
|
||||
IngestEvent::AssetFinished {
|
||||
idx,
|
||||
total,
|
||||
result,
|
||||
chunks,
|
||||
} => {
|
||||
assert_eq!(*total, 3, "Finished total mismatch: {e:?}");
|
||||
assert_eq!(*result, IngestItemKind::New, "first ingest → New: {e:?}");
|
||||
assert!(*chunks >= 1, "chunks: {e:?}");
|
||||
Some(*idx)
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(finished, vec![1, 2, 3], "AssetFinished idx order: {middle:?}");
|
||||
|
||||
// v0.24.0 additive events: exactly one AssetChunked + one AssetTimings
|
||||
// per asset, each strictly bracketed by that asset's Started / Finished.
|
||||
for target in 1u32..=3 {
|
||||
let started_at = middle
|
||||
.iter()
|
||||
.position(|e| matches!(e, IngestEvent::AssetStarted { idx, .. } if *idx == target))
|
||||
.unwrap_or_else(|| panic!("missing AssetStarted for idx {target}: {middle:?}"));
|
||||
let finished_at = middle
|
||||
.iter()
|
||||
.position(|e| matches!(e, IngestEvent::AssetFinished { idx, .. } if *idx == target))
|
||||
.unwrap_or_else(|| panic!("missing AssetFinished for idx {target}: {middle:?}"));
|
||||
let chunked_at = middle
|
||||
.iter()
|
||||
.position(|e| matches!(e, IngestEvent::AssetChunked { idx, chunks, .. } if *idx == target && *chunks >= 1))
|
||||
.unwrap_or_else(|| panic!("missing AssetChunked for idx {target}: {middle:?}"));
|
||||
let timings_at = middle
|
||||
.iter()
|
||||
.position(|e| matches!(e, IngestEvent::AssetTimings { idx, .. } if *idx == target))
|
||||
.unwrap_or_else(|| panic!("missing AssetTimings for idx {target}: {middle:?}"));
|
||||
assert!(
|
||||
started_at < chunked_at && chunked_at < timings_at && timings_at < finished_at,
|
||||
"idx {target} ordering: started={started_at} chunked={chunked_at} \
|
||||
timings={timings_at} finished={finished_at}: {middle:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -157,6 +157,54 @@ impl ProgressDisplay {
|
||||
// in Completed handles the final state. No per-asset bar update
|
||||
// here avoids the duplicate-frame artifact in TTY scrollback.
|
||||
}
|
||||
// v0.24.0: asset-internal phase visibility. AssetChunked /
|
||||
// ExpansionProgress use the bar *message* (live sub-progress for
|
||||
// the current asset) — distinct from the per-file position draw,
|
||||
// so a single large document no longer looks frozen. AssetTimings
|
||||
// prints a one-line breakdown when the asset finishes.
|
||||
IngestEvent::AssetChunked { idx, total, chunks } => {
|
||||
if let Some(bar) = self.bar.as_ref() {
|
||||
bar.set_message(format!("→ {chunks} chunks"));
|
||||
}
|
||||
if !tty && !quiet {
|
||||
let mut err = std::io::stderr().lock();
|
||||
let _ = writeln!(err, "ingest: {idx}/{total} → {chunks} chunks");
|
||||
}
|
||||
}
|
||||
IngestEvent::ExpansionProgress {
|
||||
done, chunks, ..
|
||||
} => {
|
||||
if let Some(bar) = self.bar.as_ref() {
|
||||
bar.set_message(format!("별칭 확장 {done}/{chunks}"));
|
||||
}
|
||||
// Non-TTY: suppressed by default — throttled though it is, one
|
||||
// line per emit would still spam CI logs. The bar message
|
||||
// covers the interactive case; --json carries every frame.
|
||||
}
|
||||
IngestEvent::AssetTimings {
|
||||
parse_ms,
|
||||
chunk_ms,
|
||||
expansion_ms,
|
||||
embed_ms,
|
||||
store_ms,
|
||||
..
|
||||
} => {
|
||||
if let Some(bar) = self.bar.as_ref() {
|
||||
bar.set_message("");
|
||||
}
|
||||
if !quiet {
|
||||
let mut err = std::io::stderr().lock();
|
||||
let _ = writeln!(
|
||||
err,
|
||||
" ⏱ parse {} · chunk {} · expand {} · embed {} · store {}",
|
||||
fmt_ms(*parse_ms),
|
||||
fmt_ms(*chunk_ms),
|
||||
fmt_ms(*expansion_ms),
|
||||
fmt_ms(*embed_ms),
|
||||
fmt_ms(*store_ms),
|
||||
);
|
||||
}
|
||||
}
|
||||
IngestEvent::Completed { counts } => {
|
||||
if let Some(bar) = self.bar.take() {
|
||||
bar.finish_and_clear();
|
||||
@@ -239,6 +287,17 @@ fn emit_json(event: &IngestEvent) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Render a phase duration (milliseconds) compactly for the human-mode
|
||||
/// `AssetTimings` line: `< 1000ms` stays in `ms`, larger spans collapse to
|
||||
/// one-decimal seconds so a 45-second expansion reads `45.0s`, not `45000ms`.
|
||||
fn fmt_ms(ms: u64) -> String {
|
||||
if ms >= 1000 {
|
||||
format!("{:.1}s", ms as f64 / 1000.0)
|
||||
} else {
|
||||
format!("{ms}ms")
|
||||
}
|
||||
}
|
||||
|
||||
/// Format the current wall-clock as RFC 3339 — used by `wire_ingest_progress`
|
||||
/// so every emitted event carries an `ts` field per §2.4a / the wire schema.
|
||||
pub(crate) fn now_rfc3339() -> anyhow::Result<String> {
|
||||
@@ -285,6 +344,15 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fmt_ms_switches_unit_at_one_second() {
|
||||
assert_eq!(fmt_ms(0), "0ms");
|
||||
assert_eq!(fmt_ms(999), "999ms");
|
||||
assert_eq!(fmt_ms(1000), "1.0s");
|
||||
assert_eq!(fmt_ms(45_000), "45.0s");
|
||||
assert_eq!(fmt_ms(1500), "1.5s");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn now_rfc3339_parses_back() {
|
||||
let s = now_rfc3339().unwrap();
|
||||
|
||||
@@ -154,7 +154,14 @@ fn apply_event(state: &mut IngestState, event: IngestEvent) {
|
||||
}
|
||||
// v0.20.0 sub-item 1: per-page PDF OCR events — TUI does not
|
||||
// surface per-page OCR progress in v1; no counter to update.
|
||||
IngestEvent::PdfOcrStarted { .. } | IngestEvent::PdfOcrFinished { .. } => {}
|
||||
IngestEvent::PdfOcrStarted { .. }
|
||||
| IngestEvent::PdfOcrFinished { .. }
|
||||
// v0.24.0 asset-internal phase events: the status-bar reducer tracks
|
||||
// per-asset counters, not sub-asset phase progress, so these are
|
||||
// no-ops here (the CLI / --json surfaces render them).
|
||||
| IngestEvent::AssetChunked { .. }
|
||||
| IngestEvent::ExpansionProgress { .. }
|
||||
| IngestEvent::AssetTimings { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,9 @@
|
||||
"scan_completed",
|
||||
"asset_started",
|
||||
"asset_finished",
|
||||
"asset_chunked",
|
||||
"expansion_progress",
|
||||
"asset_timings",
|
||||
"embed_batch_started",
|
||||
"embed_batch_finished",
|
||||
"pdf_ocr_started",
|
||||
@@ -33,7 +36,13 @@
|
||||
"enum": ["new", "updated", "skipped", "error"],
|
||||
"description": "asset_finished: per-asset outcome (mirrors `ingest_report.v1.items[].kind`)."
|
||||
},
|
||||
"chunks": { "type": "integer", "minimum": 0, "description": "asset_finished: chunk count produced for this asset." },
|
||||
"chunks": { "type": "integer", "minimum": 0, "description": "asset_finished / asset_chunked / expansion_progress (v0.24.0): chunk count produced for this asset." },
|
||||
"done": { "type": "integer", "minimum": 0, "description": "expansion_progress (v0.24.0, additive): chunks processed so far in the per-chunk alias-expansion loop (cache hits included). Throttled: emitted at most every 25 chunks or once per second, plus a final frame where done == chunks." },
|
||||
"parse_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): parse phase wall-clock (ms). Markdown path only." },
|
||||
"chunk_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): chunk phase wall-clock (ms). Markdown path only." },
|
||||
"expansion_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): alias-expansion phase wall-clock (ms). Markdown path only; 0 when expansion is disabled." },
|
||||
"embed_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): embed + vector phase wall-clock (ms) — embedding, vector upsert, and stale-vector purge. Markdown path only." },
|
||||
"store_ms": { "type": "integer", "minimum": 0, "description": "asset_timings (v0.24.0, additive): SQLite persist phase wall-clock (ms) — put_asset/document/blocks/chunks only. Markdown path only." },
|
||||
"n_chunks": { "type": "integer", "minimum": 0, "description": "embed_batch_started / embed_batch_finished: chunks in this embedding batch." },
|
||||
"ms": { "type": "integer", "minimum": 0, "description": "embed_batch_finished / pdf_ocr_finished: wall-clock duration (ms). pdf_ocr_finished skip path 의 의미는 mixed (DCTDecode 부재 시 0, engine 실패 시 latency-before-bail)." },
|
||||
"chars": { "type": "integer", "minimum": 0, "description": "pdf_ocr_finished: char count of OCR result. Skip 시 0." },
|
||||
|
||||
@@ -14,6 +14,61 @@ historical contract that was implemented; this file accumulates the
|
||||
deltas so phase 5+ readers can find the live behavior without diffing
|
||||
git history.
|
||||
|
||||
## 2026-06-02 — 상세 ingest 진행 로깅 (asset 내부 phase 가시화, v0.24.0)
|
||||
|
||||
**무엇이 문제였나.** ingest 진행 이벤트가 asset(문서) 단위(`asset_started` /
|
||||
`asset_finished`)뿐이라 한 문서 내부의 parse / chunk / **expansion(별칭 LLM,
|
||||
청크당 순차 호출)** / embed / store 가 깜깜했다. expansion 은 청크당 ~1~4s
|
||||
(원격 GPU Ollama)이고 큰 문서는 청크 수백~천 개 → 그 한 문서에서 수십 분이
|
||||
걸리는데, 진행바는 `1/5150` 에 멈춘 듯 보여 사용자가 병목을 못 봤다.
|
||||
|
||||
**무엇을 추가했나 (wire `ingest_progress.v1` additive, 호환 유지).**
|
||||
`IngestEvent` 에 세 변이 추가 — `#[serde(tag="kind")]` 라 신규 `kind` 추가는
|
||||
wire v1 호환:
|
||||
|
||||
- `asset_chunked { idx, total, chunks }` — 청킹 직후(expansion/embed 전) 즉시
|
||||
"이 문서가 N청크" 노출. markdown / image / pdf 세 경로 모두 emit.
|
||||
- `expansion_progress { idx, total, done, chunks }` — expansion 루프 중
|
||||
**스로틀** 발신(매 25청크 또는 ≥1s, 종료 시 `done == chunks` 1프레임 더).
|
||||
캐시 히트 청크도 `done` 에 포함(warm 재색인 fast-forward 가시화). 채널 폭주
|
||||
방지 — 매 청크 emit 금지.
|
||||
- `asset_timings { idx, total, parse_ms, chunk_ms, expansion_ms, embed_ms,
|
||||
store_ms }` — asset 처리 phase 별 소요시간. **markdown 경로만** emit
|
||||
(image/pdf 는 phase shape 가 달라 생략; AssetChunked 만 emit).
|
||||
|
||||
**설계 결정 — AssetTimings 이벤트 vs AssetFinished 필드.** IMPL_BRIEF §1 은
|
||||
`AssetFinished` 에 optional phase-timing 필드를, §2 는 대안으로 신규
|
||||
`AssetTimings` 이벤트를 제시(권장). 후자를 택함 — `AssetFinished` 는 호출부
|
||||
(`ingest_with_config_progress` 루프)에서 만들어지는데 timing 데이터는
|
||||
`ingest_one_asset` 내부에만 있어, 필드를 채우려면 `kebab_core::IngestItem`
|
||||
(wire-stable struct) 변경 또는 별도 plumbing 이 필요. `ingest_one_asset` 가
|
||||
`progress` 핸들을 이미 들고 있으므로 새 이벤트를 직접 emit 하는 쪽이 crate
|
||||
경계(kebab-core 불변)도 지키고 더 깔끔. `AssetFinished` 는 손대지 않음.
|
||||
|
||||
**CLI 렌더(`kebab-cli` progress.rs).** `asset_chunked` → 진행바 message `→ N
|
||||
chunks`. `expansion_progress` → message `별칭 확장 {done}/{chunks}` (라이브).
|
||||
`asset_timings` → asset 종료 시 `⏱ parse Xs · chunk Ys · expand Zs · embed Ws
|
||||
· store Vs` 한 줄(`fmt_ms`: <1s 는 ms, ≥1s 는 1-decimal 초). `--json` 은
|
||||
`emit_json` 이 임의 이벤트를 직렬화하므로 자동 처리. `--quiet` 억제, 비-TTY
|
||||
expansion_progress 는 로그 폭주 방지로 기본 억제(진행바 message 로 커버).
|
||||
|
||||
**검증.** `cargo clippy --workspace --all-targets -- -D warnings` exit 0,
|
||||
`cargo test -p kebab-app -p kebab-cli` exit 0. 단위 테스트: ingest_progress.rs
|
||||
(3 신규 변이 직렬화 `kind` 판별 + 순서 불변식 재작성), progress.rs(`fmt_ms` 단위
|
||||
전환), 통합(`--json`/human stderr 에 새 이벤트 흐름). 실동작 smoke: 2-문서 ingest
|
||||
의 `--json` 에 `asset_chunked`/`asset_timings` 출현 + human `⏱ parse…·store…` 라인
|
||||
확인. expansion 라이브 카운터는 원격 LLM 필요라 단위/통합으로 커버.
|
||||
|
||||
**리뷰 반영.** (1) `store_ms` 경계 정정 — stale-vector orphan purge(LanceDB I/O)를
|
||||
`store_ms`(SQLite persist 전용)에서 빼 `embed_ms`(vector phase)로 이동. 진단
|
||||
정확도: store_ms 가 이제 SQLite put_* 만 의미(편집 재색인 시 920ms 가 실은 벡터
|
||||
삭제였던 오귀속 제거). purge 는 여전히 unconditional + 새 upsert 이전 실행 —
|
||||
기능 동등. (2) 최종 `expansion_progress` 프레임을 `done != last_done` 로 가드 —
|
||||
chunks 가 throttle 배수일 때의 중복 프레임 + chunks==0 시 0/0 프레임 제거.
|
||||
|
||||
**알려진 한계.** image/pdf 경로는 phase timing 없음(AssetChunked 만).
|
||||
expansion_progress 비-TTY 억제는 의도적(필요 시 `--json` 으로 전량 관측).
|
||||
|
||||
## 2026-06-02 — ingest 백엔드/디바이스 표시 + KB 이전 문서 (v0.23.1)
|
||||
|
||||
**동기.** Metal 빌드가 실제로 GPU 를 쓰는지 사용자가 터미널에서 못 봐서 Activity
|
||||
|
||||
Reference in New Issue
Block a user