feat(app): first-boot eager backfill for tokenized_korean_text
V007 → V009 업그레이드 시 기존 chunks 의 tokenized_korean_text 가 NULL — 첫 App::open_with_config 호출 시 자동으로 lindera ko-dic 으로 분해 후 UPDATE. chunks_au trigger 가 chunks_fts 를 자동 재-index. 사용자 재-ingest 불필요. - crates/kebab-store-sqlite/src/store.rs: backfill_tokenized_korean_text(progress_cb, tokenize) API. 1000 row 마다 commit + progress 콜백. idempotent (IS NULL 필터로 partial completion 재실행 안전). tokenizer 를 파라미터로 받아 §8 dep 경계 유지. - crates/kebab-app/src/app.rs::open_with_config: run_migrations 직후 backfill 호출. 실패 시 warn log 만 (App open 은 성공 — vector/hybrid mode 계속 가능). 500 row 마다 info log progress. - crates/kebab-store-sqlite/tests/fts.rs: backfill_tokenized_korean_text_populates_nullable_rows 단위 test (idempotency 포함). - clippy pre-existing 오류 수정 (redundant_closure, map_unwrap_or, cast_lossless, uninlined_format_args — kebab-app/ingest_log.rs, pdf_ocr_apply.rs, app.rs, tests/ocr_inspect_smoke.rs). Spec: docs/superpowers/specs/2026-05-28-v0.20.x-korean-morphological-tokenizer-spec.md §8.1, §8.2 Plan: docs/superpowers/plans/2026-05-28-v0.20.x-korean-morphological-tokenizer-plan.md (S4) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -212,6 +212,34 @@ impl App {
|
|||||||
sqlite
|
sqlite
|
||||||
.run_migrations()
|
.run_migrations()
|
||||||
.context("kb-app: run SqliteStore migrations")?;
|
.context("kb-app: run SqliteStore migrations")?;
|
||||||
|
// V009 의 tokenized_korean_text column 의 first-boot eager backfill.
|
||||||
|
// 신규 ingest 의 chunks_ai trigger 가 이미 채우므로 NULL row 가 없으면 즉시 0 반환 (idempotent).
|
||||||
|
// V007 → V009 업그레이드 시 KB 크기 비례 (~10000 chunk 당 ~30-60s).
|
||||||
|
let backfill_count = sqlite
|
||||||
|
.backfill_tokenized_korean_text(
|
||||||
|
|done, total| {
|
||||||
|
if total > 0 && done % 500 == 0 {
|
||||||
|
tracing::info!(
|
||||||
|
target: "kebab-app",
|
||||||
|
"korean tokenizer backfill: {done}/{total}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
kebab_chunk::tokenize_korean_morphological,
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
tracing::warn!(
|
||||||
|
target: "kebab-app",
|
||||||
|
"korean tokenizer backfill failed: {e}"
|
||||||
|
);
|
||||||
|
0
|
||||||
|
});
|
||||||
|
if backfill_count > 0 {
|
||||||
|
tracing::info!(
|
||||||
|
target: "kebab-app",
|
||||||
|
"korean tokenizer backfill complete: {backfill_count} chunks updated"
|
||||||
|
);
|
||||||
|
}
|
||||||
// p9-fb-19: build the LRU cache from config. Capacity 0 →
|
// p9-fb-19: build the LRU cache from config. Capacity 0 →
|
||||||
// `None` (cache disabled — every search hits the retrievers).
|
// `None` (cache disabled — every search hits the retrievers).
|
||||||
let search_cache = NonZeroUsize::new(config.search.cache_capacity)
|
let search_cache = NonZeroUsize::new(config.search.cache_capacity)
|
||||||
@@ -1177,7 +1205,7 @@ impl App {
|
|||||||
.context("prepare ms query")?;
|
.context("prepare ms query")?;
|
||||||
stmt.query_map([], |r| r.get::<_, u64>(0))
|
stmt.query_map([], |r| r.get::<_, u64>(0))
|
||||||
.context("query ms")?
|
.context("query ms")?
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(Result::ok)
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
let (p50_ms, p90_ms, p99_ms, max_ms) = percentiles(&samples);
|
let (p50_ms, p90_ms, p99_ms, max_ms) = percentiles(&samples);
|
||||||
@@ -1191,7 +1219,7 @@ impl App {
|
|||||||
let rows = stmt
|
let rows = stmt
|
||||||
.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, u64>(1)?)))
|
.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, u64>(1)?)))
|
||||||
.context("query engine")?;
|
.context("query engine")?;
|
||||||
for row in rows.filter_map(|r| r.ok()) {
|
for row in rows.filter_map(Result::ok) {
|
||||||
by_engine.insert(row.0, row.1);
|
by_engine.insert(row.0, row.1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1219,7 +1247,7 @@ impl App {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
.context("query by_doc")?
|
.context("query by_doc")?
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(Result::ok)
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1276,7 +1304,7 @@ impl App {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
.context("query failures by doc_id")?
|
.context("query failures by doc_id")?
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(Result::ok)
|
||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
let mut stmt = conn
|
let mut stmt = conn
|
||||||
@@ -1298,7 +1326,7 @@ impl App {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
.context("query failures corpus-wide")?
|
.context("query failures corpus-wide")?
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(Result::ok)
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
Ok(OcrFailuresV1 {
|
Ok(OcrFailuresV1 {
|
||||||
|
|||||||
@@ -232,13 +232,12 @@ pub(crate) fn cleanup_old_logs(
|
|||||||
retention_days: u32,
|
retention_days: u32,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut entries: Vec<_> = std::fs::read_dir(log_dir)?
|
let mut entries: Vec<_> = std::fs::read_dir(log_dir)?
|
||||||
.filter_map(|e| e.ok())
|
.filter_map(Result::ok)
|
||||||
.filter(|e| {
|
.filter(|e| {
|
||||||
e.path()
|
e.path()
|
||||||
.file_name()
|
.file_name()
|
||||||
.and_then(|n| n.to_str())
|
.and_then(|n| n.to_str())
|
||||||
.map(|s| s.starts_with("ingest-") && s.ends_with(".ndjson"))
|
.is_some_and(|s| s.starts_with("ingest-") && s.ends_with(".ndjson"))
|
||||||
.unwrap_or(false)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -247,7 +246,7 @@ pub(crate) fn cleanup_old_logs(
|
|||||||
|
|
||||||
let cutoff = SystemTime::now()
|
let cutoff = SystemTime::now()
|
||||||
.checked_sub(std::time::Duration::from_secs(
|
.checked_sub(std::time::Duration::from_secs(
|
||||||
retention_days as u64 * 86400,
|
u64::from(retention_days) * 86400,
|
||||||
))
|
))
|
||||||
.unwrap_or(SystemTime::UNIX_EPOCH);
|
.unwrap_or(SystemTime::UNIX_EPOCH);
|
||||||
|
|
||||||
@@ -412,7 +411,7 @@ mod tests {
|
|||||||
cleanup_old_logs(dir, 3, 90).unwrap();
|
cleanup_old_logs(dir, 3, 90).unwrap();
|
||||||
let remaining: Vec<_> = std::fs::read_dir(dir)
|
let remaining: Vec<_> = std::fs::read_dir(dir)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.filter_map(|e| e.ok())
|
.filter_map(Result::ok)
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(remaining.len(), 3, "expected 3 files after cleanup");
|
assert_eq!(remaining.len(), 3, "expected 3 files after cleanup");
|
||||||
}
|
}
|
||||||
@@ -436,7 +435,7 @@ mod tests {
|
|||||||
cleanup_old_logs(dir, 10, 30).unwrap();
|
cleanup_old_logs(dir, 10, 30).unwrap();
|
||||||
let remaining: Vec<_> = std::fs::read_dir(dir)
|
let remaining: Vec<_> = std::fs::read_dir(dir)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.filter_map(|e| e.ok())
|
.filter_map(Result::ok)
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
remaining.len(),
|
remaining.len(),
|
||||||
|
|||||||
@@ -191,8 +191,7 @@ where
|
|||||||
note: Some(note),
|
note: Some(note),
|
||||||
});
|
});
|
||||||
let (image_width, image_height) = extract_image_dimensions(&page_image_bytes)
|
let (image_width, image_height) = extract_image_dimensions(&page_image_bytes)
|
||||||
.map(|(w, h)| (Some(w), Some(h)))
|
.map_or((None, None), |(w, h)| (Some(w), Some(h)));
|
||||||
.unwrap_or((None, None));
|
|
||||||
emit_progress(PdfOcrProgress::Finished {
|
emit_progress(PdfOcrProgress::Finished {
|
||||||
page: page_num,
|
page: page_num,
|
||||||
ms: start.elapsed().as_millis() as u64,
|
ms: start.elapsed().as_millis() as u64,
|
||||||
@@ -272,8 +271,7 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let (image_width, image_height) = extract_image_dimensions(&page_image_bytes)
|
let (image_width, image_height) = extract_image_dimensions(&page_image_bytes)
|
||||||
.map(|(w, h)| (Some(w), Some(h)))
|
.map_or((None, None), |(w, h)| (Some(w), Some(h)));
|
||||||
.unwrap_or((None, None));
|
|
||||||
emit_progress(PdfOcrProgress::Finished {
|
emit_progress(PdfOcrProgress::Finished {
|
||||||
page: page_num,
|
page: page_num,
|
||||||
ms: elapsed_ms,
|
ms: elapsed_ms,
|
||||||
|
|||||||
@@ -15,14 +15,14 @@ fn seed_ocr_events(env: &TestEnv, store: &SqliteStore) {
|
|||||||
store
|
store
|
||||||
.record_pdf_ocr_event(
|
.record_pdf_ocr_event(
|
||||||
"run-aaa",
|
"run-aaa",
|
||||||
&format!("2026-05-28T0{}:00:00Z", i),
|
&format!("2026-05-28T0{i}:00:00Z"),
|
||||||
Some("doc-abc"),
|
Some("doc-abc"),
|
||||||
"path/scanned.pdf",
|
"path/scanned.pdf",
|
||||||
i + 1,
|
i + 1,
|
||||||
Some(50_000),
|
Some(50_000),
|
||||||
Some(200),
|
Some(200),
|
||||||
Some(150),
|
Some(150),
|
||||||
100 + (i as u64) * 20,
|
100 + u64::from(i) * 20,
|
||||||
42,
|
42,
|
||||||
true,
|
true,
|
||||||
None,
|
None,
|
||||||
|
|||||||
@@ -492,6 +492,64 @@ impl SqliteStore {
|
|||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// V007 → V009 업그레이드 시 기존 chunks 의 `tokenized_korean_text` 가 NULL — 이
|
||||||
|
/// 메서드가 NULL 인 row 를 batch 로 읽어 `tokenize` 콜백으로 형태소 분해 후 UPDATE.
|
||||||
|
/// chunks_au trigger 가 chunks_fts 를 자동 재-index.
|
||||||
|
///
|
||||||
|
/// - `tokenize`: `kebab_chunk::tokenize_korean_morphological` 등 `&str → Option<String>`.
|
||||||
|
/// `None` 반환 시 row 를 skip (UPDATE 없음).
|
||||||
|
/// - `progress`: `(done, total)` 콜백. 1000 row 마다 발화.
|
||||||
|
/// - 반환값: lindera Some 으로 UPDATE 된 row 수 (idempotent — 이미 채워진 row 는 0).
|
||||||
|
/// - 실패 시 App open 을 block 하지 않도록 호출자가 `unwrap_or_else` 로 감쌀 것.
|
||||||
|
pub fn backfill_tokenized_korean_text<F, T>(&self, progress: F, tokenize: T) -> Result<u64>
|
||||||
|
where
|
||||||
|
F: Fn(u64, u64),
|
||||||
|
T: Fn(&str) -> Option<String>,
|
||||||
|
{
|
||||||
|
// 1. NULL 후보 수집.
|
||||||
|
let rows: Vec<(String, String)> = {
|
||||||
|
let conn = self.lock_conn();
|
||||||
|
let mut stmt = conn
|
||||||
|
.prepare(
|
||||||
|
"SELECT chunk_id, text FROM chunks \
|
||||||
|
WHERE tokenized_korean_text IS NULL \
|
||||||
|
ORDER BY chunk_id",
|
||||||
|
)
|
||||||
|
.map_err(StoreError::from)?;
|
||||||
|
let iter = stmt
|
||||||
|
.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))
|
||||||
|
.map_err(StoreError::from)?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for r in iter {
|
||||||
|
out.push(r.map_err(StoreError::from)?);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
};
|
||||||
|
|
||||||
|
let total = rows.len() as u64;
|
||||||
|
let mut updated: u64 = 0;
|
||||||
|
|
||||||
|
// 2. 1000 row 마다 transaction 으로 batch UPDATE.
|
||||||
|
for chunk in rows.chunks(1000) {
|
||||||
|
let conn = self.lock_conn();
|
||||||
|
let tx = conn.unchecked_transaction().map_err(StoreError::from)?;
|
||||||
|
for (chunk_id, text) in chunk {
|
||||||
|
if let Some(tokenized) = tokenize(text) {
|
||||||
|
tx.execute(
|
||||||
|
"UPDATE chunks SET tokenized_korean_text = ?1 WHERE chunk_id = ?2",
|
||||||
|
params![tokenized, chunk_id],
|
||||||
|
)
|
||||||
|
.map_err(StoreError::from)?;
|
||||||
|
updated += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.commit().map_err(StoreError::from)?;
|
||||||
|
progress(updated, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(updated)
|
||||||
|
}
|
||||||
|
|
||||||
/// v0.17.0 PR-B: sweep the SQLite document chain (`documents` →
|
/// v0.17.0 PR-B: sweep the SQLite document chain (`documents` →
|
||||||
/// `blocks` / `chunks` / `embedding_records` via CASCADE) for every
|
/// `blocks` / `chunks` / `embedding_records` via CASCADE) for every
|
||||||
/// row at `workspace_path` whose `doc_id` differs from `keep_doc_id`.
|
/// row at `workspace_path` whose `doc_id` differs from `keep_doc_id`.
|
||||||
|
|||||||
@@ -13,6 +13,7 @@
|
|||||||
//! that bypasses the `SqliteStore` mutex; that's fine because each test
|
//! that bypasses the `SqliteStore` mutex; that's fine because each test
|
||||||
//! gets its own tempdir and no concurrent mutator is in flight.
|
//! gets its own tempdir and no concurrent mutator is in flight.
|
||||||
|
|
||||||
|
use kebab_chunk::tokenize_korean_morphological;
|
||||||
use kebab_store_sqlite::{SqliteStore, rebuild_chunks_fts};
|
use kebab_store_sqlite::{SqliteStore, rebuild_chunks_fts};
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
|
|
||||||
@@ -453,6 +454,53 @@ fn v009_bumps_corpus_revision() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── 5c. backfill_tokenized_korean_text ───────────────────────────────
|
||||||
|
|
||||||
|
#[test]
fn backfill_tokenized_korean_text_populates_nullable_rows() {
    /// Counts chunk rows that still have no tokenized Korean text.
    const NULL_ROWS_SQL: &str = "SELECT COUNT(*) FROM chunks WHERE tokenized_korean_text IS NULL";

    let env = common::TestEnv::new();
    let store = SqliteStore::open(&env.config()).unwrap();
    store.run_migrations().unwrap();

    // Both passes use the same no-op progress callback and the lindera tokenizer.
    let run_backfill = || {
        store
            .backfill_tokenized_korean_text(|_, _| {}, tokenize_korean_morphological)
            .unwrap()
    };

    // Seed two Korean chunks through a raw connection. The assertion below
    // pins the precondition that both rows start with a NULL
    // `tokenized_korean_text`.
    {
        let seed_conn = raw_conn_no_fk(&env);
        let doc_id = "d".repeat(32);
        insert_chunk(&seed_conn, &"a".repeat(32), &doc_id, "[]", "한국 문화는 오래되었다");
        insert_chunk(&seed_conn, &"b".repeat(32), &doc_id, "[]", "서울특별시는 한국의 수도");
        let pending: i64 = seed_conn
            .query_row(NULL_ROWS_SQL, [], |r| r.get(0))
            .unwrap();
        assert_eq!(pending, 2);
    }

    // First pass: the tokenizer succeeds on both rows, so 2 is returned.
    let first_pass = run_backfill();
    assert_eq!(first_pass, 2, "both rows should be populated by lindera");

    {
        let check_conn = raw_conn_no_fk(&env);
        let pending: i64 = check_conn
            .query_row(NULL_ROWS_SQL, [], |r| r.get(0))
            .unwrap();
        assert_eq!(pending, 0);
    }

    // Idempotency: nothing is left to fill, so the second pass returns 0.
    let second_pass = run_backfill();
    assert_eq!(second_pass, 0);
}
|
||||||
|
|
||||||
// ── 6. WAL cleanup: drop store before tempdir reaps WAL/SHM ──────────
|
// ── 6. WAL cleanup: drop store before tempdir reaps WAL/SHM ──────────
|
||||||
|
|
||||||
/// Mirror the P1-6 pattern: opening + migrating + dropping the store
|
/// Mirror the P1-6 pattern: opening + migrating + dropping the store
|
||||||
|
|||||||
Reference in New Issue
Block a user