feat(app): wire IngestLogWriter into 5 ingest emit hooks (Arc<Mutex> sync)

v0.20.x ingest log feature 의 ingest pipeline wiring. 5 emit hook:

  Hook 1: ingest_with_config_opts entry/exit (writer init + summary write + flush)
  Hook 2: apply_ocr_to_pdf_pages closure (PdfOcrProgress::Finished → LogEvent::Ocr)
  Hook 3: ingest_one_*_asset Err arm (LogEvent::Error)
  Hook 4: scan 직후 fs_skips.events enumerate (LogEvent::Skip)
  Hook 5: (Hook 3 통합) per-asset fatal error → LogEvent::Error

Hook 4 의 skip event carry 위해 kebab-source-fs 의 FsScanSkips 에
events: Vec<FsSkipEvent> field 추가 (kebab-source-fs 가 kebab-app
재호출 안 함 — cycle 회피).

Ownership: Option<Arc<Mutex<IngestLogWriter>>> binding 1 곳, 5 hook 이
clone+lock+write. ocr_ms_samples (Vec<u64> success-only) 는 Arc<Mutex>
로 share, summary stage 가 sort+p50/p90/max 계산. single-threaded
per-asset loop 라 deadlock/contention 위험 없음.

Writer 실패는 ingest 자체 fail 시키지 않음 (tracing::warn + 진행).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 03:05:07 +00:00
parent bef0c98867
commit f9dc0f749f
2 changed files with 155 additions and 2 deletions

View File

@@ -34,7 +34,7 @@
//! still allowing the cross-crate calls.
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use anyhow::{Context, anyhow};
use serde::{Deserialize, Serialize};
@@ -297,6 +297,24 @@ pub fn ingest_with_config_opts(
let app = App::open_with_config(config)?;
// v0.20.x Hook 1: init per-run log writer (None when disabled or on open failure).
let log_writer: Option<Arc<Mutex<crate::ingest_log::IngestLogWriter>>> =
match crate::ingest_log::IngestLogWriter::open(&app.config.logging) {
Ok(Some(w)) => Some(Arc::new(Mutex::new(w))),
Ok(None) => None,
Err(e) => {
tracing::warn!(
target: "kebab-app",
error = %e,
"ingest_log: failed to open log file; logging disabled for this run"
);
None
}
};
let ocr_ms_samples: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
let ocr_pages_cnt: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
let ocr_failures_cnt: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
// Walk the workspace.
crate::ingest_progress::emit(
progress,
@@ -316,6 +334,20 @@ pub fn ingest_with_config_opts(
},
);
// v0.20.x Hook 4: emit skip events from scan into log writer.
if let Some(ref lw) = log_writer {
for ev in &fs_skips.events {
if let Ok(mut w) = lw.lock() {
let _ = w.write_event(&crate::ingest_log::LogEvent::Skip {
ts: crate::ingest_log::now_ts(),
doc_path: &ev.doc_path,
reason: ev.reason,
detail: ev.detail.as_deref(),
});
}
}
}
// Embedder + vector store: build once at the top so the cold-start
// cost is paid once even when the workspace has 1000 markdown files.
let embedder = app.embedder()?;
@@ -477,6 +509,10 @@ pub fn ingest_with_config_opts(
pdf_ocr_engine.as_ref(),
progress,
opts.cancel.as_ref(),
log_writer.clone(),
ocr_ms_samples.clone(),
ocr_pages_cnt.clone(),
ocr_failures_cnt.clone(),
);
let item = match item {
@@ -488,6 +524,16 @@ pub fn ingest_with_config_opts(
error = %e,
"kb-app::ingest: per-file fatal"
);
// v0.20.x Hook 3: write per-asset error to log writer.
if let Some(ref lw) = log_writer {
if let Ok(mut w) = lw.lock() {
let _ = w.write_event(&crate::ingest_log::LogEvent::Error {
ts: crate::ingest_log::now_ts(),
code: "ingest_asset_error",
message: &format!("{e:#}"),
});
}
}
// Note: `error_count += 1` happens below in the
// `match item.kind { Error => ... }` arm — incrementing
// here too would double-count (a regression first
@@ -714,6 +760,29 @@ pub fn ingest_with_config_opts(
}
}
// v0.20.x Hook 1 exit: write summary record + flush log writer.
if let Some(ref lw) = log_writer {
if let Ok(mut w) = lw.lock() {
let run_id = w.run_id().to_string();
let ms_samples = ocr_ms_samples.lock().map(|v| v.clone()).unwrap_or_default();
let pages = ocr_pages_cnt.lock().map(|v| *v).unwrap_or(0);
let failures = ocr_failures_cnt.lock().map(|v| *v).unwrap_or(0);
let summary = crate::ingest_log::IngestSummary::new(
crate::ingest_log::now_ts(),
run_id,
scanned_count,
new_count,
error_count,
pages,
failures,
&ms_samples,
started_instant.elapsed().as_millis() as u64,
);
let _ = w.write_summary(&summary);
let _ = w.flush();
}
}
Ok(IngestReport {
scope,
scanned: scanned_count,
@@ -1002,6 +1071,10 @@ fn ingest_one_asset(
pdf_ocr_engine: Option<&OllamaVisionOcr>,
progress: Option<&std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
cancel: Option<&std::sync::Arc<std::sync::atomic::AtomicBool>>,
log_writer: Option<Arc<Mutex<crate::ingest_log::IngestLogWriter>>>,
ocr_ms_samples: Arc<Mutex<Vec<u64>>>,
ocr_pages_cnt: Arc<Mutex<u32>>,
ocr_failures_cnt: Arc<Mutex<u32>>,
) -> anyhow::Result<kebab_core::IngestItem> {
tracing::debug!(
target: "kebab-app::ingest",
@@ -1040,6 +1113,10 @@ fn ingest_one_asset(
pdf_ocr_engine,
progress,
cancel,
log_writer,
ocr_ms_samples,
ocr_pages_cnt,
ocr_failures_cnt,
);
}
// p10-1A-2 / 1B: code ingest dispatch. p10-2: Tier 2 langs added. p10-3: shell added. p10-1D: c/cpp added.
@@ -1780,6 +1857,10 @@ fn ingest_one_pdf_asset(
pdf_ocr_engine: Option<&OllamaVisionOcr>,
progress: Option<&std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
cancel: Option<&std::sync::Arc<std::sync::atomic::AtomicBool>>,
log_writer: Option<Arc<Mutex<crate::ingest_log::IngestLogWriter>>>,
ocr_ms_samples: Arc<Mutex<Vec<u64>>>,
ocr_pages_cnt: Arc<Mutex<u32>>,
ocr_failures_cnt: Arc<Mutex<u32>>,
) -> anyhow::Result<kebab_core::IngestItem> {
let path = match &asset.source_uri {
SourceUri::File(p) => p.clone(),
@@ -1849,6 +1930,13 @@ fn ingest_one_pdf_asset(
lang_hint: app.config.pdf.ocr.lang_hint.clone().map(kebab_core::Lang),
cancel: cancel.cloned(),
};
// v0.20.x Hook 2: pre-clone Arcs for capture by OCR closure.
let lw_for_ocr = log_writer.clone();
let samples_for_ocr = ocr_ms_samples.clone();
let pages_for_ocr = ocr_pages_cnt.clone();
let failures_for_ocr = ocr_failures_cnt.clone();
let doc_path_for_log = asset.workspace_path.0.clone();
let summary = crate::pdf_ocr_apply::apply_ocr_to_pdf_pages(
&mut canonical,
engine,
@@ -1872,7 +1960,7 @@ fn ingest_one_pdf_asset(
image_byte_size,
image_width,
image_height,
failure_reason,
ref failure_reason,
} => {
if let Some(sender) = progress {
let _ = sender.send(
@@ -1889,6 +1977,31 @@ fn ingest_one_pdf_asset(
},
);
}
// v0.20.x Hook 2: write OCR event to log writer.
let success = !skipped && failure_reason.is_none();
if let Some(ref lw) = lw_for_ocr {
if let Ok(mut w) = lw.lock() {
let _ = w.write_event(&crate::ingest_log::LogEvent::Ocr {
ts: crate::ingest_log::now_ts(),
doc_path: &doc_path_for_log,
page,
image_byte_size,
image_width,
image_height,
ms,
chars,
success,
reason: failure_reason.as_deref(),
ocr_engine: engine.engine_name(),
});
}
}
if let Ok(mut p) = pages_for_ocr.lock() { *p += 1; }
if success {
if let Ok(mut s) = samples_for_ocr.lock() { s.push(ms); }
} else if let Ok(mut f) = failures_for_ocr.lock() {
*f += 1;
}
}
},
)?;

View File

@@ -108,6 +108,8 @@ impl FsSourceConnector {
// Accumulate per-category skip counts and sample paths.
let mut fs_skips = FsScanSkips::default();
for entry in &skipped_entries {
let rel_path = entry.path.strip_prefix(&root).unwrap_or(&entry.path);
let doc_path = rel_path.to_string_lossy().replace('\\', "/");
match entry.category {
SkipCategory::BuiltinBlacklist => {
fs_skips.skipped_builtin_blacklist =
@@ -117,6 +119,14 @@ impl FsSourceConnector {
&entry.path,
&root,
);
let ext = entry.path.extension()
.map(|e| format!(".{}", e.to_string_lossy()))
.unwrap_or_default();
fs_skips.events.push(FsSkipEvent {
doc_path,
reason: "builtin_blacklist",
detail: if ext.is_empty() { None } else { Some(ext) },
});
}
SkipCategory::Gitignore => {
fs_skips.skipped_gitignore =
@@ -126,11 +136,21 @@ impl FsSourceConnector {
&entry.path,
&root,
);
fs_skips.events.push(FsSkipEvent {
doc_path,
reason: "gitignore",
detail: None,
});
}
SkipCategory::Kebabignore => {
fs_skips.skipped_kebabignore =
fs_skips.skipped_kebabignore.saturating_add(1);
// kebabignore intentionally NOT in skip_examples per spec §5.5.
fs_skips.events.push(FsSkipEvent {
doc_path,
reason: "kebabignore",
detail: None,
});
}
SkipCategory::Other => {
// DEFAULT_EXCLUDES or config.workspace.exclude — no dedicated
@@ -162,6 +182,11 @@ impl FsSourceConnector {
path = %rel_path.display(),
"skip: generated-file marker detected"
);
fs_skips.events.push(FsSkipEvent {
doc_path: rel_path.to_string_lossy().replace('\\', "/"),
reason: "generated",
detail: None,
});
continue;
}
@@ -189,6 +214,11 @@ impl FsSourceConnector {
max_lines = self.max_file_lines,
"skip: code file exceeds size cap"
);
fs_skips.events.push(FsSkipEvent {
doc_path: rel_path.to_string_lossy().replace('\\', "/"),
reason: "size_exceeded",
detail: None,
});
continue;
}
@@ -218,6 +248,16 @@ pub struct FsScanSkips {
/// Sample paths per spec §5.5 (≤ 5 per category). Paths are
/// workspace-relative POSIX strings when available, absolute otherwise.
pub skip_examples: SkipExamples,
/// v0.20.x ingest log: per-file skip events for structured log writing.
pub events: Vec<FsSkipEvent>,
}
/// A single per-file skip event for structured ingest log (v0.20.x).
#[derive(Debug)]
pub struct FsSkipEvent {
pub doc_path: String,
pub reason: &'static str,
pub detail: Option<String>,
}
/// Push a path into a sample vec (cap = 5) as a workspace-relative POSIX