//! Integration coverage for `ingest_with_config_cancellable` //! (p9-fb-04). Asserts the §10 invariants: //! //! - Cancel set BEFORE the loop starts → no asset is processed. //! Terminal event is `Aborted` with all-zero counts. //! - Cancel set MID-LOOP → at least one asset committed; remaining //! assets skipped; terminal event is `Aborted` with partial counts; //! re-running on the same workspace finishes the job (idempotent). mod common; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use common::TestEnv; use kebab_app::IngestEvent; fn run_with( env: &TestEnv, cancel: Arc, progress: Option>, ) -> kebab_core::IngestReport { kebab_app::ingest_with_config_cancellable( env.config.clone(), env.scope(), true, progress, Some(cancel), ) .unwrap() } #[test] fn cancel_before_loop_emits_aborted_with_zero_counts() { let env = TestEnv::lexical_only(); let (tx, rx) = mpsc::channel::(); let cancel = Arc::new(AtomicBool::new(true)); // pre-set let report = run_with(&env, cancel, Some(tx)); // Report itself surfaces partial counts — no assets processed // because the very first iteration check tripped. assert_eq!(report.scanned, 3, "scanned reflects discovery, not work"); assert_eq!(report.new, 0, "no asset committed: {report:?}"); // Drain the channel; the terminal event must be Aborted. let events: Vec<_> = rx.into_iter().collect(); let last = events.last().expect("at least one event"); assert!( matches!(last, IngestEvent::Aborted { .. }), "expected Aborted, got {last:?}" ); if let IngestEvent::Aborted { counts } = last { assert_eq!(counts.new, 0); } } #[test] fn cancel_mid_loop_after_first_asset_keeps_idempotent_resume() { // Strategy: subscribe to progress, flip cancel as soon as the // first AssetFinished arrives. The ingest loop will see cancel=true // on the *next* iteration and break — exactly one asset committed. let env = TestEnv::lexical_only(); let (tx, rx) = mpsc::channel::(); let cancel = Arc::new(AtomicBool::new(false)); let cancel_for_listener = cancel.clone(); // Background listener flips cancel after the first AssetFinished. let listener = std::thread::spawn(move || { for event in rx { if let IngestEvent::AssetFinished { .. } = event { cancel_for_listener.store(true, Ordering::Relaxed); break; } } // Drain the rest so the channel doesn't fill while ingest // continues emitting (until the next iteration check). }); let report = run_with(&env, cancel, Some(tx)); listener.join().unwrap(); // cancel-mid is timing-dependent: the listener flips cancel // after the first AssetFinished, but the loop may have started // 1 more asset by the time the next iteration check runs. // 0 (race won by listener), 1 (first only), or 2 (one extra // slipped in) are all valid outcomes; report.new == 3 means // cancel never propagated and is the only failure mode. assert!(report.new < 3, "loop should have broken: {report:?}"); // Idempotent re-ingest finishes the job. let r2 = kebab_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap(); assert_eq!(r2.scanned, 3, "re-scan: {r2:?}"); // Total committed across both runs covers all 3 docs (some New // first run, rest New on second; or first run was 0 → all New on // second). let total_new = report.new + r2.new; let total_updated = report.updated + r2.updated; assert!( total_new + total_updated >= 3, "across both runs: report={report:?}, r2={r2:?}" ); } #[test] fn cancel_none_is_uncancellable_default() { // ingest_with_config_progress (no cancel) runs to completion. let env = TestEnv::lexical_only(); let (tx, rx) = mpsc::channel::(); let report = kebab_app::ingest_with_config_progress(env.config.clone(), env.scope(), true, Some(tx)) .unwrap(); assert_eq!(report.scanned, 3); assert_eq!(report.new, 3); let events: Vec<_> = rx.into_iter().collect(); let last = events.last().expect("events"); assert!( matches!(last, IngestEvent::Completed { .. }), "expected Completed (no cancel), got {last:?}" ); }