From f9dc0f749fb3d609e2ccb05dbea6f074f97ff0b5 Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 28 May 2026 03:05:07 +0000 Subject: [PATCH] feat(app): wire IngestLogWriter into 5 ingest emit hooks (Arc sync) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v0.20.x ingest log feature 의 ingest pipeline wiring. 5 emit hook: Hook 1: ingest_with_config_opts entry/exit (writer init + summary write + flush) Hook 2: apply_ocr_to_pdf_pages closure (PdfOcrProgress::Finished → LogEvent::Ocr) Hook 3: ingest_one_*_asset Err arm (LogEvent::Error) Hook 4: scan 직후 fs_skips.events enumerate (LogEvent::Skip) Hook 5: (Hook 3 통합) per-asset fatal error → LogEvent::Error Hook 4 의 skip event carry 위해 kebab-source-fs 의 FsScanSkips 에 events: Vec field 추가 (kebab-source-fs 가 kebab-app 재호출 안 함 — cycle 회피). Ownership: Option>> binding 1 곳, 5 hook 이 clone+lock+write. ocr_ms_samples (Vec success-only) 는 Arc 로 share, summary stage 가 sort+p50/p90/max 계산. single-threaded per-asset loop 라 deadlock/contention 위험 없음. Writer 실패는 ingest 자체 fail 시키지 않음 (tracing::warn + 진행). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-app/src/lib.rs | 117 +++++++++++++++++++++++- crates/kebab-source-fs/src/connector.rs | 40 ++++++++ 2 files changed, 155 insertions(+), 2 deletions(-) diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 7fbda13..8f983aa 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -34,7 +34,7 @@ //! still allowing the cross-crate calls. use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; @@ -297,6 +297,24 @@ pub fn ingest_with_config_opts( let app = App::open_with_config(config)?; + // v0.20.x Hook 1: init per-run log writer (None when disabled or on open failure). + let log_writer: Option>> = + match crate::ingest_log::IngestLogWriter::open(&app.config.logging) { + Ok(Some(w)) => Some(Arc::new(Mutex::new(w))), + Ok(None) => None, + Err(e) => { + tracing::warn!( + target: "kebab-app", + error = %e, + "ingest_log: failed to open log file; logging disabled for this run" + ); + None + } + }; + let ocr_ms_samples: Arc>> = Arc::new(Mutex::new(Vec::new())); + let ocr_pages_cnt: Arc> = Arc::new(Mutex::new(0u32)); + let ocr_failures_cnt: Arc> = Arc::new(Mutex::new(0u32)); + // Walk the workspace. crate::ingest_progress::emit( progress, @@ -316,6 +334,20 @@ pub fn ingest_with_config_opts( }, ); + // v0.20.x Hook 4: emit skip events from scan into log writer. + if let Some(ref lw) = log_writer { + for ev in &fs_skips.events { + if let Ok(mut w) = lw.lock() { + let _ = w.write_event(&crate::ingest_log::LogEvent::Skip { + ts: crate::ingest_log::now_ts(), + doc_path: &ev.doc_path, + reason: ev.reason, + detail: ev.detail.as_deref(), + }); + } + } + } + // Embedder + vector store: build once at the top so the cold-start // cost is paid once even when the workspace has 1000 markdown files. let embedder = app.embedder()?; @@ -477,6 +509,10 @@ pub fn ingest_with_config_opts( pdf_ocr_engine.as_ref(), progress, opts.cancel.as_ref(), + log_writer.clone(), + ocr_ms_samples.clone(), + ocr_pages_cnt.clone(), + ocr_failures_cnt.clone(), ); let item = match item { @@ -488,6 +524,16 @@ pub fn ingest_with_config_opts( error = %e, "kb-app::ingest: per-file fatal" ); + // v0.20.x Hook 3: write per-asset error to log writer. + if let Some(ref lw) = log_writer { + if let Ok(mut w) = lw.lock() { + let _ = w.write_event(&crate::ingest_log::LogEvent::Error { + ts: crate::ingest_log::now_ts(), + code: "ingest_asset_error", + message: &format!("{e:#}"), + }); + } + } // Note: `error_count += 1` happens below in the // `match item.kind { Error => ... }` arm — incrementing // here too would double-count (a regression first @@ -714,6 +760,29 @@ pub fn ingest_with_config_opts( } } + // v0.20.x Hook 1 exit: write summary record + flush log writer. + if let Some(ref lw) = log_writer { + if let Ok(mut w) = lw.lock() { + let run_id = w.run_id().to_string(); + let ms_samples = ocr_ms_samples.lock().map(|v| v.clone()).unwrap_or_default(); + let pages = ocr_pages_cnt.lock().map(|v| *v).unwrap_or(0); + let failures = ocr_failures_cnt.lock().map(|v| *v).unwrap_or(0); + let summary = crate::ingest_log::IngestSummary::new( + crate::ingest_log::now_ts(), + run_id, + scanned_count, + new_count, + error_count, + pages, + failures, + &ms_samples, + started_instant.elapsed().as_millis() as u64, + ); + let _ = w.write_summary(&summary); + let _ = w.flush(); + } + } + Ok(IngestReport { scope, scanned: scanned_count, @@ -1002,6 +1071,10 @@ fn ingest_one_asset( pdf_ocr_engine: Option<&OllamaVisionOcr>, progress: Option<&std::sync::mpsc::Sender>, cancel: Option<&std::sync::Arc>, + log_writer: Option>>, + ocr_ms_samples: Arc>>, + ocr_pages_cnt: Arc>, + ocr_failures_cnt: Arc>, ) -> anyhow::Result { tracing::debug!( target: "kebab-app::ingest", @@ -1040,6 +1113,10 @@ fn ingest_one_asset( pdf_ocr_engine, progress, cancel, + log_writer, + ocr_ms_samples, + ocr_pages_cnt, + ocr_failures_cnt, ); } // p10-1A-2 / 1B: code ingest dispatch. p10-2: Tier 2 langs added. p10-3: shell added. p10-1D: c/cpp added. @@ -1780,6 +1857,10 @@ fn ingest_one_pdf_asset( pdf_ocr_engine: Option<&OllamaVisionOcr>, progress: Option<&std::sync::mpsc::Sender>, cancel: Option<&std::sync::Arc>, + log_writer: Option>>, + ocr_ms_samples: Arc>>, + ocr_pages_cnt: Arc>, + ocr_failures_cnt: Arc>, ) -> anyhow::Result { let path = match &asset.source_uri { SourceUri::File(p) => p.clone(), @@ -1849,6 +1930,13 @@ fn ingest_one_pdf_asset( lang_hint: app.config.pdf.ocr.lang_hint.clone().map(kebab_core::Lang), cancel: cancel.cloned(), }; + // v0.20.x Hook 2: pre-clone Arcs for capture by OCR closure. + let lw_for_ocr = log_writer.clone(); + let samples_for_ocr = ocr_ms_samples.clone(); + let pages_for_ocr = ocr_pages_cnt.clone(); + let failures_for_ocr = ocr_failures_cnt.clone(); + let doc_path_for_log = asset.workspace_path.0.clone(); + let summary = crate::pdf_ocr_apply::apply_ocr_to_pdf_pages( &mut canonical, engine, @@ -1872,7 +1960,7 @@ fn ingest_one_pdf_asset( image_byte_size, image_width, image_height, - failure_reason, + ref failure_reason, } => { if let Some(sender) = progress { let _ = sender.send( @@ -1889,6 +1977,31 @@ fn ingest_one_pdf_asset( }, ); } + // v0.20.x Hook 2: write OCR event to log writer. + let success = !skipped && failure_reason.is_none(); + if let Some(ref lw) = lw_for_ocr { + if let Ok(mut w) = lw.lock() { + let _ = w.write_event(&crate::ingest_log::LogEvent::Ocr { + ts: crate::ingest_log::now_ts(), + doc_path: &doc_path_for_log, + page, + image_byte_size, + image_width, + image_height, + ms, + chars, + success, + reason: failure_reason.as_deref(), + ocr_engine: engine.engine_name(), + }); + } + } + if let Ok(mut p) = pages_for_ocr.lock() { *p += 1; } + if success { + if let Ok(mut s) = samples_for_ocr.lock() { s.push(ms); } + } else if let Ok(mut f) = failures_for_ocr.lock() { + *f += 1; + } } }, )?; diff --git a/crates/kebab-source-fs/src/connector.rs b/crates/kebab-source-fs/src/connector.rs index 95ddc57..c37abc4 100644 --- a/crates/kebab-source-fs/src/connector.rs +++ b/crates/kebab-source-fs/src/connector.rs @@ -108,6 +108,8 @@ impl FsSourceConnector { // Accumulate per-category skip counts and sample paths. let mut fs_skips = FsScanSkips::default(); for entry in &skipped_entries { + let rel_path = entry.path.strip_prefix(&root).unwrap_or(&entry.path); + let doc_path = rel_path.to_string_lossy().replace('\\', "/"); match entry.category { SkipCategory::BuiltinBlacklist => { fs_skips.skipped_builtin_blacklist = @@ -117,6 +119,14 @@ impl FsSourceConnector { &entry.path, &root, ); + let ext = entry.path.extension() + .map(|e| format!(".{}", e.to_string_lossy())) + .unwrap_or_default(); + fs_skips.events.push(FsSkipEvent { + doc_path, + reason: "builtin_blacklist", + detail: if ext.is_empty() { None } else { Some(ext) }, + }); } SkipCategory::Gitignore => { fs_skips.skipped_gitignore = @@ -126,11 +136,21 @@ impl FsSourceConnector { &entry.path, &root, ); + fs_skips.events.push(FsSkipEvent { + doc_path, + reason: "gitignore", + detail: None, + }); } SkipCategory::Kebabignore => { fs_skips.skipped_kebabignore = fs_skips.skipped_kebabignore.saturating_add(1); // kebabignore intentionally NOT in skip_examples per spec §5.5. + fs_skips.events.push(FsSkipEvent { + doc_path, + reason: "kebabignore", + detail: None, + }); } SkipCategory::Other => { // DEFAULT_EXCLUDES or config.workspace.exclude — no dedicated @@ -162,6 +182,11 @@ impl FsSourceConnector { path = %rel_path.display(), "skip: generated-file marker detected" ); + fs_skips.events.push(FsSkipEvent { + doc_path: rel_path.to_string_lossy().replace('\\', "/"), + reason: "generated", + detail: None, + }); continue; } @@ -189,6 +214,11 @@ impl FsSourceConnector { max_lines = self.max_file_lines, "skip: code file exceeds size cap" ); + fs_skips.events.push(FsSkipEvent { + doc_path: rel_path.to_string_lossy().replace('\\', "/"), + reason: "size_exceeded", + detail: None, + }); continue; } @@ -218,6 +248,16 @@ pub struct FsScanSkips { /// Sample paths per spec §5.5 (≤ 5 per category). Paths are /// workspace-relative POSIX strings when available, absolute otherwise. pub skip_examples: SkipExamples, + /// v0.20.x ingest log: per-file skip events for structured log writing. + pub events: Vec, +} + +/// A single per-file skip event for structured ingest log (v0.20.x). +#[derive(Debug)] +pub struct FsSkipEvent { + pub doc_path: String, + pub reason: &'static str, + pub detail: Option, } /// Push a path into a sample vec (cap = 5) as a workspace-relative POSIX