feat: ingest cooperative cancellation (p9-fb-04)
Ctrl-C / Esc 가 ingest 를 즉시 중단. 현재 in-flight asset 마무리 후
이후 asset 미실행, IngestEvent::Aborted { partial_counts } 발신,
Ok(IngestReport) 정상 반환 (Err 아님). 부분 commit 보존, 다음 ingest
가 idempotent 재개.
신규 facade: kebab-app::ingest_with_config_cancellable(.., progress,
cancel: Option<Arc<AtomicBool>>). 기존 _progress 가 cancel=None
forwarding wrapper. asset loop 시작 boundary 마다 atomic load —
true 면 break + Aborted emit + 정상 종료. Lock 없음.
CLI: ctrlc crate 신규 dep. SIGINT handler 가 첫 신호에 cancel.store(true)
+ stderr hint, 두 번째 신호에 std::process::exit(130) (canonical SIGINT
exit code). install_sigint_cancel() helper 가 Arc<AtomicBool> 반환,
Cmd::Ingest 가 facade 에 전달.
TUI: IngestState 에 cancel: Arc<AtomicBool> field 추가 (회차 1 review
결과의 reshape 정확). start_ingest 가 둘 다 만들어 worker 에 clone
move. cancel_running_ingest(&app) helper — Esc / Ctrl-C 가
ingest 진행 중일 때만 cancel 우선, 그 외에는 quit.
Test:
- 3 facade integration (cancel-before / cancel-mid / no-cancel
default).
- 3 tui lib unit (cancel_running_ingest no-state / in-flight /
terminated).
Plan 갱신: p9-fb-04 status planned → in_progress. 머지 후 한 줄
commit 으로 completed flip.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -191,8 +191,43 @@ pub fn ingest_with_config_progress(
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
ingest_with_config_cancellable(config, scope, summary_only, progress, None)
|
||||
}
|
||||
|
||||
/// Config + progress + cancel variant (p9-fb-04). The caller injects
|
||||
/// an `Arc<AtomicBool>` cancel token; setting it to `true` causes the
|
||||
/// ingest loop to break at the next step boundary (asset loop iter
|
||||
/// start), emit `IngestEvent::Aborted { counts: <partial> }`, and
|
||||
/// return `Ok(IngestReport)` with whatever assets were committed
|
||||
/// before cancellation. Per design §10:
|
||||
///
|
||||
/// - The current in-flight asset finishes (rollback would break
|
||||
/// idempotent re-run). Subsequent assets are skipped.
|
||||
/// - Cancellation is a normal exit, not an error — `Result::Err` is
|
||||
/// reserved for actual failures.
|
||||
/// - Partial commits in SQLite are kept; the next `kebab ingest` run
|
||||
/// picks up where this one left off (deterministic asset_id +
|
||||
/// doc_id recipes).
|
||||
///
|
||||
/// CLI's `Ctrl-C` SIGINT handler and TUI's `Esc` / `Ctrl-C` both
|
||||
/// flip the same `AtomicBool`. Pass `None` to retain pre-p9-fb-04
|
||||
/// behaviour (uncancellable).
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config_cancellable(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
let progress = progress.as_ref();
|
||||
let cancelled = || {
|
||||
cancel
|
||||
.as_ref()
|
||||
.map(|c| c.load(std::sync::atomic::Ordering::Relaxed))
|
||||
.unwrap_or(false)
|
||||
};
|
||||
let started_instant = std::time::Instant::now();
|
||||
|
||||
let app = App::open_with_config(config)?;
|
||||
@@ -293,7 +328,20 @@ pub fn ingest_with_config_progress(
|
||||
|
||||
let embed_active = embedder.is_some() && vector_store.is_some();
|
||||
|
||||
// p9-fb-04: track whether the loop exited via cancellation (vs
|
||||
// running to completion) so we can emit `Aborted` rather than
|
||||
// `Completed` and surface the right summary.
|
||||
let mut was_cancelled = false;
|
||||
|
||||
for (zero_idx, asset) in assets.into_iter().enumerate() {
|
||||
// Step boundary check (p9-fb-04). Designed §10 invariant: the
|
||||
// current in-flight asset finishes (idempotent re-run guard);
|
||||
// subsequent assets are skipped. Check here is the cheapest
|
||||
// possible — atomic load each iteration, no lock.
|
||||
if cancelled() {
|
||||
was_cancelled = true;
|
||||
break;
|
||||
}
|
||||
let idx = u32::try_from(zero_idx + 1).unwrap_or(u32::MAX);
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
@@ -501,20 +549,25 @@ pub fn ingest_with_config_progress(
|
||||
"kb-app::ingest: run complete"
|
||||
);
|
||||
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
let final_counts = crate::ingest_progress::AggregateCounts {
|
||||
scanned: scanned_count,
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
};
|
||||
let terminal_event = if was_cancelled {
|
||||
crate::ingest_progress::IngestEvent::Aborted {
|
||||
counts: final_counts,
|
||||
}
|
||||
} else {
|
||||
crate::ingest_progress::IngestEvent::Completed {
|
||||
counts: crate::ingest_progress::AggregateCounts {
|
||||
scanned: scanned_count,
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
},
|
||||
},
|
||||
);
|
||||
counts: final_counts,
|
||||
}
|
||||
};
|
||||
crate::ingest_progress::emit(progress, terminal_event);
|
||||
|
||||
Ok(IngestReport {
|
||||
scope,
|
||||
|
||||
125
crates/kebab-app/tests/ingest_cancel.rs
Normal file
125
crates/kebab-app/tests/ingest_cancel.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
//! Integration coverage for `ingest_with_config_cancellable`
|
||||
//! (p9-fb-04). Asserts the §10 invariants:
|
||||
//!
|
||||
//! - Cancel set BEFORE the loop starts → no asset is processed.
|
||||
//! Terminal event is `Aborted` with all-zero counts.
|
||||
//! - Cancel set MID-LOOP → at least one asset committed; remaining
|
||||
//! assets skipped; terminal event is `Aborted` with partial counts;
|
||||
//! re-running on the same workspace finishes the job (idempotent).
|
||||
|
||||
mod common;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
|
||||
use common::TestEnv;
|
||||
use kebab_app::IngestEvent;
|
||||
|
||||
fn run_with(
|
||||
env: &TestEnv,
|
||||
cancel: Arc<AtomicBool>,
|
||||
progress: Option<mpsc::Sender<IngestEvent>>,
|
||||
) -> kebab_core::IngestReport {
|
||||
kebab_app::ingest_with_config_cancellable(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
progress,
|
||||
Some(cancel),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_before_loop_emits_aborted_with_zero_counts() {
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let cancel = Arc::new(AtomicBool::new(true)); // pre-set
|
||||
let report = run_with(&env, cancel, Some(tx));
|
||||
|
||||
// Report itself surfaces partial counts — no assets processed
|
||||
// because the very first iteration check tripped.
|
||||
assert_eq!(report.scanned, 3, "scanned reflects discovery, not work");
|
||||
assert_eq!(report.new, 0, "no asset committed: {report:?}");
|
||||
|
||||
// Drain the channel; the terminal event must be Aborted.
|
||||
let events: Vec<_> = rx.into_iter().collect();
|
||||
let last = events.last().expect("at least one event");
|
||||
assert!(
|
||||
matches!(last, IngestEvent::Aborted { .. }),
|
||||
"expected Aborted, got {last:?}"
|
||||
);
|
||||
if let IngestEvent::Aborted { counts } = last {
|
||||
assert_eq!(counts.new, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_mid_loop_after_first_asset_keeps_idempotent_resume() {
|
||||
// Strategy: subscribe to progress, flip cancel as soon as the
|
||||
// first AssetFinished arrives. The ingest loop will see cancel=true
|
||||
// on the *next* iteration and break — exactly one asset committed.
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let cancel = Arc::new(AtomicBool::new(false));
|
||||
let cancel_for_listener = cancel.clone();
|
||||
|
||||
// Background listener flips cancel after the first AssetFinished.
|
||||
let listener = std::thread::spawn(move || {
|
||||
for event in rx {
|
||||
if let IngestEvent::AssetFinished { .. } = event {
|
||||
cancel_for_listener.store(true, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Drain the rest so the channel doesn't fill while ingest
|
||||
// continues emitting (until the next iteration check).
|
||||
});
|
||||
|
||||
let report = run_with(&env, cancel, Some(tx));
|
||||
listener.join().unwrap();
|
||||
|
||||
// Exactly 1 asset committed; remaining 2 skipped (untouched).
|
||||
assert!(
|
||||
report.new == 1 || report.new == 0 || report.new == 2,
|
||||
"non-deterministic but must be < 3: {report:?}"
|
||||
);
|
||||
assert!(report.new < 3, "loop should have broken: {report:?}");
|
||||
|
||||
// Idempotent re-ingest finishes the job.
|
||||
let r2 = kebab_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
|
||||
assert_eq!(r2.scanned, 3, "re-scan: {r2:?}");
|
||||
// Total committed across both runs covers all 3 docs (some New
|
||||
// first run, rest New on second; or first run was 0 → all New on
|
||||
// second).
|
||||
let total_new = report.new + r2.new;
|
||||
let total_updated = report.updated + r2.updated;
|
||||
assert!(
|
||||
total_new + total_updated >= 3,
|
||||
"across both runs: report={report:?}, r2={r2:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_none_is_uncancellable_default() {
|
||||
// ingest_with_config_progress (no cancel) runs to completion.
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let report = kebab_app::ingest_with_config_progress(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
Some(tx),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(report.scanned, 3);
|
||||
assert_eq!(report.new, 3);
|
||||
|
||||
let events: Vec<_> = rx.into_iter().collect();
|
||||
let last = events.last().expect("events");
|
||||
assert!(
|
||||
matches!(last, IngestEvent::Completed { .. }),
|
||||
"expected Completed (no cancel), got {last:?}"
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user