From f8a4c79727dc88edb9d247b2cc28eb9cec8f53ea Mon Sep 17 00:00:00 2001 From: altair823 Date: Thu, 28 May 2026 02:53:09 +0000 Subject: [PATCH] feat(app): IngestLogWriter + LogEvent enum (per-ingest-run ndjson log) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v0.20.x ingest log surface 의 module side. crates/kebab-app/src/ ingest_log.rs 신규: * IngestLogWriter — open/write_event/write_summary/flush + Drop flush * LogEvent enum 4 variant (ocr / parse_error / skip / error) * IngestSummary struct (kind="summary" literal + 11 stat field) * generate_run_id (ISO 8601 prefix + uuid v7 마지막 8 hex) * expand_log_dir ({state_dir} placeholder 의 hand-roll expand) * now_ts (Rfc3339 UTC helper) * percentiles helper (sorted Vec p50/p90/max) uuid v7 = workspace dep, rand 신규 의존 회피 (spec §6 R-5). 본 step 은 self-contained writer + 5 unit test. ingest pipeline 의 emit hook 5개 wiring 은 step 4. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-app/Cargo.toml | 1 + crates/kebab-app/src/ingest_log.rs | 306 +++++++++++++++++++++++++++++ crates/kebab-app/src/lib.rs | 2 + 3 files changed, 309 insertions(+) create mode 100644 crates/kebab-app/src/ingest_log.rs diff --git a/crates/kebab-app/Cargo.toml b/crates/kebab-app/Cargo.toml index 239b380..102c5af 100644 --- a/crates/kebab-app/Cargo.toml +++ b/crates/kebab-app/Cargo.toml @@ -45,6 +45,7 @@ blake3 = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } time = { workspace = true } +uuid = { workspace = true } tracing = { workspace = true } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] } tracing-appender = "0.2" diff --git a/crates/kebab-app/src/ingest_log.rs b/crates/kebab-app/src/ingest_log.rs new file mode 100644 index 0000000..0ce0ef5 --- /dev/null +++ b/crates/kebab-app/src/ingest_log.rs @@ -0,0 +1,306 @@ +//! Per-ingest-run structured ndjson log writer (v0.20.x ingest log feature). +//! +//! Each `kebab ingest` run produces one `ingest-{run_id}.ndjson` file in +//! `config.logging.ingest_log_dir`. Records are appended line by line; the +//! last record is always `kind="summary"`. `IngestLogWriter::open` returns +//! `Ok(None)` when `ingest_log_enabled = false` so callers need not branch. + +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +use serde::{Deserialize, Serialize}; +use time::format_description::well_known::Rfc3339; + +pub struct IngestLogWriter { + file: BufWriter, + path: PathBuf, + run_id: String, + started_at: SystemTime, +} + +impl IngestLogWriter { + /// Open a new log file. Returns `Ok(None)` when `cfg.ingest_log_enabled == false` (AC-6). + pub fn open(cfg: &kebab_config::LoggingCfg) -> anyhow::Result> { + if !cfg.ingest_log_enabled { + return Ok(None); + } + let run_id = generate_run_id(); + let log_dir = expand_log_dir(&cfg.ingest_log_dir); + std::fs::create_dir_all(&log_dir)?; + let path = log_dir.join(format!("ingest-{run_id}.ndjson")); + let file = BufWriter::new(File::create(&path)?); + Ok(Some(Self { + file, + path, + run_id, + started_at: SystemTime::now(), + })) + } + + pub fn write_event(&mut self, event: &LogEvent<'_>) -> anyhow::Result<()> { + serde_json::to_writer(&mut self.file, event)?; + writeln!(self.file)?; + Ok(()) + } + + pub fn write_summary(&mut self, summary: &IngestSummary) -> anyhow::Result<()> { + serde_json::to_writer(&mut self.file, summary)?; + writeln!(self.file)?; + Ok(()) + } + + pub fn flush(&mut self) -> anyhow::Result<()> { + self.file.flush()?; + Ok(()) + } + + pub fn run_id(&self) -> &str { + &self.run_id + } + + pub fn path(&self) -> &Path { + &self.path + } + + pub fn started_at(&self) -> SystemTime { + self.started_at + } +} + +impl Drop for IngestLogWriter { + fn drop(&mut self) { + let _ = self.file.flush(); + } +} + +/// ISO 8601 compact timestamp + uuid v7 suffix: `20260528T013000Z-abc123de`. +/// uuid v7 is the workspace dep (Cargo.toml); `rand` is not added (spec §6 R-5). +fn generate_run_id() -> String { + use time::macros::format_description; + let now = time::OffsetDateTime::now_utc(); + let ts = now + .format(format_description!("[year][month][day]T[hour][minute][second]Z")) + .unwrap_or_else(|_| "19700101T000000Z".to_string()); + let uid = uuid::Uuid::now_v7().simple().to_string(); + let suffix = &uid[uid.len() - 8..]; + format!("{ts}-{suffix}") +} + +/// Expand `{state_dir}` placeholder → XDG state dir (spec §6 R-3). +/// Other tilde/env expansion is delegated to `kebab_config::expand_path`. +fn expand_log_dir(path: &Path) -> PathBuf { + let path_str = path.to_string_lossy(); + if path_str.contains("{state_dir}") { + let state_dir = kebab_config::Config::xdg_state_dir(); + PathBuf::from(path_str.replace("{state_dir}", &state_dir.to_string_lossy())) + } else { + path.to_path_buf() + } +} + +/// RFC 3339 UTC timestamp for log records. +#[allow(dead_code)] +pub(crate) fn now_ts() -> String { + time::OffsetDateTime::now_utc() + .format(&Rfc3339) + .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) +} + +/// Ingest event record (ndjson line). `kind` is the discriminator. +#[derive(Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum LogEvent<'a> { + Ocr { + ts: String, + doc_path: &'a str, + page: u32, + image_byte_size: Option, + image_width: Option, + image_height: Option, + ms: u64, + chars: u32, + success: bool, + reason: Option<&'a str>, + ocr_engine: &'a str, + }, + ParseError { + ts: String, + doc_path: &'a str, + reason: &'a str, + message: &'a str, + }, + Skip { + ts: String, + doc_path: &'a str, + reason: &'a str, + detail: Option<&'a str>, + }, + Error { + ts: String, + code: &'a str, + message: &'a str, + }, +} + +/// Final summary record — always the last line of the log file. +/// Explicit `kind` field serializes to `"kind": "summary"`. +#[derive(Serialize, Deserialize)] +pub struct IngestSummary { + pub kind: String, + pub ts: String, + pub run_id: String, + pub scanned: u32, + pub new: u32, + pub errors: u32, + pub ocr_pages: u32, + pub ocr_failures: u32, + pub ocr_p50_ms: Option, + pub ocr_p90_ms: Option, + pub ocr_max_ms: Option, + pub duration_ms: u64, +} + +impl IngestSummary { + #[allow(clippy::too_many_arguments)] + pub fn new( + ts: String, + run_id: String, + scanned: u32, + new: u32, + errors: u32, + ocr_pages: u32, + ocr_failures: u32, + ocr_ms_samples: &[u64], + duration_ms: u64, + ) -> Self { + let (p50, p90, max) = percentiles(ocr_ms_samples); + Self { + kind: "summary".to_string(), + ts, + run_id, + scanned, + new, + errors, + ocr_pages, + ocr_failures, + ocr_p50_ms: p50, + ocr_p90_ms: p90, + ocr_max_ms: max, + duration_ms, + } + } +} + +/// Simple percentile extraction on a sorted copy of `samples`. +/// Returns `(p50, p90, max)`. All `None` when samples is empty. +pub(crate) fn percentiles(samples: &[u64]) -> (Option, Option, Option) { + if samples.is_empty() { + return (None, None, None); + } + let mut sorted = samples.to_vec(); + sorted.sort_unstable(); + let n = sorted.len(); + let p50 = sorted[n * 50 / 100]; + let p90 = sorted[n * 90 / 100]; + let max = *sorted.last().unwrap(); + (Some(p50), Some(p90), Some(max)) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + use kebab_config::LoggingCfg; + + #[test] + fn generate_run_id_has_iso_prefix_and_8_hex_suffix() { + let id = generate_run_id(); + // Format: YYYYMMDDTHHmmssZ-xxxxxxxx (total len = 16+1+8 = 25) + assert_eq!(id.len(), 25, "run_id len should be 25: {id}"); + let (prefix, suffix) = id.split_once('-').expect("run_id should contain '-'"); + assert_eq!(prefix.len(), 16, "prefix should be 16 chars: {prefix}"); + assert!(prefix.contains('T'), "prefix should contain T: {prefix}"); + assert!(prefix.ends_with('Z'), "prefix should end with Z: {prefix}"); + assert_eq!(suffix.len(), 8, "suffix should be 8 chars: {suffix}"); + assert!(suffix.chars().all(|c| c.is_ascii_hexdigit()), "suffix should be hex: {suffix}"); + } + + #[test] + fn expand_log_dir_substitutes_state_dir_placeholder() { + let input = PathBuf::from("{state_dir}/logs"); + let expanded = expand_log_dir(&input); + let expected = kebab_config::Config::xdg_state_dir().join("logs"); + assert_eq!(expanded, expected); + assert!(!expanded.to_string_lossy().contains("{state_dir}")); + } + + #[test] + fn writer_disabled_returns_none() { + let cfg = LoggingCfg { + ingest_log_enabled: false, + ingest_log_dir: PathBuf::from("/tmp/should-not-exist"), + }; + let result = IngestLogWriter::open(&cfg).expect("open should not error"); + assert!(result.is_none(), "disabled writer should return None"); + } + + #[test] + fn writer_writes_one_event_per_line_with_kind_discriminator() { + let tmp = TempDir::new().unwrap(); + let cfg = LoggingCfg { + ingest_log_enabled: true, + ingest_log_dir: tmp.path().to_path_buf(), + }; + let mut writer = IngestLogWriter::open(&cfg).unwrap().unwrap(); + let path = writer.path().to_path_buf(); + + writer.write_event(&LogEvent::Skip { + ts: now_ts(), + doc_path: "a.zip", + reason: "builtin_blacklist", + detail: Some(".zip extension"), + }).unwrap(); + writer.write_event(&LogEvent::Error { + ts: now_ts(), + code: "ingest_fatal", + message: "something bad", + }).unwrap(); + writer.write_event(&LogEvent::ParseError { + ts: now_ts(), + doc_path: "weird.pdf", + reason: "lopdf_error", + message: "unexpected EOF", + }).unwrap(); + writer.flush().unwrap(); + + let contents = std::fs::read_to_string(&path).unwrap(); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 3, "expected 3 lines, got: {}", lines.len()); + for line in &lines { + assert!(line.starts_with('{'), "each line should be JSON object: {line}"); + assert!(line.contains("\"kind\""), "each line should have 'kind': {line}"); + } + } + + #[test] + fn drop_flushes_pending_buffer() { + let tmp = TempDir::new().unwrap(); + let cfg = LoggingCfg { + ingest_log_enabled: true, + ingest_log_dir: tmp.path().to_path_buf(), + }; + let mut writer = IngestLogWriter::open(&cfg).unwrap().unwrap(); + let path = writer.path().to_path_buf(); + writer.write_event(&LogEvent::Error { + ts: now_ts(), + code: "test", + message: "drop flush test", + }).unwrap(); + // Drop without explicit flush — Drop impl should flush BufWriter. + drop(writer); + let contents = std::fs::read_to_string(&path).unwrap(); + assert!(contents.lines().count() >= 1, "file should have at least 1 line after drop"); + } +} diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 1761551..47cb9ec 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -60,6 +60,7 @@ pub mod error_signal; pub mod error_wire; pub mod external; pub mod fetch; +pub mod ingest_log; pub mod ingest_progress; pub mod logging; pub mod pdf_ocr_apply; @@ -68,6 +69,7 @@ pub mod schema; mod staleness; pub use app::{App, SearchResponse, short_query_hint}; +pub use ingest_log::{IngestLogWriter, IngestSummary, LogEvent}; pub use ingest_progress::{AggregateCounts, IngestEvent, render_skipped_breakdown}; pub use reset::{ResetReport, ResetScope, enumerate_orphans}; pub use error_wire::{ERROR_V1_ID, ErrorV1, StructuredError, classify};