feat(app): dual-write PDF OCR events to SQLite + ndjson (Enhancement 2 wiring)

Pre-capture canonical.doc_id and Arc<SqliteStore> before the OCR
emit_progress closure so both the ndjson file and the SQLite mirror
carry the same doc_id for every event. File write is durable
(errors propagate); SQLite insert is non-critical (tracing::warn on
failure, ingest does not abort) per spec R-1.

LogEvent::Ocr gains a doc_id: Option<&str> field as an additive
Serde change — round 1 ndjson logs deserialize with doc_id=None.

Closure r1 F1: doc_id NULL in dual-write resolved via
let doc_id_for_log = canonical.doc_id.0.clone() pre-capture.
Closure r2 G1: Arc::clone(&app.sqlite) reused instead of opening a
second SqliteStore — eliminates double-open lock contention and
duplicate migration runs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 06:06:03 +00:00
parent 6482bf1321
commit 4e451c9f7c
3 changed files with 174 additions and 1 deletions

View File

@@ -116,6 +116,10 @@ pub(crate) fn now_ts() -> String {
pub enum LogEvent<'a> {
Ocr {
ts: String,
/// v0.20.x r2: additive field — doc_id for dual-write SQLite correlation.
/// Round 1 ndjson logs deserialize with doc_id=None (Serde Option default).
#[serde(skip_serializing_if = "Option::is_none")]
doc_id: Option<&'a str>,
doc_path: &'a str,
page: u32,
image_byte_size: Option<u64>,

View File

@@ -1933,6 +1933,13 @@ fn ingest_one_pdf_asset(
let pages_for_ocr = ocr_pages_cnt.clone();
let failures_for_ocr = ocr_failures_cnt.clone();
let doc_path_for_log = asset.workspace_path.0.clone();
// v0.20.x r2 Step 3: pre-capture for dual-write (F1 + G1 resolution).
let doc_id_for_log: String = canonical.doc_id.0.clone();
let store_for_ocr = Arc::clone(&app.sqlite);
let run_id_for_log: String = lw_for_ocr
.as_ref()
.and_then(|lw| lw.lock().ok().map(|w| w.run_id().to_string()))
.unwrap_or_default();
let summary = crate::pdf_ocr_apply::apply_ocr_to_pdf_pages(
&mut canonical,
@@ -1974,10 +1981,12 @@ fn ingest_one_pdf_asset(
}
// v0.20.x Hook 2: write OCR event to log writer.
let success = !skipped && failure_reason.is_none();
let ts_for_event = crate::ingest_log::now_ts();
if let Some(ref lw) = lw_for_ocr {
if let Ok(mut w) = lw.lock() {
let _ = w.write_event(&crate::ingest_log::LogEvent::Ocr {
ts: crate::ingest_log::now_ts(),
ts: ts_for_event.clone(),
doc_id: Some(&doc_id_for_log),
doc_path: &doc_path_for_log,
page,
image_byte_size,
@@ -1991,6 +2000,27 @@ fn ingest_one_pdf_asset(
});
}
}
// v0.20.x r2: SQLite dual-write (non-critical — R-1).
if let Err(e) = store_for_ocr.record_pdf_ocr_event(
&run_id_for_log,
&ts_for_event,
Some(&doc_id_for_log),
&doc_path_for_log,
page,
image_byte_size,
image_width,
image_height,
ms,
chars,
success,
failure_reason.as_deref(),
engine.engine_name(),
) {
tracing::warn!(
target: "kebab-app",
"sqlite ocr event insert failed: {e}"
);
}
if let Ok(mut p) = pages_for_ocr.lock() {
*p += 1;
}

View File

@@ -0,0 +1,139 @@
//! Integration smoke test: dual-write (ndjson + SQLite) for PDF OCR events.
//! AC-3: the SQLite row count and doc_id values match the ndjson `LogEvent::Ocr` lines.
//!
//! Uses wiremock to stub the Ollama `/api/generate` endpoint so the test
//! runs without a live Ollama instance.
mod common;
use std::path::PathBuf;
use common::TestEnv;
use kebab_config::LoggingCfg;
use serde_json::Value;
use tokio::task::spawn_blocking;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Path to the scanned single-page PDF fixture, located in the sibling
/// `kebab-parse-pdf` directory next to this crate's manifest directory.
///
/// Panics if the manifest directory has no parent.
fn scanned_pdf_src() -> PathBuf {
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let crates_root = manifest_dir.parent().unwrap();
    crates_root.join("kebab-parse-pdf/tests/fixtures/scanned_page1.pdf")
}
/// AC-3: the number of ndjson OCR lines equals the number of `pdf_ocr_events`
/// rows, and each ndjson line / SQLite row pair carries the same non-null doc_id.
///
/// The test returns early (without failing) when the scanned PDF fixture is
/// missing. The Ollama `/api/generate` endpoint is stubbed with wiremock, and
/// the synchronous ingest runs inside `spawn_blocking`.
#[tokio::test]
async fn ingest_dual_write_doc_id_matches_ndjson() {
    // Nothing to OCR without the fixture, so skip instead of failing.
    if !scanned_pdf_src().exists() {
        eprintln!("skipping test: scanned_page1.pdf fixture not found");
        return;
    }

    // Stand-in for Ollama: every POST to /api/generate gets a minimal OCR reply.
    let server = MockServer::start().await;
    let ocr_reply = serde_json::json!({
        "model": "qwen2.5vl:3b",
        "response": "test ocr output",
        "done": true,
        "done_reason": "stop"
    });
    Mock::given(method("POST"))
        .and(path("/api/generate"))
        .respond_with(ResponseTemplate::new(200).set_body_json(ocr_reply))
        .mount(&server)
        .await;

    let mock_url = server.uri();
    let (ocr_lines, rows) = spawn_blocking(move || {
        let mut env = TestEnv::lexical_only();

        // Turn on PDF OCR and point it at the mock server.
        env.config.pdf.ocr.enabled = true;
        env.config.pdf.ocr.endpoint = Some(mock_url);
        env.config.pdf.ocr.model = "qwen2.5vl:3b".to_string();

        // Turn on the ingest log, written below the test's temp directory.
        let log_dir = env.temp.path().join("logs");
        std::fs::create_dir_all(&log_dir).unwrap();
        env.config.logging = LoggingCfg {
            ingest_log_enabled: true,
            ingest_log_dir: log_dir.clone(),
        };

        // Place the scanned PDF in the workspace, then ingest it.
        let dest = env.workspace_root.join("scanned.pdf");
        std::fs::copy(scanned_pdf_src(), &dest).expect("copy scanned PDF");
        kebab_app::ingest_with_config(env.config.clone(), env.scope(), false)
            .expect("ingest");

        // Locate the single `ingest-*.ndjson` file the run should have produced.
        let log_paths: Vec<PathBuf> = std::fs::read_dir(&log_dir)
            .unwrap()
            .filter_map(Result::ok)
            .filter(|entry| {
                let file_name = entry.file_name();
                let name = file_name.to_string_lossy();
                name.starts_with("ingest-") && name.ends_with(".ndjson")
            })
            .map(|entry| entry.path())
            .collect();
        assert_eq!(log_paths.len(), 1, "expected 1 ndjson log file");

        // Keep only the lines that parse as JSON and are tagged kind == "ocr";
        // lines that fail to parse are ignored.
        let body = std::fs::read_to_string(&log_paths[0]).unwrap();
        let mut ocr_lines: Vec<Value> = Vec::new();
        for raw in body.lines() {
            if let Ok(event) = serde_json::from_str::<Value>(raw) {
                if event.get("kind").and_then(Value::as_str) == Some("ocr") {
                    ocr_lines.push(event);
                }
            }
        }

        // Load the mirrored rows from SQLite in insertion (id) order.
        let db_path = PathBuf::from(&env.config.storage.data_dir).join("kebab.sqlite");
        let conn = rusqlite::Connection::open(&db_path).expect("open db");
        let mut stmt = conn
            .prepare("SELECT doc_id, doc_path FROM pdf_ocr_events ORDER BY id")
            .expect("prepare");
        let rows: Vec<(Option<String>, String)> = stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
            .expect("query")
            .map(|row| row.expect("row"))
            .collect();

        (ocr_lines, rows)
    })
    .await
    .expect("spawn_blocking");

    // Both sinks must have recorded something, and the same number of events.
    assert!(!ocr_lines.is_empty(), "expected ≥1 ndjson ocr line");
    assert!(!rows.is_empty(), "expected ≥1 pdf_ocr_events row");
    assert_eq!(
        ocr_lines.len(),
        rows.len(),
        "ndjson ocr lines ({}) must equal pdf_ocr_events rows ({})",
        ocr_lines.len(),
        rows.len()
    );

    // Pair events positionally; each pair must agree on a non-null doc_id.
    for (line, row) in ocr_lines.iter().zip(&rows) {
        let json_doc_id = line.get("doc_id").and_then(Value::as_str);
        let sql_doc_id = row.0.as_deref();
        assert!(
            json_doc_id.is_some(),
            "ndjson ocr line should have doc_id: {line}"
        );
        assert!(
            sql_doc_id.is_some(),
            "pdf_ocr_events row should have doc_id"
        );
        assert_eq!(
            json_doc_id,
            sql_doc_id,
            "ndjson doc_id must equal SQLite doc_id"
        );
    }
}