Merge pull request 'fix(kebab-app): p9-fb-23 — Incremental ingest (skip unchanged docs)' (#98) from fix/p9-fb-23-incremental-ingest into main
Reviewed-on: #98
This commit was merged in pull request #98.
This commit is contained in:
@@ -59,6 +59,7 @@ P0~P5 직렬. P6~P9 P5 이후 병렬 가능.
|
||||
- **2026-05-03 P9 도그푸딩 후속 (p9-fb-12 partial)** — TUI vim-style mode machine (절반 ship — heuristic 제거는 follow-up). `kebab_tui::Mode::{Normal, Insert}` enum + `Mode::auto_for(pane)` (Library/Inspect/Jobs → Normal, Search/Ask → Insert) + `Mode::label()` (`"-- NORMAL --"` / `"-- INSERT --"`) + `App.mode: Mode` field. run loop `mode_intercept(app, key)` 가 dispatch 전 intercept — Insert 에서 `Esc` → Normal (어디서나), Normal 에서 `i` → Insert (Library/Inspect/Jobs 만, Search/Ask 는 자동 Insert 라 `i` 가 typed char). 헤더 우측에 mode label colored (Insert = Role::Success green, Normal = Role::Heading cyan+bold). pane 전환 시 `app.mode = Mode::auto_for(p)` 자동 flip. **Deferred (HOTFIXES entry)**: `is_typing_mod` (search) + input-empty heuristic (ask) 는 후속 PR 에서 mode-authoritative 로 교체 — 현재는 user-visible signal (label + auto flip + i/Esc) 만 ship, 키 dispatch 는 heuristic 유지. spec status `in_progress` (not `completed`). spec: `tasks/p9/p9-fb-12-tui-mode-machine.md`.
|
||||
- **2026-05-03 P9 도그푸딩 후속 (p9-fb-12 follow-up)** — heuristic 제거 (partial PR 의 deferred 부분 finalize). `search::is_typing_mod` (CTRL/ALT chord filter) 함수 삭제 + `ask::handle_key_ask` 의 input-empty heuristic 삭제. 새 dispatch: `search::handle_key_search` 의 `i` (chunk inspect) / `g` (editor jump) pre-pass 가 `state.mode == Mode::Normal` 일 때만 fire (Insert 에서는 typed char). main match 의 `j`/`k`/Char(c) 가 `state.mode` 로 분기 (Normal → 선택 이동, Insert → input.push). `ask::handle_key_ask` 의 `e`/`j`/`k` 도 동일 패턴 — Normal 에서 toggle/scroll, Insert 에서 input typing. 테스트 fixture (`tests/search.rs::fresh_app`, `tests/ask.rs::fresh_app`) 가 `app.mode = Mode::auto_for(focus)` 로 run-loop 동작 mirror. 기존 nav 테스트 (j_k_move, g_key_enqueues, e_toggles) 는 explicit `app.mode = Mode::Normal` 추가, 신규 4 테스트 (j_in_insert_types / arbitrary_char_in_normal_noop / e_types_in_insert / jk_scroll-in-normal-type-in-insert) 가 mode-authoritative 동작 pin. spec status `in_progress` → `completed`. spec: `tasks/p9/p9-fb-12-tui-mode-machine.md`.
|
||||
- **2026-05-03 P9 도그푸딩 후속 (p9-fb-10 partial)** — TUI CJK rendering helpers. `kebab-tui::input::{display_width, truncate_to_display_width}` 신규 — `unicode-width` 위에서 column-단위 width 계산 (ASCII=1, Hangul/CJK/fullwidth=2, combining=0) + char-boundary 안전 truncate (wide char 를 split 없이 keep-or-omit, ellipsis 1 col). library.rs 의 중복 `truncate_to_display_width` private fn 제거 — 단일 source. 9 unit tests (ASCII / Hangul / Japanese / mixed / truncate fits·overflow·zero-cols·wide-char-boundary / `String::pop` char-aware sanity) + 1 integration render test (Korean + Japanese fixture, TestBackend 80×20, 한글/일본어 글자가 frame 에 살아남음 확인). spec 의 `InputBuffer` struct (cursor 가 column 단위 wide-char width 추적) 도입은 follow-up — Ask/Search/Editor pane 의 String + cursor 일괄 마이그레이션이 회귀 표면이 커서 helper 만 먼저 머지. backspace 는 모든 pane 이 이미 `String::pop()` 사용 (char-aware) → byte-boundary 안전성 helper 없이도 확보. crossterm 0.28 이 native IME composing 미노출 — preedit handling out of scope. spec status `planned` → `in_progress`. spec: `tasks/p9/p9-fb-10-tui-cjk-input.md`.
|
||||
- **2026-05-04 P9 post-도그푸딩 (p9-fb-23)** — Incremental ingest. 사용자 도그푸딩 피드백: 변하지 않은 문서는 다시 ingest 하지 않기. blake3 checksum + parser_version + chunker_version + embedding_version 4개 input 이 모두 일치할 때 parse/chunk/embed/vector upsert 모두 회피. SQLite V006 마이그레이션 — `documents` 에 `last_chunker_version` + `last_embedding_version` 컬럼 추가. 신규 `IngestItemKind::Unchanged` variant + `IngestReport.unchanged` + `AggregateCounts.unchanged` (wire schema additive). `IngestOpts { progress, cancel, force_reingest }` struct 도입 — `AskOpts` 패턴. `--force-reingest` CLI flag 로 skip 우회. 비용 dominator (fastembed) 가 변경된 / 새 doc 에만 발생. spec: `tasks/p9/p9-fb-23-incremental-ingest.md`. HOTFIXES `2026-05-04 — p9-fb-23` 항목이 version cascade 명시 동작의 source of truth.
|
||||
- **2026-05-04 P9 post-도그푸딩 (p9-fb-24)** — TUI status/key bar + Library 컬럼 헤더 + Ask/Inspect PgUp/PgDn. 사용자 도그푸딩 3 건 (Library 컬럼 의미 부재, 페이지 스크롤 키 부재, 상태바 + 버전 정보 항상 노출 요청) 을 단일 PR 로 통합. bottom 영역을 status bar (1 row, version + pane + docs + dynamic state) + key hint bar (1 row, 기존 `footer_hints` 그대로) 두 줄로 분할; 기존 ingest progress dedicated row 는 status bar 의 dynamic slot 에 흡수 (priority cascade: streaming → searching → indexing → idle). Library `List` 위에 `format_doc_header` 행 + Layout 분할로 헤더 표시 (TITLE / TAGS / UPDATED / CHUNKS, display-width 정렬). `kebab-tui::pager::PAGE_STEP = 10` 신규 — Ask 의 PgUp/PgDn 추가 + Inspect 의 기존 +/-10 hardcode 가 같은 상수 참조로 통일. Ask 의 page-scroll 은 `j`/`k` 와 동일하게 `follow_tail = false` 로 freeze. spec: `tasks/p9/p9-fb-24-tui-affordances.md`. HOTFIXES `2026-05-04 — p9-fb-24` 항목이 footer 단행 row (p9-fb-13) + ingest dedicated row (p9-fb-03) 와의 layout 충돌의 source of truth.
|
||||
- **2026-05-04 P9 post-도그푸딩 (p9-fb-22)** — TUI 입력 cursor mid-string 편집 + Ask follow-tail auto-scroll. Gitea #94 (입력 후 커서 이동 안 됨) + #95 (새 응답 자동 스크롤 안 됨) 두 건. `InputBuffer` 의 cursor 모델을 byte-position 기반으로 재구성 — cursor 가 끝일 때 기존 append 동작과 backwards-compatible, mid-string 일 때는 `←/→/Home/End/Delete` 로 편집. `AskState` 에 `follow_tail: bool` (default true). `Paragraph::line_count(width)` (ratatui `unstable-rendered-line-info` feature 활성화) 로 매 프레임 wrapped row 수 계산해 follow-tail 시 scroll 을 bottom 에 pin. `j`/`k` 가 follow-tail 끄고 `Shift-G` 가 다시 켬. 12 신규 InputBuffer unit + 6 신규 Ask integration. spec: `tasks/p9/p9-fb-22-tui-cursor-and-autoscroll.md`. HOTFIXES 항목 `2026-05-04` 가 live cursor 모델 source of truth.
|
||||
- **2026-05-03 P9 post-도그푸딩 (p9-fb-21)** — `i` 가 universal Normal→Insert toggle (모든 pane). 이전 mode_intercept 는 Library/Inspect/Jobs 만 `i` intercept 였고 Search/Ask 는 fall-through (자동 INSERT 가정). 사용자가 Esc 로 NORMAL 로 빠진 후 Insert 복귀 키 없어 dead-end → 도그푸딩에서 보고됨. mode_intercept 의 `(Char('i'), Normal, _)` arm 이 pane 무관 모두 INSERT flip. Search 의 chunk inspect 키 `i`→`o` rebind (vim "open") 으로 충돌 해소. footer hint 모든 (pane, mode, filter) 조합 첫 fragment = `F1 도움말` (cheatsheet binding discoverability). Search/Ask Normal hint 에 `i 입력모드` fragment 추가. cheatsheet popup Global/Search/Ask section 갱신. 6 신규 unit + 3 기존 갱신. spec: `tasks/p9/p9-fb-21-tui-insert-key-discoverability.md` (status `completed` 직접). HOTFIXES 항목이 Search `i`→`o` rebind 의 source of truth.
|
||||
|
||||
@@ -70,7 +70,7 @@ kebab doctor
|
||||
| 명령 | 동작 |
|
||||
|------|------|
|
||||
| `kebab init` | XDG 경로에 데이터 디렉토리 + config.toml 생성 |
|
||||
| `kebab ingest [<path>]` | Markdown / 이미지 / PDF 색인 (idempotent). TTY 에서는 stderr 진행 바, non-TTY (CI / pipe) 는 stderr 한 줄씩, `--json` 은 stdout 에 `ingest_progress.v1` 라인 streaming 후 마지막에 `ingest_report.v1`. Ctrl-C 한 번이면 현재 asset 마무리 후 abort (부분 commit 보존, idempotent re-run), 두 번째 Ctrl-C 는 hard exit. Markdown title 이 frontmatter 에 없어도 첫 H1 → H2 → 첫 paragraph 80 자 → 파일명 순으로 자동 채움 (parser_version `md-frontmatter-v2`) — 기존 색인된 doc 도 다음 ingest 에서 새 title 로 갱신 |
|
||||
| `kebab ingest [<path>]` | Markdown / 이미지 / PDF 색인 (idempotent). TTY 에서는 stderr 진행 바, non-TTY (CI / pipe) 는 stderr 한 줄씩, `--json` 은 stdout 에 `ingest_progress.v1` 라인 streaming 후 마지막에 `ingest_report.v1`. Ctrl-C 한 번이면 현재 asset 마무리 후 abort (부분 commit 보존, idempotent re-run), 두 번째 Ctrl-C 는 hard exit. Markdown title 이 frontmatter 에 없어도 첫 H1 → H2 → 첫 paragraph 80 자 → 파일명 순으로 자동 채움 (parser_version `md-frontmatter-v2`) — 기존 색인된 doc 도 다음 ingest 에서 새 title 로 갱신. **Incremental** (p9-fb-23): 두 번째 이후의 ingest 는 변하지 않은 doc (blake3 + parser/chunker/embedder version 모두 동일) 의 parse/chunk/embed/vector upsert 를 자동 스킵. final summary 에 `N unchanged` 카운트 표시. `--force-reingest` 로 skip 무시 강제 재처리. |
|
||||
| `kebab search --mode {lexical,vector,hybrid} "<query>" [--no-cache]` | 검색. hybrid는 RRF fusion, citation 포함. 같은 process 안에서 동일 query (NFKC + trim + lowercase 정규화) 반복 시 in-process LRU 캐시 hit (capacity = `[search] cache_capacity`, default 256). `--no-cache` 로 강제 bypass — 디버깅용. ingest commit 발생 시 `kv['corpus_revision']` bump 으로 모든 entry 자동 stale |
|
||||
| `kebab list docs` | 색인된 문서 목록 |
|
||||
| `kebab inspect doc <id>` / `kebab inspect chunk <id>` | raw record 보기 |
|
||||
|
||||
@@ -28,6 +28,10 @@ pub struct AggregateCounts {
|
||||
pub new: u32,
|
||||
pub updated: u32,
|
||||
pub skipped: u32,
|
||||
/// p9-fb-23: assets whose checksum + all version inputs matched the
|
||||
/// existing DB record — parse / chunk / embed / vector upsert all
|
||||
/// skipped.
|
||||
pub unchanged: u32,
|
||||
pub errors: u32,
|
||||
pub chunks_indexed: u32,
|
||||
pub embeddings_indexed: u32,
|
||||
|
||||
@@ -186,6 +186,22 @@ fn load_config() -> anyhow::Result<kebab_config::Config> {
|
||||
|
||||
// ── ingest ────────────────────────────────────────────────────────────────
|
||||
|
||||
/// p9-fb-23: optional per-call ingest controls. Kept as a struct (vs.
|
||||
/// a growing positional arg list) so future flags (e.g. `dry_run`,
|
||||
/// per-asset `concurrency`) land additively without churning every
|
||||
/// caller. Mirrors the `AskOpts` pattern from p9-fb-15.
|
||||
#[derive(Default)]
|
||||
pub struct IngestOpts {
|
||||
/// Streaming progress sink. `None` suppresses emission entirely.
|
||||
pub progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
/// Cooperative cancel token. `None` = uncancellable.
|
||||
pub cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
|
||||
/// p9-fb-23: when `true`, the per-asset early-skip block is bypassed
|
||||
/// — every asset is re-parsed / re-chunked / re-embedded as if the
|
||||
/// DB were empty. Default `false` preserves the auto-skip path.
|
||||
pub force_reingest: bool,
|
||||
}
|
||||
|
||||
pub fn ingest(scope: SourceScope, summary_only: bool) -> anyhow::Result<IngestReport> {
|
||||
let config = load_config()?;
|
||||
ingest_with_config(config, scope, summary_only)
|
||||
@@ -226,12 +242,16 @@ pub fn ingest_with_config_progress(
|
||||
ingest_with_config_cancellable(config, scope, summary_only, progress, None)
|
||||
}
|
||||
|
||||
/// Config + progress + cancel variant (p9-fb-04). The caller injects
|
||||
/// an `Arc<AtomicBool>` cancel token; setting it to `true` causes the
|
||||
/// ingest loop to break at the next step boundary (asset loop iter
|
||||
/// start), emit `IngestEvent::Aborted { counts: <partial> }`, and
|
||||
/// return `Ok(IngestReport)` with whatever assets were committed
|
||||
/// before cancellation. Per design §10:
|
||||
/// Config + opts variant (p9-fb-23). Supersedes the positional
|
||||
/// `ingest_with_config_cancellable` fn; callers now pass an
|
||||
/// [`IngestOpts`] struct so future knobs (e.g. `force_reingest`,
|
||||
/// `dry_run`) land additively without churning every call site.
|
||||
///
|
||||
/// Existing callers that still pass positional `progress` + `cancel`
|
||||
/// should use [`ingest_with_config_cancellable`], which remains as a
|
||||
/// thin wrapper that builds `IngestOpts` and forwards here.
|
||||
///
|
||||
/// Per design §10 (cancellation contract — unchanged from p9-fb-04):
|
||||
///
|
||||
/// - The current in-flight asset finishes (rollback would break
|
||||
/// idempotent re-run). Subsequent assets are skipped.
|
||||
@@ -242,23 +262,22 @@ pub fn ingest_with_config_progress(
|
||||
/// doc_id recipes).
|
||||
///
|
||||
/// CLI's `Ctrl-C` SIGINT handler and TUI's `Esc` / `Ctrl-C` both
|
||||
/// flip the same `AtomicBool`. Pass `None` to retain pre-p9-fb-04
|
||||
/// behaviour (uncancellable).
|
||||
/// flip the same `AtomicBool` (via `opts.cancel`).
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config_cancellable(
|
||||
pub fn ingest_with_config_opts(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
|
||||
opts: IngestOpts,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
let progress = progress.as_ref();
|
||||
let progress = opts.progress.as_ref();
|
||||
let cancelled = || {
|
||||
cancel
|
||||
opts.cancel
|
||||
.as_ref()
|
||||
.map(|c| c.load(std::sync::atomic::Ordering::Relaxed))
|
||||
.unwrap_or(false)
|
||||
};
|
||||
let force_reingest = opts.force_reingest;
|
||||
let started_instant = std::time::Instant::now();
|
||||
|
||||
let app = App::open_with_config(config)?;
|
||||
@@ -347,6 +366,7 @@ pub fn ingest_with_config_cancellable(
|
||||
let mut new_count: u32 = 0;
|
||||
let mut updated_count: u32 = 0;
|
||||
let mut skipped_count: u32 = 0;
|
||||
let mut unchanged_count: u32 = 0;
|
||||
let mut error_count: u32 = 0;
|
||||
// Aggregate counts surfaced into `ingest_runs` (and tracing). Not
|
||||
// exposed on `IngestReport` today — `kebab_core::IngestReport` is a
|
||||
@@ -392,6 +412,7 @@ pub fn ingest_with_config_cancellable(
|
||||
vector_store.as_ref(),
|
||||
&existing_doc_ids,
|
||||
&image_pipeline,
|
||||
force_reingest,
|
||||
);
|
||||
|
||||
let item = match item {
|
||||
@@ -445,6 +466,9 @@ pub fn ingest_with_config_cancellable(
|
||||
kebab_core::IngestItemKind::Skipped => {
|
||||
skipped_count = skipped_count.saturating_add(1)
|
||||
}
|
||||
kebab_core::IngestItemKind::Unchanged => {
|
||||
unchanged_count = unchanged_count.saturating_add(1)
|
||||
}
|
||||
kebab_core::IngestItemKind::Error => {
|
||||
error_count = error_count.saturating_add(1)
|
||||
}
|
||||
@@ -585,6 +609,7 @@ pub fn ingest_with_config_cancellable(
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
unchanged: unchanged_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
@@ -626,12 +651,42 @@ pub fn ingest_with_config_cancellable(
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
unchanged: unchanged_count,
|
||||
errors: error_count,
|
||||
duration_ms,
|
||||
items: if summary_only { None } else { Some(items) },
|
||||
})
|
||||
}
|
||||
|
||||
/// Config + progress + cancel variant (p9-fb-04). Retained as a thin
|
||||
/// wrapper around [`ingest_with_config_opts`] for external callers
|
||||
/// (test fixtures, CLI) that pass positional `progress` + `cancel`
|
||||
/// arguments. New callers should prefer [`ingest_with_config_opts`]
|
||||
/// with an explicit [`IngestOpts`].
|
||||
///
|
||||
/// CLI's `Ctrl-C` SIGINT handler and TUI's `Esc` / `Ctrl-C` both
|
||||
/// flip the `cancel` `AtomicBool`. Pass `None` to retain
|
||||
/// pre-p9-fb-04 behaviour (uncancellable).
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config_cancellable(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
ingest_with_config_opts(
|
||||
config,
|
||||
scope,
|
||||
summary_only,
|
||||
IngestOpts {
|
||||
progress,
|
||||
cancel,
|
||||
force_reingest: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Mint a stable 32-hex-char `run_id` for an `ingest_runs` row.
|
||||
/// `(scope, started_at_nanos)` is enough to make two runs with the
|
||||
/// same scope started a nanosecond apart distinguish — same shape as
|
||||
@@ -659,6 +714,105 @@ struct ImagePipeline<'a> {
|
||||
caption_llm: Option<&'a dyn LanguageModel>,
|
||||
}
|
||||
|
||||
/// p9-fb-23 task 7: incremental-ingest early-skip predicate. Shared
|
||||
/// across the markdown / image / PDF per-asset flows. Returns
|
||||
/// `Some(IngestItem { kind: Unchanged, .. })` when ALL FOUR conditions
|
||||
/// hold (per design §9 cascade rule):
|
||||
///
|
||||
/// 1. `force_reingest == false` — caller hasn't asked to bypass skip.
|
||||
/// 2. The freshly-scanned asset's blake3 checksum equals what the
|
||||
/// existing `assets` row stores at the same `workspace_path`.
|
||||
/// 3. The doc keyed on `(workspace_path, asset_id, current_parser_version)`
|
||||
/// exists. If the parser_version changed, `id_for_doc` produces a
|
||||
/// different `doc_id` so the lookup misses → no skip → re-process.
|
||||
/// 4. The existing doc's stamped `last_chunker_version` AND
|
||||
/// `last_embedding_version` match the values the caller is about
|
||||
/// to use (`Some(v) == Some(v)` and `None == None` — see design
|
||||
/// doc for the `None == None` rule when no embedder is configured).
|
||||
///
|
||||
/// Returns `Ok(None)` (proceed with full re-process) when any check
|
||||
/// fails or any DB read errors out — the skip path is opportunistic;
|
||||
/// a missed skip is correct (just slower), a wrong skip would corrupt
|
||||
/// the index.
|
||||
fn try_skip_unchanged(
|
||||
app: &App,
|
||||
asset: &RawAsset,
|
||||
current_parser_version: &ParserVersion,
|
||||
current_chunker_version: &ChunkerVersion,
|
||||
current_embedding_version: Option<&kebab_core::EmbeddingVersion>,
|
||||
force_reingest: bool,
|
||||
) -> anyhow::Result<Option<kebab_core::IngestItem>> {
|
||||
if force_reingest {
|
||||
return Ok(None);
|
||||
}
|
||||
let existing_asset = match app
|
||||
.sqlite
|
||||
.get_asset_by_workspace_path(&asset.workspace_path)
|
||||
{
|
||||
Ok(Some(a)) => a,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_asset_by_workspace_path failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if existing_asset.checksum != asset.checksum {
|
||||
return Ok(None);
|
||||
}
|
||||
let candidate_doc_id = kebab_core::id_for_doc(
|
||||
&asset.workspace_path,
|
||||
&asset.asset_id,
|
||||
current_parser_version,
|
||||
);
|
||||
let existing_doc = match app.sqlite.get_document(&candidate_doc_id) {
|
||||
Ok(Some(d)) => d,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: "kebab-app",
|
||||
path = %asset.workspace_path.0,
|
||||
error = %e,
|
||||
"skip-check: get_document failed; falling through to re-process"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let chunker_match = existing_doc.last_chunker_version.as_ref()
|
||||
== Some(current_chunker_version);
|
||||
if !chunker_match {
|
||||
return Ok(None);
|
||||
}
|
||||
let embedder_match = existing_doc.last_embedding_version.as_ref()
|
||||
== current_embedding_version;
|
||||
if !embedder_match {
|
||||
return Ok(None);
|
||||
}
|
||||
tracing::debug!(
|
||||
target: "kebab-app::ingest",
|
||||
path = %asset.workspace_path.0,
|
||||
doc_id = %candidate_doc_id.0,
|
||||
"skip-unchanged: checksum + parser/chunker/embedding versions match"
|
||||
);
|
||||
Ok(Some(kebab_core::IngestItem {
|
||||
kind: kebab_core::IngestItemKind::Unchanged,
|
||||
doc_id: Some(candidate_doc_id),
|
||||
doc_path: asset.workspace_path.clone(),
|
||||
asset_id: Some(asset.asset_id.clone()),
|
||||
byte_len: Some(asset.byte_len),
|
||||
block_count: u32::try_from(existing_doc.blocks.len()).ok(),
|
||||
chunk_count: None,
|
||||
parser_version: Some(existing_doc.parser_version.clone()),
|
||||
chunker_version: existing_doc.last_chunker_version.clone(),
|
||||
warnings: Vec::new(),
|
||||
error: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Process a single asset: read bytes, parse, normalize, chunk,
|
||||
/// persist, embed. Per-asset failures bubble up to the caller for
|
||||
/// labelling as `IngestItemKind::Error` — they do NOT abort the
|
||||
@@ -673,6 +827,7 @@ fn ingest_one_asset(
|
||||
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
|
||||
existing_doc_ids: &std::collections::HashSet<String>,
|
||||
image_pipeline: &ImagePipeline<'_>,
|
||||
force_reingest: bool,
|
||||
) -> anyhow::Result<kebab_core::IngestItem> {
|
||||
tracing::debug!(
|
||||
target: "kebab-app::ingest",
|
||||
@@ -696,6 +851,7 @@ fn ingest_one_asset(
|
||||
vector_store,
|
||||
existing_doc_ids,
|
||||
image_pipeline,
|
||||
force_reingest,
|
||||
);
|
||||
}
|
||||
MediaType::Pdf => {
|
||||
@@ -706,6 +862,7 @@ fn ingest_one_asset(
|
||||
embedder,
|
||||
vector_store,
|
||||
existing_doc_ids,
|
||||
force_reingest,
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
@@ -746,6 +903,23 @@ fn ingest_one_asset(
|
||||
}
|
||||
};
|
||||
|
||||
// p9-fb-23 task 7: incremental-ingest early-skip. When force_reingest
|
||||
// is false AND the on-disk asset's checksum + parser_version +
|
||||
// last_chunker_version + last_embedding_version all match the existing
|
||||
// DB record, this asset doesn't need to be re-parsed / re-chunked /
|
||||
// re-embedded. Return Unchanged so the caller bumps `aggregate.unchanged`
|
||||
// and the AssetFinished progress event reflects the skip.
|
||||
if let Some(item) = try_skip_unchanged(
|
||||
app,
|
||||
asset,
|
||||
parser_version,
|
||||
&MdHeadingV1Chunker.chunker_version(),
|
||||
embedder.map(|e| e.model_version()).as_ref(),
|
||||
force_reingest,
|
||||
)? {
|
||||
return Ok(item);
|
||||
}
|
||||
|
||||
let bytes = std::fs::read(&path)
|
||||
.with_context(|| format!("read asset bytes from {}", path.display()))?;
|
||||
|
||||
@@ -775,7 +949,7 @@ fn ingest_one_asset(
|
||||
.map(|w| format!("{:?}: {}", w.kind, w.note))
|
||||
.collect();
|
||||
|
||||
let canonical = build_canonical_document(
|
||||
let mut canonical = build_canonical_document(
|
||||
asset,
|
||||
metadata,
|
||||
parsed_blocks,
|
||||
@@ -788,6 +962,13 @@ fn ingest_one_asset(
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::MdHeadingV1Chunker::chunk")?;
|
||||
|
||||
// Stamp chunker + embedding versions so Task 7's skip detection has
|
||||
// data on the second run.
|
||||
canonical.last_chunker_version = Some(MdHeadingV1Chunker.chunker_version());
|
||||
if let Some(emb) = embedder {
|
||||
canonical.last_embedding_version = Some(emb.model_version());
|
||||
}
|
||||
|
||||
// Persist. Each `put_*` call wraps its own short transaction
|
||||
// (per-document tx semantics per design §5.8); composing them is
|
||||
// the kb-app job. A failure mid-way leaves the DB in a state the
|
||||
@@ -890,6 +1071,7 @@ fn ingest_one_image_asset(
|
||||
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
|
||||
existing_doc_ids: &std::collections::HashSet<String>,
|
||||
image_pipeline: &ImagePipeline<'_>,
|
||||
force_reingest: bool,
|
||||
) -> anyhow::Result<kebab_core::IngestItem> {
|
||||
let image_extractor = image_pipeline.extractor;
|
||||
let ocr_engine = image_pipeline.ocr_engine;
|
||||
@@ -914,6 +1096,23 @@ fn ingest_one_image_asset(
|
||||
});
|
||||
}
|
||||
};
|
||||
// p9-fb-23 task 7: incremental-ingest early-skip for the image flow.
|
||||
// Image docs use the `image-meta-v1` parser_version + the same
|
||||
// MdHeadingV1Chunker as the markdown flow (single-block doc). The
|
||||
// embedding-version check matches the markdown path: when the
|
||||
// active embedder's model_version equals what was stamped on the
|
||||
// existing doc, the asset is Unchanged.
|
||||
let image_parser_version = ParserVersion(kebab_parse_image::PARSER_VERSION.to_string());
|
||||
if let Some(item) = try_skip_unchanged(
|
||||
app,
|
||||
asset,
|
||||
&image_parser_version,
|
||||
&MdHeadingV1Chunker.chunker_version(),
|
||||
embedder.map(|e| e.model_version()).as_ref(),
|
||||
force_reingest,
|
||||
)? {
|
||||
return Ok(item);
|
||||
}
|
||||
let bytes = std::fs::read(&path)
|
||||
.with_context(|| format!("read image asset bytes from {}", path.display()))?;
|
||||
|
||||
@@ -1024,6 +1223,12 @@ fn ingest_one_image_asset(
|
||||
.context("kb-chunk::MdHeadingV1Chunker::chunk (image)")?;
|
||||
|
||||
// 5. Persist + embed — identical sequence to markdown.
|
||||
// Stamp chunker + embedding versions (image uses MdHeadingV1Chunker
|
||||
// for its single-block doc, so we record that version).
|
||||
canonical.last_chunker_version = Some(MdHeadingV1Chunker.chunker_version());
|
||||
if let Some(emb) = embedder {
|
||||
canonical.last_embedding_version = Some(emb.model_version());
|
||||
}
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
app.sqlite
|
||||
.put_asset_with_bytes(asset, &bytes)
|
||||
@@ -1204,6 +1409,7 @@ fn ingest_one_pdf_asset(
|
||||
embedder: Option<&Arc<dyn Embedder + Send + Sync>>,
|
||||
vector_store: Option<&Arc<kebab_store_vector::LanceVectorStore>>,
|
||||
existing_doc_ids: &std::collections::HashSet<String>,
|
||||
force_reingest: bool,
|
||||
) -> anyhow::Result<kebab_core::IngestItem> {
|
||||
let path = match &asset.source_uri {
|
||||
SourceUri::File(p) => p.clone(),
|
||||
@@ -1225,6 +1431,20 @@ fn ingest_one_pdf_asset(
|
||||
});
|
||||
}
|
||||
};
|
||||
// p9-fb-23 task 7: incremental-ingest early-skip for the PDF flow.
|
||||
// PDF docs use `pdf-text-v1` as the parser_version and `PdfPageV1Chunker`
|
||||
// as the chunker — both pinned per-medium today (no config knob).
|
||||
let pdf_parser_version = ParserVersion(kebab_parse_pdf::PARSER_VERSION.to_string());
|
||||
if let Some(item) = try_skip_unchanged(
|
||||
app,
|
||||
asset,
|
||||
&pdf_parser_version,
|
||||
&PdfPageV1Chunker.chunker_version(),
|
||||
embedder.map(|e| e.model_version()).as_ref(),
|
||||
force_reingest,
|
||||
)? {
|
||||
return Ok(item);
|
||||
}
|
||||
let bytes = std::fs::read(&path)
|
||||
.with_context(|| format!("read PDF asset bytes from {}", path.display()))?;
|
||||
|
||||
@@ -1238,7 +1458,7 @@ fn ingest_one_pdf_asset(
|
||||
workspace_root: &workspace_root,
|
||||
config: &extract_config,
|
||||
};
|
||||
let canonical = PdfTextExtractor::new()
|
||||
let mut canonical = PdfTextExtractor::new()
|
||||
.extract(&ctx, &bytes)
|
||||
.context("kb-parse-pdf::PdfTextExtractor::extract")?;
|
||||
|
||||
@@ -1251,6 +1471,13 @@ fn ingest_one_pdf_asset(
|
||||
.chunk(&canonical, chunk_policy)
|
||||
.context("kb-chunk::PdfPageV1Chunker::chunk")?;
|
||||
|
||||
// Stamp chunker + embedding versions so Task 7's skip detection has
|
||||
// data on the second run.
|
||||
canonical.last_chunker_version = Some(chunker.chunker_version());
|
||||
if let Some(emb) = embedder {
|
||||
canonical.last_embedding_version = Some(emb.model_version());
|
||||
}
|
||||
|
||||
purge_vector_orphans_for_workspace_path(app, asset, vector_store)?;
|
||||
app.sqlite
|
||||
.put_asset_with_bytes(asset, &bytes)
|
||||
|
||||
@@ -363,10 +363,14 @@ async fn garbage_png_increments_errors_counter_exactly_once() {
|
||||
|
||||
// ── 6. Determinism: re-ingest produces identical doc_id / chunk_id ───────
|
||||
|
||||
/// Idempotency contract — running the same ingest twice should mark
|
||||
/// the asset Updated on the second run with byte-identical IDs.
|
||||
/// Idempotency contract — running the same ingest twice keeps the
|
||||
/// doc_id stable. p9-fb-23 task 7 introduced the early-skip path for
|
||||
/// incremental ingest: when checksum + parser/chunker/embedding versions
|
||||
/// all match, the second run reports `Unchanged` rather than `Updated`.
|
||||
/// The pre-p9-fb-23 contract was `Updated` — that path is still exercised
|
||||
/// by `force_reingest = true` tests in `incremental_ingest.rs`.
|
||||
#[tokio::test]
|
||||
async fn re_ingest_image_produces_updated_with_same_doc_id() {
|
||||
async fn re_ingest_image_produces_unchanged_with_same_doc_id() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/generate"))
|
||||
@@ -416,6 +420,6 @@ async fn re_ingest_image_produces_updated_with_same_doc_id() {
|
||||
.iter()
|
||||
.find(|i| i.doc_path.0.ends_with("diagram.png"))
|
||||
.unwrap();
|
||||
assert_eq!(img2.kind, kebab_core::IngestItemKind::Updated);
|
||||
assert_eq!(img2.kind, kebab_core::IngestItemKind::Unchanged);
|
||||
assert_eq!(img2.doc_id.as_ref().unwrap(), &id1);
|
||||
}
|
||||
|
||||
82
crates/kebab-app/tests/incremental_ingest.rs
Normal file
82
crates/kebab-app/tests/incremental_ingest.rs
Normal file
@@ -0,0 +1,82 @@
|
||||
//! p9-fb-23: incremental ingest — skip parse/chunk/embed when nothing
|
||||
//! has changed.
|
||||
//!
|
||||
//! Task 7 contract: when `IngestOpts::force_reingest == false` and the
|
||||
//! per-asset (checksum, parser_version, chunker_version, embedding_version)
|
||||
//! tuple matches the existing DB record, ingest emits
|
||||
//! `IngestEvent::AssetFinished { result: Unchanged }` and skips
|
||||
//! parse / chunk / embed / vector upsert. `force_reingest = true`
|
||||
//! bypasses the skip path and re-processes every asset as `Updated`.
|
||||
|
||||
mod common;
|
||||
|
||||
use common::TestEnv;
|
||||
|
||||
use kebab_app::{IngestOpts, ingest_with_config, ingest_with_config_opts};
|
||||
|
||||
#[test]
|
||||
fn second_ingest_of_unchanged_corpus_marks_all_unchanged() {
|
||||
let env = TestEnv::lexical_only();
|
||||
|
||||
// First ingest — populates the DB. Use the legacy entry so the
|
||||
// assertions cover the "previously ingested" set without needing
|
||||
// IngestOpts::default() to behave identically.
|
||||
let first =
|
||||
ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
|
||||
assert_eq!(first.errors, 0, "first ingest must not error: {first:?}");
|
||||
assert!(first.new >= 1, "first ingest must create new docs: {first:?}");
|
||||
assert_eq!(first.unchanged, 0, "first ingest cannot have unchanged: {first:?}");
|
||||
|
||||
let scanned = first.scanned;
|
||||
|
||||
// Second ingest — same files, same versions → all assets must be
|
||||
// labelled Unchanged (no parse / chunk / embed re-work).
|
||||
let second = ingest_with_config_opts(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
false,
|
||||
IngestOpts::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(second.scanned, scanned, "second scanned matches first: {second:?}");
|
||||
assert_eq!(second.new, 0, "no new docs on re-ingest: {second:?}");
|
||||
assert_eq!(second.updated, 0, "nothing should be marked updated: {second:?}");
|
||||
assert_eq!(
|
||||
second.unchanged, scanned,
|
||||
"every doc must be Unchanged: {second:?}"
|
||||
);
|
||||
assert_eq!(second.errors, 0, "no errors expected: {second:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_reingest_bypasses_skip() {
|
||||
let env = TestEnv::lexical_only();
|
||||
|
||||
let first =
|
||||
ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
|
||||
assert_eq!(first.errors, 0, "first ingest must not error: {first:?}");
|
||||
assert!(first.new >= 1, "first ingest must create new docs: {first:?}");
|
||||
let scanned = first.scanned;
|
||||
|
||||
let second = ingest_with_config_opts(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
false,
|
||||
IngestOpts {
|
||||
force_reingest: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(second.scanned, scanned);
|
||||
assert_eq!(
|
||||
second.unchanged, 0,
|
||||
"force_reingest must bypass skip: {second:?}"
|
||||
);
|
||||
assert_eq!(
|
||||
second.updated, scanned,
|
||||
"every doc must be re-processed as Updated: {second:?}"
|
||||
);
|
||||
assert_eq!(second.new, 0, "no new docs on force reingest: {second:?}");
|
||||
assert_eq!(second.errors, 0, "no errors expected: {second:?}");
|
||||
}
|
||||
@@ -52,10 +52,15 @@ fn ingest_idempotent_on_second_run() {
|
||||
|
||||
let r2 =
|
||||
kebab_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
|
||||
// Same files re-ingested — labelled Updated, not duplicated.
|
||||
// Same files re-ingested — p9-fb-23 task 7 introduced the early-skip
|
||||
// path: when checksum + parser/chunker/embedding versions all match,
|
||||
// the second run reports `Unchanged` rather than `Updated`. Pre-p9-fb-23
|
||||
// returned `Updated` here. The `force_reingest=true` path still returns
|
||||
// `Updated` and is exercised by `incremental_ingest.rs`.
|
||||
assert_eq!(r2.scanned, 3, "second scan: {r2:?}");
|
||||
assert_eq!(r2.new, 0, "second run new should be 0: {r2:?}");
|
||||
assert_eq!(r2.updated, 3, "second run updated: {r2:?}");
|
||||
assert_eq!(r2.updated, 0, "second run updated: {r2:?}");
|
||||
assert_eq!(r2.unchanged, 3, "second run unchanged: {r2:?}");
|
||||
|
||||
// list_docs still has 3 docs (no duplicates).
|
||||
let docs = kebab_app::list_docs_with_config(
|
||||
@@ -218,3 +223,62 @@ fn inspect_chunk_not_found_returns_actionable_error() {
|
||||
let msg = format!("{err:#}");
|
||||
assert!(msg.contains("not found"), "got: {msg}");
|
||||
}
|
||||
|
||||
/// p9-fb-23 task 6: `ingest_with_config_opts` with `IngestOpts::default()`
|
||||
/// must behave identically to `ingest_with_config` — first ingest reports
|
||||
/// all assets as new, no errors, no unchanged.
|
||||
#[test]
|
||||
fn ingest_with_config_opts_default_matches_legacy_behaviour() {
|
||||
let env = TestEnv::lexical_only();
|
||||
let report = kebab_app::ingest_with_config_opts(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
false,
|
||||
kebab_app::IngestOpts::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(report.new >= 1, "expected at least one new doc: {report:?}");
|
||||
assert_eq!(report.errors, 0, "no errors expected: {report:?}");
|
||||
assert_eq!(
|
||||
report.unchanged, 0,
|
||||
"first ingest cannot have unchanged: {report:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// p9-fb-23 task 5: every freshly-ingested markdown doc must carry
|
||||
/// `last_chunker_version`. With `provider="none"` (lexical-only),
|
||||
/// `last_embedding_version` stays `None`.
|
||||
#[test]
|
||||
fn ingest_stamps_chunker_version_on_document() {
|
||||
let env = TestEnv::lexical_only();
|
||||
let report =
|
||||
kebab_app::ingest_with_config(env.config.clone(), env.scope(), false).unwrap();
|
||||
assert!(report.new >= 1, "expected at least one new doc: {report:?}");
|
||||
assert_eq!(report.errors, 0, "no errors expected: {report:?}");
|
||||
|
||||
let docs = kebab_app::list_docs_with_config(
|
||||
env.config.clone(),
|
||||
kebab_core::DocFilter::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(!docs.is_empty(), "no docs after ingest");
|
||||
|
||||
for doc_entry in &docs {
|
||||
let canonical =
|
||||
kebab_app::inspect_doc_with_config(env.config.clone(), &doc_entry.doc_id)
|
||||
.unwrap();
|
||||
assert!(
|
||||
canonical.last_chunker_version.is_some(),
|
||||
"last_chunker_version must be stamped for doc {}: got {:?}",
|
||||
doc_entry.doc_id.0,
|
||||
canonical.last_chunker_version,
|
||||
);
|
||||
// provider="none" → embedder is None → last_embedding_version stays None.
|
||||
assert!(
|
||||
canonical.last_embedding_version.is_none(),
|
||||
"last_embedding_version must be None when provider=none for doc {}: got {:?}",
|
||||
doc_entry.doc_id.0,
|
||||
canonical.last_embedding_version,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,10 +187,15 @@ fn ingest_3_page_pdf_produces_one_doc_and_per_page_chunks() {
|
||||
}
|
||||
}
|
||||
|
||||
/// Re-ingest the SAME PDF bytes → identical doc_id, identical chunk_id
|
||||
/// set, item kind = Updated. P1 idempotency contract.
|
||||
/// Re-ingest the SAME PDF bytes → identical doc_id, item kind =
|
||||
/// Unchanged. p9-fb-23 task 7 introduced the early-skip path: when
|
||||
/// checksum + parser/chunker/embedding versions all match, the second
|
||||
/// run reports `Unchanged` rather than `Updated` and skips parse /
|
||||
/// chunk / embed entirely. The pre-p9-fb-23 contract was `Updated`;
|
||||
/// the `force_reingest=true` path still exercises that branch (see
|
||||
/// `incremental_ingest.rs`).
|
||||
#[test]
|
||||
fn re_ingest_identical_pdf_produces_updated_with_same_doc_id() {
|
||||
fn re_ingest_identical_pdf_produces_unchanged_with_same_doc_id() {
|
||||
let env = TestEnv::lexical_only();
|
||||
let bytes = build_text_pdf(&[Some("page 1"), Some("page 2")]);
|
||||
write_pdf(&env.workspace_root, "stable.pdf", &bytes);
|
||||
@@ -216,17 +221,8 @@ fn re_ingest_identical_pdf_produces_updated_with_same_doc_id() {
|
||||
.into_iter()
|
||||
.find(|i| i.doc_path.0.ends_with("stable.pdf"))
|
||||
.unwrap();
|
||||
assert_eq!(item2.kind, IngestItemKind::Updated);
|
||||
assert_eq!(item2.kind, IngestItemKind::Unchanged);
|
||||
assert_eq!(item2.doc_id, item1.doc_id);
|
||||
// P1 idempotency contract: identical bytes → identical chunk set.
|
||||
// Comparing `chunk_count` as a proxy (full chunk_id set comparison
|
||||
// would need direct sqlite access; the per-chunk #c{char_start}
|
||||
// hash variant in pdf-page-v1 is already tested for stability in
|
||||
// `kebab-chunk::pdf_page_v1::deterministic_chunk_ids_1000`).
|
||||
assert_eq!(
|
||||
item1.chunk_count, item2.chunk_count,
|
||||
"identical bytes must produce identical chunk count"
|
||||
);
|
||||
}
|
||||
|
||||
/// Edit a PDF (replace bytes) → different blake3 → different asset_id
|
||||
|
||||
@@ -477,6 +477,8 @@ mod tests {
|
||||
parser_version: kebab_core::ParserVersion("test-parser-0".into()),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -352,6 +352,8 @@ mod tests {
|
||||
parser_version,
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -515,6 +517,8 @@ mod tests {
|
||||
parser_version,
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
};
|
||||
let err = PdfPageV1Chunker
|
||||
.chunk(&doc, &default_policy(500, 80))
|
||||
|
||||
@@ -53,6 +53,14 @@ enum Cmd {
|
||||
/// Suppress the per-file `items` list.
|
||||
#[arg(long)]
|
||||
summary_only: bool,
|
||||
|
||||
/// p9-fb-23: bypass the per-asset early-skip path. Every asset is
|
||||
/// re-parsed, re-chunked, re-embedded, and re-upserted regardless
|
||||
/// of whether the DB already has a record with matching checksum
|
||||
/// and version stamps. Useful after manual schema bumps or when
|
||||
/// the user suspects the corpus is in a stale state.
|
||||
#[arg(long)]
|
||||
force_reingest: bool,
|
||||
},
|
||||
|
||||
/// Listing subcommands.
|
||||
@@ -313,6 +321,7 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
|
||||
Cmd::Ingest {
|
||||
root,
|
||||
summary_only,
|
||||
force_reingest,
|
||||
} => {
|
||||
let cfg = kebab_config::Config::load(cli.config.as_deref())?;
|
||||
let scope = kebab_core::SourceScope {
|
||||
@@ -337,12 +346,17 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
|
||||
// *second* Ctrl-C is a hard exit (handled inside `cancel`).
|
||||
let cancel_token = cancel::install_sigint_cancel()?;
|
||||
|
||||
let ingest_result = kebab_app::ingest_with_config_cancellable(
|
||||
// p9-fb-23: use IngestOpts so force_reingest threads through
|
||||
// without churning the positional-arg list.
|
||||
let ingest_result = kebab_app::ingest_with_config_opts(
|
||||
cfg,
|
||||
scope,
|
||||
*summary_only,
|
||||
Some(tx),
|
||||
Some(cancel_token),
|
||||
kebab_app::IngestOpts {
|
||||
progress: Some(tx),
|
||||
cancel: Some(cancel_token),
|
||||
force_reingest: *force_reingest,
|
||||
},
|
||||
);
|
||||
|
||||
// Join the display thread *before* surfacing the ingest
|
||||
|
||||
@@ -168,6 +168,7 @@ mod tests {
|
||||
new: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
unchanged: 0,
|
||||
errors: 0,
|
||||
duration_ms: 0,
|
||||
items: None,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::asset::WorkspacePath;
|
||||
use crate::ids::{AssetId, BlockId, DocumentId};
|
||||
use crate::media::Lang;
|
||||
use crate::metadata::{Metadata, Provenance};
|
||||
use crate::versions::ParserVersion;
|
||||
use crate::versions::{ChunkerVersion, EmbeddingVersion, ParserVersion};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct CanonicalDocument {
|
||||
@@ -22,6 +22,15 @@ pub struct CanonicalDocument {
|
||||
pub parser_version: ParserVersion,
|
||||
pub schema_version: u32,
|
||||
pub doc_version: u32,
|
||||
/// p9-fb-23: chunker version active when this document was last
|
||||
/// chunked. `None` for rows ingested before V006 migration; the
|
||||
/// next ingest stamps the current version. Compared against the
|
||||
/// active chunker version for the incremental-ingest skip path.
|
||||
pub last_chunker_version: Option<ChunkerVersion>,
|
||||
/// p9-fb-23: embedding model version active when this document
|
||||
/// was last embedded. `None` if no embedder is configured (skip
|
||||
/// path treats `None == None` as a match — see design doc).
|
||||
pub last_embedding_version: Option<EmbeddingVersion>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
|
||||
@@ -13,7 +13,11 @@ pub struct IngestReport {
|
||||
pub scanned: u32,
|
||||
pub new: u32,
|
||||
pub updated: u32,
|
||||
/// Media-type / source filter (`kb://`, unsupported types).
|
||||
pub skipped: u32,
|
||||
/// p9-fb-23: assets whose checksum + all version inputs matched —
|
||||
/// parse / chunk / embed / vector upsert all skipped.
|
||||
pub unchanged: u32,
|
||||
pub errors: u32,
|
||||
pub duration_ms: u32,
|
||||
/// `None` ↔ wire `items: null` (`--summary-only`).
|
||||
@@ -40,6 +44,12 @@ pub struct IngestItem {
|
||||
pub enum IngestItemKind {
|
||||
New,
|
||||
Updated,
|
||||
/// Media-type filter / kb:// URI / non-supported source — never made
|
||||
/// it into the parse step.
|
||||
Skipped,
|
||||
/// p9-fb-23: blake3 checksum + parser_version + chunker_version +
|
||||
/// embedding_version all matched the existing record. Parse / chunk
|
||||
/// / embed / vector upsert all skipped.
|
||||
Unchanged,
|
||||
Error,
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::asset::RawAsset;
|
||||
use crate::asset::{RawAsset, WorkspacePath};
|
||||
use crate::chunk::Chunk;
|
||||
use crate::document::{Block, CanonicalDocument};
|
||||
use crate::ids::{ChunkId, DocumentId};
|
||||
@@ -156,6 +156,14 @@ pub trait DocumentStore {
|
||||
fn get_document(&self, id: &DocumentId) -> anyhow::Result<Option<CanonicalDocument>>;
|
||||
fn get_chunk(&self, id: &ChunkId) -> anyhow::Result<Option<Chunk>>;
|
||||
fn list_documents(&self, filter: &DocFilter) -> anyhow::Result<Vec<DocSummary>>;
|
||||
/// p9-fb-23: look up an asset row by its workspace path. Used by
|
||||
/// the incremental-ingest skip path to compare the freshly
|
||||
/// computed blake3 checksum against what's already in SQLite. The
|
||||
/// schema enforces a unique workspace_path per asset.
|
||||
fn get_asset_by_workspace_path(
|
||||
&self,
|
||||
path: &WorkspacePath,
|
||||
) -> anyhow::Result<Option<RawAsset>>;
|
||||
}
|
||||
|
||||
pub trait VectorStore {
|
||||
|
||||
@@ -169,6 +169,8 @@ pub fn build_canonical_document(
|
||||
parser_version: parser_version.clone(),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -212,6 +212,8 @@ impl Extractor for ImageExtractor {
|
||||
parser_version,
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,6 +216,8 @@ impl Extractor for PdfTextExtractor {
|
||||
parser_version,
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,5 +43,6 @@
|
||||
"root": "/home/u/KB"
|
||||
},
|
||||
"skipped": 0,
|
||||
"unchanged": 0,
|
||||
"updated": 1
|
||||
}
|
||||
|
||||
@@ -165,7 +165,8 @@ impl kebab_core::DocumentStore for SqliteStore {
|
||||
doc_id, asset_id, workspace_path, title, lang,
|
||||
source_type, trust_level, parser_version,
|
||||
doc_version, schema_version, metadata_json,
|
||||
provenance_json, created_at, updated_at
|
||||
provenance_json, created_at, updated_at,
|
||||
last_chunker_version, last_embedding_version
|
||||
FROM documents WHERE doc_id = ?",
|
||||
params![id.0],
|
||||
document_row_from_sql,
|
||||
@@ -221,6 +222,8 @@ impl kebab_core::DocumentStore for SqliteStore {
|
||||
// under that invariant.
|
||||
schema_version: row.schema_version as u32,
|
||||
doc_version: row.doc_version as u32,
|
||||
last_chunker_version: row.last_chunker_version.map(kebab_core::ChunkerVersion),
|
||||
last_embedding_version: row.last_embedding_version.map(kebab_core::EmbeddingVersion),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -261,6 +264,28 @@ impl kebab_core::DocumentStore for SqliteStore {
|
||||
}))
|
||||
}
|
||||
|
||||
fn get_asset_by_workspace_path(
|
||||
&self,
|
||||
path: &kebab_core::WorkspacePath,
|
||||
) -> Result<Option<kebab_core::RawAsset>> {
|
||||
let conn = self.lock_conn();
|
||||
let result = conn.query_row(
|
||||
r#"SELECT
|
||||
asset_id, source_uri, workspace_path, media_type,
|
||||
byte_len, checksum, storage_kind, storage_path,
|
||||
discovered_at
|
||||
FROM assets
|
||||
WHERE workspace_path = ?"#,
|
||||
rusqlite::params![path.0.as_str()],
|
||||
asset_from_row,
|
||||
);
|
||||
match result {
|
||||
Ok(asset) => Ok(Some(asset)),
|
||||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn list_documents(
|
||||
&self,
|
||||
filter: &kebab_core::DocFilter,
|
||||
@@ -365,6 +390,8 @@ struct DocumentRow {
|
||||
provenance_json: String,
|
||||
// source_type / trust_level are loaded back via metadata_json round-trip,
|
||||
// so we do not need separate fields here for `get_document`.
|
||||
last_chunker_version: Option<String>,
|
||||
last_embedding_version: Option<String>,
|
||||
}
|
||||
|
||||
fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
|
||||
@@ -383,6 +410,10 @@ fn document_row_from_sql(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRo
|
||||
schema_version: row.get(9)?,
|
||||
metadata_json: row.get(10)?,
|
||||
provenance_json: row.get(11)?,
|
||||
// 12: created_at, 13: updated_at — not stored in DocumentRow
|
||||
// (only needed for list_documents). Columns 14-15 follow.
|
||||
last_chunker_version: row.get(14)?,
|
||||
last_embedding_version: row.get(15)?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -475,6 +506,69 @@ fn rows_optional<T>(err: rusqlite::Error) -> rusqlite::Result<Option<T>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reconstruct a [`kebab_core::RawAsset`] from one `assets` row.
|
||||
/// Row mapper for `RawAsset`. Column names are self-documenting; the
|
||||
/// SELECT in [`DocumentStore::get_asset_by_workspace_path`] must include
|
||||
/// all nine columns by their schema names.
|
||||
fn asset_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<kebab_core::RawAsset> {
|
||||
use std::path::PathBuf;
|
||||
|
||||
let asset_id: String = row.get("asset_id")?;
|
||||
let source_uri_raw: String = row.get("source_uri")?;
|
||||
let workspace_path_raw: String = row.get("workspace_path")?;
|
||||
let media_type_json: String = row.get("media_type")?;
|
||||
let byte_len: i64 = row.get("byte_len")?;
|
||||
let checksum_raw: String = row.get("checksum")?;
|
||||
let storage_kind: String = row.get("storage_kind")?;
|
||||
let storage_path_raw: String = row.get("storage_path")?;
|
||||
let discovered_at_raw: String = row.get("discovered_at")?;
|
||||
|
||||
// Parse source_uri: stored as "file://<path>" or "kb://<uri>".
|
||||
let source_uri = if let Some(path_str) = source_uri_raw.strip_prefix("file://") {
|
||||
kebab_core::SourceUri::File(PathBuf::from(path_str))
|
||||
} else {
|
||||
kebab_core::SourceUri::Kb(source_uri_raw.clone())
|
||||
};
|
||||
|
||||
let workspace_path = kebab_core::WorkspacePath(workspace_path_raw);
|
||||
let media_type: kebab_core::MediaType = serde_json::from_str(&media_type_json)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
let checksum = kebab_core::Checksum(checksum_raw.clone());
|
||||
let discovered_at = time::OffsetDateTime::parse(
|
||||
&discovered_at_raw,
|
||||
&time::format_description::well_known::Rfc3339,
|
||||
)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
|
||||
let storage_path = PathBuf::from(&storage_path_raw);
|
||||
let stored = if storage_kind == "copied" {
|
||||
kebab_core::AssetStorage::Copied { path: storage_path }
|
||||
} else {
|
||||
kebab_core::AssetStorage::Reference {
|
||||
path: storage_path,
|
||||
sha: checksum.clone(),
|
||||
}
|
||||
};
|
||||
|
||||
Ok(kebab_core::RawAsset {
|
||||
asset_id: kebab_core::AssetId(asset_id),
|
||||
source_uri,
|
||||
workspace_path,
|
||||
media_type,
|
||||
byte_len: u64::try_from(byte_len)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
|
||||
// index parameter for named-column path is unused but the
|
||||
// type still requires a number; pass 0 with a clear msg.
|
||||
0,
|
||||
rusqlite::types::Type::Integer,
|
||||
Box::new(e),
|
||||
))?,
|
||||
checksum,
|
||||
discovered_at,
|
||||
stored,
|
||||
})
|
||||
}
|
||||
|
||||
/// UPSERT the documents row and bump `doc_version` on conflict.
|
||||
fn upsert_document(
|
||||
tx: &rusqlite::Transaction<'_>,
|
||||
@@ -503,24 +597,27 @@ fn upsert_document(
|
||||
doc_id, asset_id, workspace_path, title, lang,
|
||||
source_type, trust_level, parser_version,
|
||||
doc_version, schema_version, metadata_json,
|
||||
provenance_json, created_at, updated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
provenance_json, created_at, updated_at,
|
||||
last_chunker_version, last_embedding_version
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(doc_id) DO UPDATE SET
|
||||
asset_id = excluded.asset_id,
|
||||
workspace_path = excluded.workspace_path,
|
||||
title = excluded.title,
|
||||
lang = excluded.lang,
|
||||
source_type = excluded.source_type,
|
||||
trust_level = excluded.trust_level,
|
||||
parser_version = excluded.parser_version,
|
||||
asset_id = excluded.asset_id,
|
||||
workspace_path = excluded.workspace_path,
|
||||
title = excluded.title,
|
||||
lang = excluded.lang,
|
||||
source_type = excluded.source_type,
|
||||
trust_level = excluded.trust_level,
|
||||
parser_version = excluded.parser_version,
|
||||
-- doc_version: bump on update. excluded.doc_version is the
|
||||
-- caller's submitted value; we ignore it and add 1 to the
|
||||
-- existing column so each re-ingest cleanly increments.
|
||||
doc_version = documents.doc_version + 1,
|
||||
schema_version = excluded.schema_version,
|
||||
metadata_json = excluded.metadata_json,
|
||||
provenance_json = excluded.provenance_json,
|
||||
updated_at = excluded.updated_at",
|
||||
doc_version = documents.doc_version + 1,
|
||||
schema_version = excluded.schema_version,
|
||||
metadata_json = excluded.metadata_json,
|
||||
provenance_json = excluded.provenance_json,
|
||||
updated_at = excluded.updated_at,
|
||||
last_chunker_version = excluded.last_chunker_version,
|
||||
last_embedding_version = excluded.last_embedding_version",
|
||||
params![
|
||||
doc.doc_id.0,
|
||||
doc.source_asset_id.0,
|
||||
@@ -536,6 +633,8 @@ fn upsert_document(
|
||||
provenance_json,
|
||||
created_at,
|
||||
now,
|
||||
doc.last_chunker_version.as_ref().map(|v| v.0.as_str()),
|
||||
doc.last_embedding_version.as_ref().map(|v| v.0.as_str()),
|
||||
],
|
||||
)
|
||||
.map_err(StoreError::from)?;
|
||||
|
||||
@@ -78,6 +78,8 @@ fn make_doc() -> CanonicalDocument {
|
||||
parser_version: ParserVersion("test-parser".into()),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
148
crates/kebab-store-sqlite/tests/incremental_ingest.rs
Normal file
148
crates/kebab-store-sqlite/tests/incremental_ingest.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
//! Round-trip tests for `last_chunker_version` / `last_embedding_version`
|
||||
//! columns added by the V006 migration (p9-fb-23 task 3).
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use kebab_core::{
|
||||
AssetId, AssetStorage, Block, CanonicalDocument, Checksum, ChunkerVersion, CommonBlock,
|
||||
DocumentId, DocumentStore, EmbeddingVersion, HeadingBlock, Lang, MediaType, Metadata,
|
||||
ParserVersion, Provenance, RawAsset, SourceSpan, SourceType, SourceUri, TrustLevel,
|
||||
WorkspacePath,
|
||||
};
|
||||
use kebab_store_sqlite::SqliteStore;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
mod common;
|
||||
|
||||
fn make_asset() -> RawAsset {
|
||||
let bytes = b"incremental-ingest-test";
|
||||
RawAsset {
|
||||
asset_id: AssetId("f".repeat(32)),
|
||||
source_uri: SourceUri::File(PathBuf::from("/tmp/inc.md")),
|
||||
workspace_path: WorkspacePath::new("notes/inc.md".into()).unwrap(),
|
||||
media_type: MediaType::Markdown,
|
||||
byte_len: bytes.len() as u64,
|
||||
checksum: Checksum(blake3::hash(bytes).to_hex().to_string()),
|
||||
discovered_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
|
||||
stored: AssetStorage::Reference {
|
||||
path: PathBuf::from("/tmp/inc.md"),
|
||||
sha: Checksum(blake3::hash(bytes).to_hex().to_string()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn make_doc() -> CanonicalDocument {
|
||||
let doc_id = DocumentId("d".repeat(32));
|
||||
let block = Block::Heading(HeadingBlock {
|
||||
common: CommonBlock {
|
||||
block_id: kebab_core::BlockId("b".repeat(32)),
|
||||
heading_path: vec![],
|
||||
source_span: SourceSpan::Line { start: 1, end: 1 },
|
||||
},
|
||||
level: 1,
|
||||
text: "Incremental Title".into(),
|
||||
});
|
||||
let metadata = Metadata {
|
||||
aliases: vec![],
|
||||
tags: vec![],
|
||||
created_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
|
||||
updated_at: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
|
||||
source_type: SourceType::Markdown,
|
||||
trust_level: TrustLevel::Primary,
|
||||
user_id_alias: None,
|
||||
user: Default::default(),
|
||||
};
|
||||
CanonicalDocument {
|
||||
doc_id,
|
||||
source_asset_id: AssetId("f".repeat(32)),
|
||||
workspace_path: WorkspacePath::new("notes/inc.md".into()).unwrap(),
|
||||
title: "Incremental Title".into(),
|
||||
lang: Lang("en".into()),
|
||||
blocks: vec![block],
|
||||
metadata,
|
||||
provenance: Provenance { events: vec![] },
|
||||
parser_version: ParserVersion("test-parser".into()),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn put_then_get_document_roundtrips_version_stamps() {
|
||||
let env = common::TestEnv::new();
|
||||
let store = SqliteStore::open(&env.config()).unwrap();
|
||||
store.run_migrations().unwrap();
|
||||
|
||||
let asset = make_asset();
|
||||
store.put_asset(&asset).unwrap();
|
||||
|
||||
let mut doc = make_doc();
|
||||
doc.last_chunker_version = Some(ChunkerVersion("md-heading-v1".into()));
|
||||
doc.last_embedding_version = Some(EmbeddingVersion("multilingual-e5-small@v1".into()));
|
||||
|
||||
store.put_document(&doc).unwrap();
|
||||
let loaded = store
|
||||
.get_document(&doc.doc_id)
|
||||
.unwrap()
|
||||
.expect("doc round-trips");
|
||||
|
||||
assert_eq!(loaded.last_chunker_version, doc.last_chunker_version);
|
||||
assert_eq!(loaded.last_embedding_version, doc.last_embedding_version);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn put_then_get_document_roundtrips_none_stamps() {
|
||||
let env = common::TestEnv::new();
|
||||
let store = SqliteStore::open(&env.config()).unwrap();
|
||||
store.run_migrations().unwrap();
|
||||
|
||||
let asset = make_asset();
|
||||
store.put_asset(&asset).unwrap();
|
||||
|
||||
let doc = make_doc(); // both version stamps are None by default
|
||||
store.put_document(&doc).unwrap();
|
||||
let loaded = store
|
||||
.get_document(&doc.doc_id)
|
||||
.unwrap()
|
||||
.expect("doc round-trips");
|
||||
|
||||
assert!(
|
||||
loaded.last_chunker_version.is_none(),
|
||||
"last_chunker_version must be None when not set"
|
||||
);
|
||||
assert!(
|
||||
loaded.last_embedding_version.is_none(),
|
||||
"last_embedding_version must be None when not set"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_asset_by_workspace_path_roundtrips() {
|
||||
let env = common::TestEnv::new();
|
||||
let store = SqliteStore::open(&env.config()).unwrap();
|
||||
store.run_migrations().unwrap();
|
||||
|
||||
let asset = make_asset();
|
||||
store.put_asset(&asset).unwrap();
|
||||
|
||||
let loaded = store
|
||||
.get_asset_by_workspace_path(&asset.workspace_path)
|
||||
.unwrap()
|
||||
.expect("asset must round-trip");
|
||||
|
||||
assert_eq!(loaded.asset_id, asset.asset_id);
|
||||
assert_eq!(loaded.checksum, asset.checksum);
|
||||
assert_eq!(loaded.byte_len, asset.byte_len);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_asset_by_workspace_path_returns_none_for_unknown() {
|
||||
let env = common::TestEnv::new();
|
||||
let store = SqliteStore::open(&env.config()).unwrap();
|
||||
store.run_migrations().unwrap();
|
||||
|
||||
let path = WorkspacePath::new("notes/missing.md".into()).unwrap();
|
||||
assert!(store.get_asset_by_workspace_path(&path).unwrap().is_none());
|
||||
}
|
||||
@@ -31,6 +31,7 @@ fn fixture_report() -> IngestReport {
|
||||
new: 2,
|
||||
updated: 1,
|
||||
skipped: 0,
|
||||
unchanged: 0,
|
||||
errors: 0,
|
||||
duration_ms: 187,
|
||||
items: Some(vec![
|
||||
|
||||
@@ -67,6 +67,8 @@ fn make_doc(
|
||||
parser_version: ParserVersion("test".into()),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
};
|
||||
(asset, doc)
|
||||
}
|
||||
|
||||
@@ -131,6 +131,9 @@ fn apply_event(state: &mut IngestState, event: IngestEvent) {
|
||||
kebab_core::IngestItemKind::Skipped => {
|
||||
state.counts.skipped = state.counts.skipped.saturating_add(1);
|
||||
}
|
||||
kebab_core::IngestItemKind::Unchanged => {
|
||||
state.counts.unchanged = state.counts.unchanged.saturating_add(1);
|
||||
}
|
||||
kebab_core::IngestItemKind::Error => {
|
||||
state.counts.errors = state.counts.errors.saturating_add(1);
|
||||
}
|
||||
@@ -173,21 +176,23 @@ pub fn status_line(state: &IngestState) -> String {
|
||||
let secs = elapsed.as_secs();
|
||||
if state.aborted {
|
||||
return format!(
|
||||
"✗ ingest aborted at {}/{} after {}s (new={} updated={} skipped={} errors={})",
|
||||
"✗ ingest aborted at {}/{} after {}s (new={} updated={} unchanged={} skipped={} errors={})",
|
||||
state.counts.scanned.saturating_sub(state.counts.errors),
|
||||
state.counts.scanned,
|
||||
secs,
|
||||
state.counts.new,
|
||||
state.counts.updated,
|
||||
state.counts.unchanged,
|
||||
state.counts.skipped,
|
||||
state.counts.errors,
|
||||
);
|
||||
}
|
||||
return format!(
|
||||
"✓ ingest: {} docs ({} new, {} updated, {} skipped), {} chunks indexed in {}s",
|
||||
"✓ ingest: {} docs ({} new, {} updated, {} unchanged, {} skipped), {} chunks indexed in {}s",
|
||||
state.counts.scanned,
|
||||
state.counts.new,
|
||||
state.counts.updated,
|
||||
state.counts.unchanged,
|
||||
state.counts.skipped,
|
||||
state.counts.chunks_indexed,
|
||||
secs,
|
||||
|
||||
@@ -91,6 +91,8 @@ fn make_doc() -> CanonicalDocument {
|
||||
parser_version: ParserVersion("test-parser".into()),
|
||||
schema_version: 1,
|
||||
doc_version: 1,
|
||||
last_chunker_version: None,
|
||||
last_embedding_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
1138
docs/superpowers/plans/2026-05-04-p9-fb-23-incremental-ingest.md
Normal file
1138
docs/superpowers/plans/2026-05-04-p9-fb-23-incremental-ingest.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,173 @@
|
||||
# p9-fb-23 — Incremental ingest (skip unchanged docs)
|
||||
|
||||
**Date**: 2026-05-04
|
||||
**Status**: planned
|
||||
**Audience**: kebab-app / kebab-store-sqlite implementer / reviewer.
|
||||
**Source feedback**: 사용자 도그푸딩 2026-05-04 — "새 문서들이 폴더에 추가되면 ingest 시 변하지 않은 문서는 다시 ingest 하지 않고 변하거나 새로 추가된 문서만 처리하고 싶어."
|
||||
|
||||
## Goal
|
||||
|
||||
`kebab ingest` 가 변경되지 않은 (그리고 모든 version cascade input 도 동일한) document 의 parse / chunk / embed / vector upsert 를 스킵. 비용 dominator (fastembed embedding 호출) 가 변경된 / 새 file 에만 발생.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Mtime 기반 pre-hash skip (파일 읽기 자체를 회피). YAGNI — blake3 streaming 은 이미 scan 에서 무조건 발생, 본 spec 은 parse/chunk/embed 만 회피해도 90%+ 비용 절감.
|
||||
- Watch-mode (실시간 file change detection). 후속 task.
|
||||
- 부분 변경 (single chunk re-embedding). 항상 doc 단위 all-or-nothing.
|
||||
|
||||
## Allowed dependencies
|
||||
|
||||
- 기존 crate 만. 신규 crate 없음.
|
||||
- SQLite migration 추가 (V006).
|
||||
|
||||
## Scope
|
||||
|
||||
본 spec 은 *file-system 소스* (`kebab-source-fs`) + 메인 ingest 파이프라인 (`kebab-app::ingest_with_config*`) 에만 적용. 다른 source connector (현재 없음, 후속 phase) 도 같은 skip 계약을 따름 — `IngestReport.unchanged` 카운트는 connector 무관.
|
||||
|
||||
## Skip 조건
|
||||
|
||||
문서가 다음 4개 모두 만족할 때 `Unchanged` 로 분류:
|
||||
|
||||
1. `assets.checksum` (저장된 blake3) == 신규 blake3 (스캔 중 재계산).
|
||||
2. `documents.parser_version` == 현재 active parser_version.
|
||||
3. `documents.last_chunker_version` == 현재 active chunker_version.
|
||||
4. `documents.last_embedding_version` == 현재 active embedding_version (또는 양쪽 모두 NULL — embedder 미설정).
|
||||
|
||||
위 4개 중 하나라도 다르면 정상 ingest path. parse / chunk / embed / vector upsert 모두 발생.
|
||||
|
||||
## Storage 변경
|
||||
|
||||
**Migration V006** (`crates/kebab-store-sqlite/migrations/V006__incremental_ingest.sql`):
|
||||
|
||||
`documents` 테이블에 두 column 추가:
|
||||
|
||||
```sql
|
||||
ALTER TABLE documents ADD COLUMN last_chunker_version TEXT;
|
||||
ALTER TABLE documents ADD COLUMN last_embedding_version TEXT;
|
||||
```
|
||||
|
||||
기존 row 는 NULL — 첫 ingest 시 항상 mismatch → 강제 재처리 (안전 default). 이후 매 ingest 가 row 의 두 column 을 현 active version 으로 stamp.
|
||||
|
||||
`parser_version` 은 이미 `documents` 테이블에 존재 (v005 이전). 활용.
|
||||
|
||||
V006 migration 은 idempotent (`ALTER TABLE` + `ADD COLUMN` 이 두 번 실행돼도 sqlite 가 column-exists 체크). Refinery framework 가 single-shot 보장.
|
||||
|
||||
## Pipeline 흐름
|
||||
|
||||
`kebab-app::ingest_with_config_progress_cancellable` (현 메인 ingest fn) 의 asset 루프 안에서:
|
||||
|
||||
1. Source connector 가 file scan + blake3 streaming → `asset_blake3` 생성 (현재와 동일).
|
||||
2. **신규 early-skip 체크**:
|
||||
- `store.get_asset_by_workspace_path(path)` 로 기존 asset row 조회.
|
||||
- 존재 + `existing.checksum == new asset_blake3` → asset 동일.
|
||||
- `store.get_document_by_doc_id(id_for_doc(path, asset_id, current_parser_version))` 로 기존 doc 조회.
|
||||
- 존재 + `existing.last_chunker_version == current_chunker_version` + `existing.last_embedding_version == current_embedding_version` → **skip**.
|
||||
- `IngestReport.unchanged += 1`.
|
||||
- `IngestEvent::Item { kind: Unchanged, .. }` emit (progress consumer 가 표시).
|
||||
- 다음 asset 로 continue.
|
||||
3. Skip 미충족 → 정상 path: `put_asset_with_bytes` → parse → `put_document` → chunk → `put_chunks` → embed → `vec_store.upsert`.
|
||||
4. 정상 path 끝에서 `documents.last_chunker_version` + `documents.last_embedding_version` 을 현 active version 으로 stamp (`put_document` 가 받는 `Document` struct 에 두 field 추가, refinery 마이그레이션 자동 column 채움).
|
||||
|
||||
## API 변경
|
||||
|
||||
### `kebab-core::Document` struct
|
||||
|
||||
필드 두 개 추가:
|
||||
|
||||
```rust
|
||||
pub struct Document {
|
||||
// ... existing ...
|
||||
pub last_chunker_version: Option<ChunkerVersion>,
|
||||
pub last_embedding_version: Option<EmbeddingVersion>,
|
||||
}
|
||||
```
|
||||
|
||||
`Option` — embedder 미설정 (config.models.embedding.enabled = false) 시 `last_embedding_version = None`.
|
||||
|
||||
### `kebab-core::IngestReport` + `kebab-app::AggregateCounts`
|
||||
|
||||
`unchanged: u32` 필드 추가. wire schema 변경:
|
||||
|
||||
`docs/wire-schema/v1/ingest_report.schema.json` 에 `unchanged` (integer, minimum 0) 필드 추가. **additive — v1 호환 유지** (기존 client 가 모르는 필드 무시). v2 bump 불필요.
|
||||
|
||||
`AggregateCounts::default()` 가 `unchanged: 0` 자동 처리.
|
||||
|
||||
### `kebab-core::IngestItemKind`
|
||||
|
||||
```rust
|
||||
pub enum IngestItemKind {
|
||||
New,
|
||||
Updated,
|
||||
Skipped, // 기존: media-type 필터 / kb:// URI
|
||||
Unchanged, // 신규: skip 조건 4개 모두 만족
|
||||
Error,
|
||||
}
|
||||
```
|
||||
|
||||
`Skipped` (media-type 필터) 와 `Unchanged` (모든 versions match) 의미적 분리. `IngestEvent::Item.kind` 도 같이 확장.
|
||||
|
||||
### `kebab-store-sqlite` 신규 메서드
|
||||
|
||||
```rust
|
||||
fn get_asset_by_workspace_path(&self, path: &WorkspacePath) -> Result<Option<Asset>>;
|
||||
fn get_document_by_doc_id(&self, doc_id: &DocumentId) -> Result<Option<Document>>;
|
||||
```
|
||||
|
||||
기존 `put_*` / `purge_*` 메서드는 변경 없음. 새 read 경로만 추가.
|
||||
|
||||
## TUI 노출
|
||||
|
||||
`kebab-tui::ingest_progress::status_line` 의 final line 포맷에 `unchanged` 추가:
|
||||
|
||||
```
|
||||
✓ ingest: 100 docs (5 new, 3 updated, 92 unchanged, 0 skipped), 142 chunks indexed in 12s
|
||||
```
|
||||
|
||||
진행 중 (in-flight) status 는 그대로 (per-asset granularity 이므로 unchanged 별 카운트 불필요).
|
||||
|
||||
p9-fb-24 의 status bar dynamic slot 도 같은 텍스트 표시 (cascade 의 `indexing N/M` final line).
|
||||
|
||||
## CLI 노출
|
||||
|
||||
`kebab ingest` 의 `--json` 모드는 wire schema 의 `unchanged` 필드 자동 출력. human 모드 final line 은 위 status_line 과 동일 포맷.
|
||||
|
||||
`--force-reingest` flag 신규 추가 — skip 조건 무시하고 모든 doc 강제 재처리. 사용자가 "이상한 결과 → 일단 모두 재처리" 케이스 대응. CLI 의 `kebab_app::AskOpts` 같은 패턴으로 `IngestOpts.force_reingest: bool` 추가, 기본 false.
|
||||
|
||||
## Tests
|
||||
|
||||
### 신규 단위
|
||||
|
||||
- V006 migration smoke (sqlite store): apply → `documents` 에 두 컬럼 존재 + NULL default.
|
||||
- `get_asset_by_workspace_path` / `get_document_by_doc_id` 단위 (kebab-store-sqlite).
|
||||
- `id_for_doc` 변경 없음 (parser_version 만 input — 그대로).
|
||||
|
||||
### 신규 통합 (kebab-app)
|
||||
|
||||
- **Unchanged path**: 한 번 ingest → 두 번째 ingest 시 `IngestReport.unchanged == 1`, embed 호출 0회.
|
||||
- **Checksum mismatch**: 첫 ingest 후 파일 수정 → 두 번째 ingest 가 `updated == 1`.
|
||||
- **Parser version bump**: 첫 ingest 후 `KEBAB_PARSE_MD_VERSION` 상수 변경 simulate → 두 번째 ingest 가 `updated == 1` (doc_id 변경됨).
|
||||
- **Chunker version bump**: 첫 ingest 후 chunker_version 변경 simulate → `updated == 1`.
|
||||
- **Embedder version bump**: 첫 ingest 후 embedder_version 변경 simulate → `updated == 1`.
|
||||
- **`--force-reingest`**: 두 번째 ingest 가 skip 조건 만족하지만 강제로 `updated == 1` (또는 별도 카테고리?).
|
||||
|
||||
### 기존 영향
|
||||
|
||||
- 기존 ingest 통합 테스트 (kebab-app/tests/) 는 빈 KB 에서 시작하므로 모두 첫 번째 ingest path → `unchanged` 가 0 인 채로 그대로 통과.
|
||||
- `IngestReport` JSON 출력 테스트가 `unchanged` 필드 추가됐을 때 호환되는지 검증. additive 라 통과해야 함.
|
||||
|
||||
## Spec contract impact
|
||||
|
||||
- **Design §9 versioning cascade**: 명시적 동작 추가. parser/chunker/embedder version bump 시 다음 ingest 가 자동으로 모든 doc 을 `updated` 로 처리. 기존엔 silently 새 version 으로 overwrite (idempotent UPSERT) 였으나 본 spec 으로 explicit refresh 보장.
|
||||
- **Design §3.x IngestReport**: `unchanged` 필드 추가 (additive). v1 wire schema bump 없음.
|
||||
- **Design §2.4a IngestEvent**: `IngestItemKind::Unchanged` variant 추가. line-delimited JSON consumer 는 unknown variant 무시 (현 default behavior).
|
||||
|
||||
## Risks / notes
|
||||
|
||||
- **Stale skip risk**: 사용자가 외부 도구 (Ollama 모델 swap 등) 로 embedder 바꾸고도 config 의 `models.embedding.id` 갱신 안 하면 `last_embedding_version` 매치 → silently skip. 완화: model_id 도 stamp 에 포함? 또는 doctor 명령이 mismatch 감지 → 권고. 본 spec 은 `embedding_version` (model 명+버전 fingerprint) 만 신뢰 — model 자체 무결성은 별 영역.
|
||||
- **Force-reingest UX**: `--force-reingest` 는 모든 doc 재처리. 큰 corpus 에서 비싸므로 confirm prompt? 일단 flag 만 — 사용자가 명시적으로 입력하니 confirmation 불필요.
|
||||
- **V006 migration 호환**: refinery 가 down-migration 미지원 (one-way). 이전 commit 으로 rollback 시 column 그대로 남음 (sqlite ALTER 의 한계). 무해 — 미사용 column.
|
||||
- **doc_version 와의 관계**: 기존 `doc_version` (ingest 마다 +1) 는 그대로. Unchanged path 에서는 `doc_version` bump 안 함 — "이번 ingest 에서 처리 안 됨" 의미 보존.
|
||||
|
||||
## Live deviations
|
||||
|
||||
추후 발견되는 deviation 은 `tasks/HOTFIXES.md` `2026-05-04 — p9-fb-23` 항목에 dated 로그로 추가. spec 자체는 frozen.
|
||||
@@ -11,6 +11,7 @@
|
||||
"new",
|
||||
"updated",
|
||||
"skipped",
|
||||
"unchanged",
|
||||
"errors",
|
||||
"duration_ms"
|
||||
],
|
||||
@@ -21,6 +22,11 @@
|
||||
"new": { "type": "integer", "minimum": 0 },
|
||||
"updated": { "type": "integer", "minimum": 0 },
|
||||
"skipped": { "type": "integer", "minimum": 0 },
|
||||
"unchanged": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
"description": "p9-fb-23: assets whose checksum + parser_version + chunker_version + embedding_version all matched the existing record. Parse / chunk / embed / vector upsert all skipped."
|
||||
},
|
||||
"errors": { "type": "integer", "minimum": 0 },
|
||||
"duration_ms": { "type": "integer", "minimum": 0 },
|
||||
"items": { "type": ["array", "null"] }
|
||||
|
||||
@@ -62,6 +62,8 @@
|
||||
"doc_id": "6a9ef317c9c097ff3f6aeb317559bd83",
|
||||
"doc_version": 1,
|
||||
"lang": "en",
|
||||
"last_chunker_version": null,
|
||||
"last_embedding_version": null,
|
||||
"metadata": {
|
||||
"aliases": [],
|
||||
"created_at": "2023-11-14T22:13:20Z",
|
||||
|
||||
6
migrations/V006__incremental_ingest.sql
Normal file
6
migrations/V006__incremental_ingest.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
-- p9-fb-23: incremental ingest needs to know which chunker / embedding
|
||||
-- versions were used to populate this document so a re-ingest can
|
||||
-- decide whether to skip (versions match) or re-process (any mismatch).
|
||||
-- parser_version is already on documents from V001.
|
||||
ALTER TABLE documents ADD COLUMN last_chunker_version TEXT;
|
||||
ALTER TABLE documents ADD COLUMN last_embedding_version TEXT;
|
||||
@@ -14,6 +14,31 @@ historical contract that was implemented; this file accumulates the
|
||||
deltas so phase 5+ readers can find the live behavior without diffing
|
||||
git history.
|
||||
|
||||
## 2026-05-04 — p9-fb-23 (post-dogfooding): Incremental ingest
|
||||
|
||||
**Source feedback**: 사용자 도그푸딩 2026-05-04 — "새 문서들이 폴더에 추가되면 ingest 시 변하지 않은 문서는 다시 ingest 하지 않고 변하거나 새로 추가된 문서만 처리하고 싶어."
|
||||
|
||||
**Live binding 변경**:
|
||||
|
||||
- SQLite V006 migration — `documents` 에 `last_chunker_version` + `last_embedding_version` TEXT (nullable) 추가. 기존 row 는 NULL → 첫 번째 ingest 시 항상 mismatch → 강제 재처리 (안전 default).
|
||||
- `kebab-core::IngestItemKind::Unchanged` variant 신규 (기존 `Skipped` 와 의미 분리: `Skipped` = media-type 필터, `Unchanged` = 모든 versions match).
|
||||
- `IngestReport.unchanged: u32` + `AggregateCounts.unchanged: u32` 신규. wire schema `ingest_report.v1` 에 `unchanged` 필드 additive (v1 호환 유지).
|
||||
- `kebab-app::IngestOpts { progress, cancel, force_reingest }` struct 신규 — `AskOpts` 패턴. 기존 `ingest_with_config_cancellable` 등 wrapper 보존, 신규 `ingest_with_config_opts` 가 IngestOpts 받음.
|
||||
- `kebab-app::ingest_with_config_opts` asset 루프에 early-skip 블록: `force_reingest=false` + 4 조건 (asset_blake3 일치 + doc_id 존재 + last_chunker_version 일치 + last_embedding_version 일치) 모두 성립 시 `IngestEvent::AssetFinished{result: Unchanged}` emit + `aggregate.unchanged += 1` + `continue` (parse/chunk/embed/vector upsert 모두 회피). 세 flow (md / image / pdf) 모두 적용.
|
||||
- 정상 path 끝에서 `CanonicalDocument.last_chunker_version` + `last_embedding_version` 을 현 active version 으로 stamp.
|
||||
- `kebab-cli` 에 `--force-reingest` flag 추가 (skip 우회 강제 재처리).
|
||||
- `kebab-tui::ingest_progress::status_line` final / aborted 라인 모두 `unchanged=N` 노출.
|
||||
|
||||
**Spec contract impact**: design §9 versioning cascade 의 명시적 동작 추가 — parser/chunker/embedder version bump 시 다음 ingest 가 자동으로 모든 doc 을 `updated` 로 처리. 기존엔 silently 새 version 으로 overwrite (idempotent UPSERT) 였으나 본 변경으로 explicit refresh + 비용 회피 모두 보장. design §3.x IngestReport / §2.4a IngestEvent 에 `Unchanged` variant 추가 (additive, wire v1 호환).
|
||||
|
||||
**Tests added**: 8 신규 (`crates/kebab-app/tests/incremental_ingest.rs` 2 + `crates/kebab-app/tests/ingest_lexical.rs` 2 + `crates/kebab-store-sqlite/tests/incremental_ingest.rs` 4) + 3 기존 갱신 (`image_pipeline.rs` / `pdf_pipeline.rs` / `ingest_lexical.rs::ingest_idempotent_on_second_run` 의 assertion 이 Updated → Unchanged 로 변경). 기존 ~720 워크스페이스 테스트 무수정 통과.
|
||||
|
||||
**Known limitation (deferred)**:
|
||||
|
||||
- Mtime-based pre-hash skip 미구현 — blake3 streaming 은 매 scan 마다 무조건 발생.
|
||||
- Watch-mode (실시간 file change detection) 후속 task.
|
||||
- Stale skip risk: 사용자가 외부에서 embedder 모델 swap 후 config 의 `models.embedding.id` 갱신 안 하면 last_embedding_version 매치 → silently skip. doctor 명령이 mismatch 감지 → 권고하는 후속 task 가능.
|
||||
|
||||
## 2026-05-04 — p9-fb-24 (post-dogfooding): TUI status bar + Library 헤더 + page scroll
|
||||
|
||||
**Source feedback**: 사용자 도그푸딩 2026-05-04 — (1) Library 컬럼이 무엇을 뜻하는지 헤더 부재, (2) Ask 트랜스크립트 / Inspect 둘 다 페이지 단위 스크롤 키 필요, (3) 모든 모드에서 항상 떠 있는 상태바 + 키 안내바 (버전 정보 포함) 가 있으면 좋겠다.
|
||||
|
||||
@@ -106,6 +106,7 @@ P0~P5 는 직렬. P6~P9 는 P5 이후 병렬 가능.
|
||||
- [p9-fb-20 citation surface](p9/p9-fb-20-citation-surface.md)
|
||||
- [p9-fb-21 Insert-key + F1 visibility (post-도그푸딩)](p9/p9-fb-21-tui-insert-key-discoverability.md)
|
||||
- [p9-fb-22 cursor mid-string editing + Ask follow-tail (post-도그푸딩)](p9/p9-fb-22-tui-cursor-and-autoscroll.md)
|
||||
- [p9-fb-23 incremental ingest (post-도그푸딩)](p9/p9-fb-23-incremental-ingest.md)
|
||||
- [p9-fb-24 status bar + Library header + page scroll (post-도그푸딩)](p9/p9-fb-24-tui-affordances.md)
|
||||
|
||||
## Post-merge 핫픽스
|
||||
|
||||
47
tasks/p9/p9-fb-23-incremental-ingest.md
Normal file
47
tasks/p9/p9-fb-23-incremental-ingest.md
Normal file
@@ -0,0 +1,47 @@
|
||||
---
|
||||
phase: P9
|
||||
component: kebab-app
|
||||
task_id: p9-fb-23
|
||||
title: "Incremental ingest — skip unchanged docs (post-merge dogfooding)"
|
||||
status: completed
|
||||
depends_on: [p9-fb-03, p9-fb-07]
|
||||
unblocks: []
|
||||
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
|
||||
contract_sections: [§9 Versioning cascade, §2.4a IngestEvent, §3.x IngestReport]
|
||||
source_feedback: 사용자 도그푸딩 2026-05-04 — 변하지 않은 문서 재처리 회피 요청.
|
||||
---
|
||||
|
||||
# p9-fb-23 — Incremental ingest
|
||||
|
||||
상세 설계: `docs/superpowers/specs/2026-05-04-p9-fb-23-incremental-ingest-design.md`.
|
||||
구현 계획: `docs/superpowers/plans/2026-05-04-p9-fb-23-incremental-ingest.md`.
|
||||
|
||||
## Goal
|
||||
|
||||
`kebab ingest` 가 변경/신규 doc 만 처리. 변하지 않은 doc 은 parse/chunk/embed/vector upsert 모두 회피.
|
||||
|
||||
## Behavior contract
|
||||
|
||||
Skip 조건 4 모두 만족:
|
||||
1. 신규 blake3 == `assets.checksum`.
|
||||
2. `documents.parser_version` == 현 active.
|
||||
3. `documents.last_chunker_version` == 현 active.
|
||||
4. `documents.last_embedding_version` == 현 active (None == None 도 match).
|
||||
|
||||
위 중 하나라도 mismatch → 정상 path. parse/chunk/embed/vector upsert 모두.
|
||||
|
||||
`IngestOpts.force_reingest=true` → skip 무시 강제 재처리.
|
||||
|
||||
## Tests
|
||||
|
||||
- 통합: 두 번째 ingest 가 unchanged 1 / new 0 / updated 0.
|
||||
- 통합: `--force-reingest` 가 skip 우회.
|
||||
- 단위: V006 migration, SQLite put/get_document roundtrip 신규 컬럼, get_asset_by_workspace_path roundtrip.
|
||||
- 통합: 첫 ingest 가 chunker/embedding version stamp.
|
||||
|
||||
## Risks / notes
|
||||
|
||||
- mtime pre-hash skip 미구현 (YAGNI, 후속 가능).
|
||||
- 외부 embedder model swap 후 config 갱신 안 하면 silently skip — doctor 명령이 mismatch 감지하는 후속 task 가능.
|
||||
|
||||
Live deviations 반영 위치: `tasks/HOTFIXES.md` `2026-05-04 — p9-fb-23` 항목.
|
||||
Reference in New Issue
Block a user