diff --git a/crates/kebab-app/src/app.rs b/crates/kebab-app/src/app.rs index 80cedcd..e64b51f 100644 --- a/crates/kebab-app/src/app.rs +++ b/crates/kebab-app/src/app.rs @@ -212,6 +212,34 @@ impl App { sqlite .run_migrations() .context("kb-app: run SqliteStore migrations")?; + // V009 의 tokenized_korean_text column 의 first-boot eager backfill. + // 신규 ingest 의 chunks_ai trigger 가 이미 채우므로 NULL row 가 없으면 즉시 0 반환 (idempotent). + // V007 → V009 업그레이드 시 KB 크기 비례 (~10000 chunk 당 ~30-60s). + let backfill_count = sqlite + .backfill_tokenized_korean_text( + |done, total| { + if total > 0 && done % 500 == 0 { + tracing::info!( + target: "kebab-app", + "korean tokenizer backfill: {done}/{total}" + ); + } + }, + kebab_chunk::tokenize_korean_morphological, + ) + .unwrap_or_else(|e| { + tracing::warn!( + target: "kebab-app", + "korean tokenizer backfill failed: {e}" + ); + 0 + }); + if backfill_count > 0 { + tracing::info!( + target: "kebab-app", + "korean tokenizer backfill complete: {backfill_count} chunks updated" + ); + } // p9-fb-19: build the LRU cache from config. Capacity 0 → // `None` (cache disabled — every search hits the retrievers). let search_cache = NonZeroUsize::new(config.search.cache_capacity) @@ -1177,7 +1205,7 @@ impl App { .context("prepare ms query")?; stmt.query_map([], |r| r.get::<_, u64>(0)) .context("query ms")? - .filter_map(|r| r.ok()) + .filter_map(Result::ok) .collect() }; let (p50_ms, p90_ms, p99_ms, max_ms) = percentiles(&samples); @@ -1191,7 +1219,7 @@ impl App { let rows = stmt .query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, u64>(1)?))) .context("query engine")?; - for row in rows.filter_map(|r| r.ok()) { + for row in rows.filter_map(Result::ok) { by_engine.insert(row.0, row.1); } } @@ -1219,7 +1247,7 @@ impl App { }) }) .context("query by_doc")? - .filter_map(|r| r.ok()) + .filter_map(Result::ok) .collect() }; @@ -1276,7 +1304,7 @@ impl App { }) }) .context("query failures by doc_id")? - .filter_map(|r| r.ok()) + .filter_map(Result::ok) .collect() } else { let mut stmt = conn @@ -1298,7 +1326,7 @@ impl App { }) }) .context("query failures corpus-wide")? - .filter_map(|r| r.ok()) + .filter_map(Result::ok) .collect() }; Ok(OcrFailuresV1 { diff --git a/crates/kebab-app/src/ingest_log.rs b/crates/kebab-app/src/ingest_log.rs index 83f9cd3..ce2b4fa 100644 --- a/crates/kebab-app/src/ingest_log.rs +++ b/crates/kebab-app/src/ingest_log.rs @@ -232,13 +232,12 @@ pub(crate) fn cleanup_old_logs( retention_days: u32, ) -> anyhow::Result<()> { let mut entries: Vec<_> = std::fs::read_dir(log_dir)? - .filter_map(|e| e.ok()) + .filter_map(Result::ok) .filter(|e| { e.path() .file_name() .and_then(|n| n.to_str()) - .map(|s| s.starts_with("ingest-") && s.ends_with(".ndjson")) - .unwrap_or(false) + .is_some_and(|s| s.starts_with("ingest-") && s.ends_with(".ndjson")) }) .collect(); @@ -247,7 +246,7 @@ pub(crate) fn cleanup_old_logs( let cutoff = SystemTime::now() .checked_sub(std::time::Duration::from_secs( - retention_days as u64 * 86400, + u64::from(retention_days) * 86400, )) .unwrap_or(SystemTime::UNIX_EPOCH); @@ -412,7 +411,7 @@ mod tests { cleanup_old_logs(dir, 3, 90).unwrap(); let remaining: Vec<_> = std::fs::read_dir(dir) .unwrap() - .filter_map(|e| e.ok()) + .filter_map(Result::ok) .collect(); assert_eq!(remaining.len(), 3, "expected 3 files after cleanup"); } @@ -436,7 +435,7 @@ mod tests { cleanup_old_logs(dir, 10, 30).unwrap(); let remaining: Vec<_> = std::fs::read_dir(dir) .unwrap() - .filter_map(|e| e.ok()) + .filter_map(Result::ok) .collect(); assert_eq!( remaining.len(), diff --git a/crates/kebab-app/src/pdf_ocr_apply.rs b/crates/kebab-app/src/pdf_ocr_apply.rs index ba375d1..254d8fd 100644 --- a/crates/kebab-app/src/pdf_ocr_apply.rs +++ b/crates/kebab-app/src/pdf_ocr_apply.rs @@ -191,8 +191,7 @@ where note: Some(note), }); let (image_width, image_height) = extract_image_dimensions(&page_image_bytes) - .map(|(w, h)| (Some(w), Some(h))) - .unwrap_or((None, None)); + .map_or((None, None), |(w, h)| (Some(w), Some(h))); emit_progress(PdfOcrProgress::Finished { page: page_num, ms: start.elapsed().as_millis() as u64, @@ -272,8 +271,7 @@ where }); let (image_width, image_height) = extract_image_dimensions(&page_image_bytes) - .map(|(w, h)| (Some(w), Some(h))) - .unwrap_or((None, None)); + .map_or((None, None), |(w, h)| (Some(w), Some(h))); emit_progress(PdfOcrProgress::Finished { page: page_num, ms: elapsed_ms, diff --git a/crates/kebab-app/tests/ocr_inspect_smoke.rs b/crates/kebab-app/tests/ocr_inspect_smoke.rs index 31e1bf4..65d414b 100644 --- a/crates/kebab-app/tests/ocr_inspect_smoke.rs +++ b/crates/kebab-app/tests/ocr_inspect_smoke.rs @@ -15,14 +15,14 @@ fn seed_ocr_events(env: &TestEnv, store: &SqliteStore) { store .record_pdf_ocr_event( "run-aaa", - &format!("2026-05-28T0{}:00:00Z", i), + &format!("2026-05-28T0{i}:00:00Z"), Some("doc-abc"), "path/scanned.pdf", i + 1, Some(50_000), Some(200), Some(150), - 100 + (i as u64) * 20, + 100 + u64::from(i) * 20, 42, true, None, diff --git a/crates/kebab-store-sqlite/src/store.rs b/crates/kebab-store-sqlite/src/store.rs index 722001a..0a0324c 100644 --- a/crates/kebab-store-sqlite/src/store.rs +++ b/crates/kebab-store-sqlite/src/store.rs @@ -492,6 +492,64 @@ impl SqliteStore { Ok(out) } + /// V007 → V009 업그레이드 시 기존 chunks 의 `tokenized_korean_text` 가 NULL — 이 + /// 메서드가 NULL 인 row 를 batch 로 읽어 `tokenize` 콜백으로 형태소 분해 후 UPDATE. + /// chunks_au trigger 가 chunks_fts 를 자동 재-index. + /// + /// - `tokenize`: `kebab_chunk::tokenize_korean_morphological` 등 `&str → Option`. + /// `None` 반환 시 row 를 skip (UPDATE 없음). + /// - `progress`: `(done, total)` 콜백. 1000 row 마다 발화. + /// - 반환값: lindera Some 으로 UPDATE 된 row 수 (idempotent — 이미 채워진 row 는 0). + /// - 실패 시 App open 을 block 하지 않도록 호출자가 `unwrap_or_else` 로 감쌀 것. + pub fn backfill_tokenized_korean_text(&self, progress: F, tokenize: T) -> Result + where + F: Fn(u64, u64), + T: Fn(&str) -> Option, + { + // 1. NULL 후보 수집. + let rows: Vec<(String, String)> = { + let conn = self.lock_conn(); + let mut stmt = conn + .prepare( + "SELECT chunk_id, text FROM chunks \ + WHERE tokenized_korean_text IS NULL \ + ORDER BY chunk_id", + ) + .map_err(StoreError::from)?; + let iter = stmt + .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))) + .map_err(StoreError::from)?; + let mut out = Vec::new(); + for r in iter { + out.push(r.map_err(StoreError::from)?); + } + out + }; + + let total = rows.len() as u64; + let mut updated: u64 = 0; + + // 2. 1000 row 마다 transaction 으로 batch UPDATE. + for chunk in rows.chunks(1000) { + let conn = self.lock_conn(); + let tx = conn.unchecked_transaction().map_err(StoreError::from)?; + for (chunk_id, text) in chunk { + if let Some(tokenized) = tokenize(text) { + tx.execute( + "UPDATE chunks SET tokenized_korean_text = ?1 WHERE chunk_id = ?2", + params![tokenized, chunk_id], + ) + .map_err(StoreError::from)?; + updated += 1; + } + } + tx.commit().map_err(StoreError::from)?; + progress(updated, total); + } + + Ok(updated) + } + /// v0.17.0 PR-B: sweep the SQLite document chain (`documents` → /// `blocks` / `chunks` / `embedding_records` via CASCADE) for every /// row at `workspace_path` whose `doc_id` differs from `keep_doc_id`. diff --git a/crates/kebab-store-sqlite/tests/fts.rs b/crates/kebab-store-sqlite/tests/fts.rs index 8da02cd..5d945b8 100644 --- a/crates/kebab-store-sqlite/tests/fts.rs +++ b/crates/kebab-store-sqlite/tests/fts.rs @@ -13,6 +13,7 @@ //! that bypasses the `SqliteStore` mutex; that's fine because each test //! gets its own tempdir and no concurrent mutator is in flight. +use kebab_chunk::tokenize_korean_morphological; use kebab_store_sqlite::{SqliteStore, rebuild_chunks_fts}; use rusqlite::Connection; @@ -453,6 +454,53 @@ fn v009_bumps_corpus_revision() { ); } +// ── 5c. backfill_tokenized_korean_text ─────────────────────────────── + +#[test] +fn backfill_tokenized_korean_text_populates_nullable_rows() { + let env = common::TestEnv::new(); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // chunks 에 한국어 row 두 개 INSERT (tokenized_korean_text 는 chunks_ai trigger + // 가 채우지만, 여기서는 raw_conn_no_fk 로 직접 INSERT 하므로 NULL 로 남음). + let conn = raw_conn_no_fk(&env); + insert_chunk(&conn, &"a".repeat(32), &"d".repeat(32), "[]", "한국 문화는 오래되었다"); + insert_chunk(&conn, &"b".repeat(32), &"d".repeat(32), "[]", "서울특별시는 한국의 수도"); + let null_count_before: i64 = conn + .query_row( + "SELECT COUNT(*) FROM chunks WHERE tokenized_korean_text IS NULL", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(null_count_before, 2); + drop(conn); + + // backfill 호출 → lindera 가 두 row 모두 분해 성공 → 2 반환. + let processed = store + .backfill_tokenized_korean_text(|_, _| {}, tokenize_korean_morphological) + .unwrap(); + assert_eq!(processed, 2, "both rows should be populated by lindera"); + + let conn = raw_conn_no_fk(&env); + let null_count_after: i64 = conn + .query_row( + "SELECT COUNT(*) FROM chunks WHERE tokenized_korean_text IS NULL", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(null_count_after, 0); + + // idempotency: 두 번째 호출 → 0 (모든 row 가 이미 채워져 있음). + drop(conn); + let processed_again = store + .backfill_tokenized_korean_text(|_, _| {}, tokenize_korean_morphological) + .unwrap(); + assert_eq!(processed_again, 0); +} + // ── 6. WAL cleanup: drop store before tempdir reaps WAL/SHM ────────── /// Mirror the P1-6 pattern: opening + migrating + dropping the store