diff --git a/crates/kebab-app/src/ingest_log.rs b/crates/kebab-app/src/ingest_log.rs index 3702793..4e5d4fe 100644 --- a/crates/kebab-app/src/ingest_log.rs +++ b/crates/kebab-app/src/ingest_log.rs @@ -116,6 +116,10 @@ pub(crate) fn now_ts() -> String { pub enum LogEvent<'a> { Ocr { ts: String, + /// v0.20.x r2: additive field — doc_id for dual-write SQLite correlation. + /// Round 1 ndjson logs deserialize with doc_id=None (Serde Option default). + #[serde(skip_serializing_if = "Option::is_none")] + doc_id: Option<&'a str>, doc_path: &'a str, page: u32, image_byte_size: Option, diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 328e219..97fbeab 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -1933,6 +1933,13 @@ fn ingest_one_pdf_asset( let pages_for_ocr = ocr_pages_cnt.clone(); let failures_for_ocr = ocr_failures_cnt.clone(); let doc_path_for_log = asset.workspace_path.0.clone(); + // v0.20.x r2 Step 3: pre-capture for dual-write (F1 + G1 resolution). + let doc_id_for_log: String = canonical.doc_id.0.clone(); + let store_for_ocr = Arc::clone(&app.sqlite); + let run_id_for_log: String = lw_for_ocr + .as_ref() + .and_then(|lw| lw.lock().ok().map(|w| w.run_id().to_string())) + .unwrap_or_default(); let summary = crate::pdf_ocr_apply::apply_ocr_to_pdf_pages( &mut canonical, @@ -1974,10 +1981,12 @@ fn ingest_one_pdf_asset( } // v0.20.x Hook 2: write OCR event to log writer. let success = !skipped && failure_reason.is_none(); + let ts_for_event = crate::ingest_log::now_ts(); if let Some(ref lw) = lw_for_ocr { if let Ok(mut w) = lw.lock() { let _ = w.write_event(&crate::ingest_log::LogEvent::Ocr { - ts: crate::ingest_log::now_ts(), + ts: ts_for_event.clone(), + doc_id: Some(&doc_id_for_log), doc_path: &doc_path_for_log, page, image_byte_size, @@ -1991,6 +2000,27 @@ fn ingest_one_pdf_asset( }); } } + // v0.20.x r2: SQLite dual-write (non-critical — R-1). + if let Err(e) = store_for_ocr.record_pdf_ocr_event( + &run_id_for_log, + &ts_for_event, + Some(&doc_id_for_log), + &doc_path_for_log, + page, + image_byte_size, + image_width, + image_height, + ms, + chars, + success, + failure_reason.as_deref(), + engine.engine_name(), + ) { + tracing::warn!( + target: "kebab-app", + "sqlite ocr event insert failed: {e}" + ); + } if let Ok(mut p) = pages_for_ocr.lock() { *p += 1; } diff --git a/crates/kebab-app/tests/pdf_ocr_events_insert_smoke.rs b/crates/kebab-app/tests/pdf_ocr_events_insert_smoke.rs new file mode 100644 index 0000000..8808473 --- /dev/null +++ b/crates/kebab-app/tests/pdf_ocr_events_insert_smoke.rs @@ -0,0 +1,139 @@ +//! Integration smoke test: dual-write (ndjson + SQLite) for PDF OCR events. +//! AC-3: SQLite row count and doc_id matches ndjson LogEvent::Ocr. +//! +//! Uses wiremock to stub the Ollama `/api/generate` endpoint so the test +//! runs without a live Ollama instance. + +mod common; + +use std::path::PathBuf; + +use common::TestEnv; +use kebab_config::LoggingCfg; +use serde_json::Value; +use tokio::task::spawn_blocking; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn scanned_pdf_src() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("kebab-parse-pdf/tests/fixtures/scanned_page1.pdf") +} + +/// AC-3: ndjson OCR line count == pdf_ocr_events row count, and doc_id matches. +#[tokio::test] +async fn ingest_dual_write_doc_id_matches_ndjson() { + let src = scanned_pdf_src(); + if !src.exists() { + eprintln!("skipping test: scanned_page1.pdf fixture not found"); + return; + } + + let server = MockServer::start().await; + // Stub Ollama /api/generate to return a minimal OCR response. + Mock::given(method("POST")) + .and(path("/api/generate")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "model": "qwen2.5vl:3b", + "response": "test ocr output", + "done": true, + "done_reason": "stop" + }))) + .mount(&server) + .await; + + let mock_url = server.uri(); + + let result = spawn_blocking(move || { + let mut env = TestEnv::lexical_only(); + // Enable PDF OCR + set up mock endpoint + env.config.pdf.ocr.enabled = true; + env.config.pdf.ocr.endpoint = Some(mock_url.clone()); + env.config.pdf.ocr.model = "qwen2.5vl:3b".to_string(); + // Enable ingest log + let log_dir = env.temp.path().join("logs"); + std::fs::create_dir_all(&log_dir).unwrap(); + env.config.logging = LoggingCfg { + ingest_log_enabled: true, + ingest_log_dir: log_dir.clone(), + }; + + // Copy scanned PDF into workspace + let dest = env.workspace_root.join("scanned.pdf"); + std::fs::copy(scanned_pdf_src(), &dest).expect("copy scanned PDF"); + + // Run ingest + kebab_app::ingest_with_config(env.config.clone(), env.scope(), false) + .expect("ingest"); + + // Read ndjson log + let log_files: Vec<_> = std::fs::read_dir(&log_dir) + .unwrap() + .filter_map(Result::ok) + .filter(|e| { + let name = e.file_name().to_string_lossy().to_string(); + name.starts_with("ingest-") && name.ends_with(".ndjson") + }) + .collect(); + assert_eq!(log_files.len(), 1, "expected 1 ndjson log file"); + + let body = std::fs::read_to_string(log_files[0].path()).unwrap(); + let ocr_lines: Vec = body + .lines() + .filter_map(|l| serde_json::from_str(l).ok()) + .filter(|v: &Value| v.get("kind").and_then(Value::as_str) == Some("ocr")) + .collect(); + + // Read pdf_ocr_events from SQLite + let db_path = PathBuf::from(&env.config.storage.data_dir).join("kebab.sqlite"); + let conn = rusqlite::Connection::open(&db_path).expect("open db"); + let rows: Vec<(Option, String)> = { + let mut stmt = conn + .prepare("SELECT doc_id, doc_path FROM pdf_ocr_events ORDER BY id") + .expect("prepare"); + stmt.query_map([], |r| Ok((r.get(0)?, r.get(1)?))) + .expect("query") + .map(|r| r.expect("row")) + .collect() + }; + + (ocr_lines, rows) + }) + .await + .expect("spawn_blocking"); + + let (ocr_lines, rows) = result; + + // At least one OCR event must be produced + assert!(!ocr_lines.is_empty(), "expected ≥1 ndjson ocr line"); + assert!(!rows.is_empty(), "expected ≥1 pdf_ocr_events row"); + + // Row counts must match + assert_eq!( + ocr_lines.len(), + rows.len(), + "ndjson ocr lines ({}) must equal pdf_ocr_events rows ({})", + ocr_lines.len(), + rows.len() + ); + + // doc_id in both sources must be non-null and consistent + for (line, (sql_doc_id, _sql_doc_path)) in ocr_lines.iter().zip(rows.iter()) { + let json_doc_id = line.get("doc_id").and_then(Value::as_str); + assert!( + json_doc_id.is_some(), + "ndjson ocr line should have doc_id: {line}" + ); + assert!( + sql_doc_id.is_some(), + "pdf_ocr_events row should have doc_id" + ); + assert_eq!( + json_doc_id, + sql_doc_id.as_deref(), + "ndjson doc_id must equal SQLite doc_id" + ); + } +}