Merge pull request 'feat: ingest cooperative cancellation (p9-fb-04)' (#57) from feat/p9-fb-04-cancel into main
This commit was merged in pull request #57.
This commit is contained in:
60
Cargo.lock
generated
60
Cargo.lock
generated
@@ -693,6 +693,15 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block2"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
|
||||
dependencies = [
|
||||
"objc2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bon"
|
||||
version = "3.9.1"
|
||||
@@ -1189,6 +1198,17 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ctrlc"
|
||||
version = "3.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
|
||||
dependencies = [
|
||||
"dispatch2",
|
||||
"nix",
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.20.11"
|
||||
@@ -2057,6 +2077,18 @@ dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dispatch2"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"block2",
|
||||
"libc",
|
||||
"objc2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "displaydoc"
|
||||
version = "0.2.5"
|
||||
@@ -3517,6 +3549,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"ctrlc",
|
||||
"indicatif",
|
||||
"kebab-app",
|
||||
"kebab-config",
|
||||
@@ -4956,6 +4989,18 @@ version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.31.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "no_std_io2"
|
||||
version = "0.9.3"
|
||||
@@ -5128,6 +5173,21 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
|
||||
|
||||
[[package]]
|
||||
name = "objc2"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f"
|
||||
dependencies = [
|
||||
"objc2-encode",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "objc2-encode"
|
||||
version = "4.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.37.3"
|
||||
|
||||
@@ -42,7 +42,8 @@ P0~P5 직렬. P6~P9 P5 이후 병렬 가능.
|
||||
- **P9-4 enter_inspect helper + Search `i` 키** — spec 의 진입 경로 (Library Enter → Doc inspect, Search `i` → Chunk inspect) 를 한 helper 로 묶음. `InspectTarget` enum (`Doc(DocumentId) | Chunk(ChunkId)`), `return_to: Pane` 가 Esc 시 원래 pane 으로 복귀. `c` 키가 모든 section (metadata / provenance / blocks / spans / text / embeddings) 일괄 collapse/expand — spec 의 \"focus 기반 selective collapse\" 는 v1 단순화.
|
||||
- **2026-05-02 P9 도그푸딩 후속 (p9-fb-06)** — `kebab reset --all|--data-only|--vector-only|--config-only [--yes]` 추가. TTY 가 아니면 `--yes` 필수 (silent destruction 금지). `--vector-only` 가 SQLite `embedding_records` 도 함께 truncate (off-disk Lance dir 만 wipe 시 orphan 방지). 도그푸딩 막힘 강도 1위 (수동 4 경로 `rm -rf` 부담) 해소. spec: `tasks/p9/p9-fb-06-data-reset-command.md`, plan: `docs/superpowers/plans/2026-05-02-p9-fb-06-reset-command.md`.
|
||||
- **2026-05-02 P9 도그푸딩 후속 (spec PR #51 + p9-fb-01 + p9-fb-02)** — `kebab ingest` 진행 표시 도입. frozen design §2.4a 신설 (wire schema `ingest_progress.v1` line-delimited streaming) + §10 의 long-running 작업 절 추가. `kebab-app::ingest_with_config_progress(.., progress: Option<Sender<IngestEvent>>)` facade 추가, 기존 `_with_config` 가 `progress=None` forwarding wrapper. CLI 가 indicatif TTY 진행 바 (stderr) / non-TTY 한 줄씩 / `--json` 모드는 line-delimited stdout. p9-fb-03 (TUI background worker) + p9-fb-04 (cancel) 가 같은 stream 위에 build.
|
||||
- **2026-05-02 P9 도그푸딩 후속 (p9-fb-03)** — TUI 의 background ingest worker. Library 의 `r` 키가 `kebab_app::ingest_with_config_progress` 를 spawned thread 에서 호출, run loop 가 매 frame 마다 progress channel drain → 화면 하단 status bar 1 줄 갱신. terminal event (`Completed`/`Aborted`) 후 3 초 final 라인 hold + 자동 hide + Library auto-refresh. `IngestState` 의 `cancel_tx: Sender<()>` slot 만 정의 (p9-fb-04 가 wire). spec: `tasks/p9/p9-fb-03-tui-ingest-background.md`.
|
||||
- **2026-05-02 P9 도그푸딩 후속 (p9-fb-03)** — TUI 의 background ingest worker. Library 의 `r` 키가 `kebab_app::ingest_with_config_progress` 를 spawned thread 에서 호출, run loop 가 매 frame 마다 progress channel drain → 화면 하단 status bar 1 줄 갱신. terminal event (`Completed`/`Aborted`) 후 3 초 final 라인 hold + 자동 hide + Library auto-refresh. spec: `tasks/p9/p9-fb-03-tui-ingest-background.md`. (cancel slot 은 p9-fb-04 가 추가하는 형태로 단일화 — 회차 1 review 결과.)
|
||||
- **2026-05-02 P9 도그푸딩 후속 (p9-fb-04)** — ingest cooperative cancellation. `kebab-app::ingest_with_config_cancellable(.., cancel: Option<Arc<AtomicBool>>)` facade 추가, 기존 `_progress` 가 `cancel=None` forwarding. asset loop iter 시작 boundary 마다 cancel poll → true 면 break + `IngestEvent::Aborted { partial_counts }` + `Ok(IngestReport)` 정상 반환 (Err 아님). 부분 commit 보존, 다음 ingest 가 idempotent 재개. CLI Ctrl-C SIGINT handler (`ctrlc` crate) — 1회: cancel, 2회: hard exit (130). TUI Esc / Ctrl-C 가 cancel signal (in-flight 시), 그 외에는 quit. `IngestState` 에 `cancel: Arc<AtomicBool>` field 추가. spec: `tasks/p9/p9-fb-04-ingest-cancellation.md`.
|
||||
|
||||
## 다음 task 후보
|
||||
|
||||
|
||||
@@ -70,13 +70,13 @@ kebab doctor
|
||||
| 명령 | 동작 |
|
||||
|------|------|
|
||||
| `kebab init` | XDG 경로에 데이터 디렉토리 + config.toml 생성 |
|
||||
| `kebab ingest [<path>]` | Markdown / 이미지 / PDF 색인 (idempotent). TTY 에서는 stderr 진행 바, non-TTY (CI / pipe) 는 stderr 한 줄씩, `--json` 은 stdout 에 `ingest_progress.v1` 라인 streaming 후 마지막에 `ingest_report.v1` |
|
||||
| `kebab ingest [<path>]` | Markdown / 이미지 / PDF 색인 (idempotent). TTY 에서는 stderr 진행 바, non-TTY (CI / pipe) 는 stderr 한 줄씩, `--json` 은 stdout 에 `ingest_progress.v1` 라인 streaming 후 마지막에 `ingest_report.v1`. Ctrl-C 한 번이면 현재 asset 마무리 후 abort (부분 commit 보존, idempotent re-run), 두 번째 Ctrl-C 는 hard exit |
|
||||
| `kebab search --mode {lexical,vector,hybrid} "<query>"` | 검색. hybrid는 RRF fusion, citation 포함 |
|
||||
| `kebab list docs` | 색인된 문서 목록 |
|
||||
| `kebab inspect doc <id>` / `kebab inspect chunk <id>` | raw record 보기 |
|
||||
| `kebab ask "<query>"` | RAG 답변 + 근거 인용. 근거 부족 시 거절. Ollama 필요 |
|
||||
| `kebab doctor` | 설정/모델/DB 헬스 체크 |
|
||||
| `kebab tui` | Ratatui 셸 (Library + Search + Ask + Inspect 패널, desktop 진행 중). Library 에서 `r` 키로 background ingest 시작 — 화면 하단 status bar 가 진행 표시, 완료/abort 시 final 라인 잠시 유지 후 자동 hide |
|
||||
| `kebab tui` | Ratatui 셸 (Library + Search + Ask + Inspect 패널, desktop 진행 중). Library 에서 `r` 키로 background ingest 시작 — 화면 하단 status bar 가 진행 표시, 완료/abort 시 final 라인 잠시 유지 후 자동 hide. ingest 진행 중 `Esc` / `Ctrl-C` 가 cancel signal (그 외에는 quit) |
|
||||
| `kebab reset [--all / --data-only / --vector-only / --config-only] [--yes]` | XDG 데이터 wipe. **Irreversible.** TTY 면 confirm prompt, 아니면 `--yes` 필수. `--vector-only` 는 SQLite `embedding_records` 도 함께 truncate (orphan 방지) |
|
||||
| `kebab eval run / compare` | golden query 회귀 측정 |
|
||||
|
||||
|
||||
@@ -191,8 +191,43 @@ pub fn ingest_with_config_progress(
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
ingest_with_config_cancellable(config, scope, summary_only, progress, None)
|
||||
}
|
||||
|
||||
/// Config + progress + cancel variant (p9-fb-04). The caller injects
|
||||
/// an `Arc<AtomicBool>` cancel token; setting it to `true` causes the
|
||||
/// ingest loop to break at the next step boundary (asset loop iter
|
||||
/// start), emit `IngestEvent::Aborted { counts: <partial> }`, and
|
||||
/// return `Ok(IngestReport)` with whatever assets were committed
|
||||
/// before cancellation. Per design §10:
|
||||
///
|
||||
/// - The current in-flight asset finishes (rollback would break
|
||||
/// idempotent re-run). Subsequent assets are skipped.
|
||||
/// - Cancellation is a normal exit, not an error — `Result::Err` is
|
||||
/// reserved for actual failures.
|
||||
/// - Partial commits in SQLite are kept; the next `kebab ingest` run
|
||||
/// picks up where this one left off (deterministic asset_id +
|
||||
/// doc_id recipes).
|
||||
///
|
||||
/// CLI's `Ctrl-C` SIGINT handler and TUI's `Esc` / `Ctrl-C` both
|
||||
/// flip the same `AtomicBool`. Pass `None` to retain pre-p9-fb-04
|
||||
/// behaviour (uncancellable).
|
||||
#[doc(hidden)]
|
||||
pub fn ingest_with_config_cancellable(
|
||||
config: kebab_config::Config,
|
||||
scope: SourceScope,
|
||||
summary_only: bool,
|
||||
progress: Option<std::sync::mpsc::Sender<crate::ingest_progress::IngestEvent>>,
|
||||
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
|
||||
) -> anyhow::Result<IngestReport> {
|
||||
let progress = progress.as_ref();
|
||||
let cancelled = || {
|
||||
cancel
|
||||
.as_ref()
|
||||
.map(|c| c.load(std::sync::atomic::Ordering::Relaxed))
|
||||
.unwrap_or(false)
|
||||
};
|
||||
let started_instant = std::time::Instant::now();
|
||||
|
||||
let app = App::open_with_config(config)?;
|
||||
@@ -293,7 +328,20 @@ pub fn ingest_with_config_progress(
|
||||
|
||||
let embed_active = embedder.is_some() && vector_store.is_some();
|
||||
|
||||
// p9-fb-04: track whether the loop exited via cancellation (vs
|
||||
// running to completion) so we can emit `Aborted` rather than
|
||||
// `Completed` and surface the right summary.
|
||||
let mut was_cancelled = false;
|
||||
|
||||
for (zero_idx, asset) in assets.into_iter().enumerate() {
|
||||
// Step boundary check (p9-fb-04). Designed §10 invariant: the
|
||||
// current in-flight asset finishes (idempotent re-run guard);
|
||||
// subsequent assets are skipped. Check here is the cheapest
|
||||
// possible — atomic load each iteration, no lock.
|
||||
if cancelled() {
|
||||
was_cancelled = true;
|
||||
break;
|
||||
}
|
||||
let idx = u32::try_from(zero_idx + 1).unwrap_or(u32::MAX);
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
@@ -501,20 +549,25 @@ pub fn ingest_with_config_progress(
|
||||
"kb-app::ingest: run complete"
|
||||
);
|
||||
|
||||
crate::ingest_progress::emit(
|
||||
progress,
|
||||
let final_counts = crate::ingest_progress::AggregateCounts {
|
||||
scanned: scanned_count,
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
};
|
||||
let terminal_event = if was_cancelled {
|
||||
crate::ingest_progress::IngestEvent::Aborted {
|
||||
counts: final_counts,
|
||||
}
|
||||
} else {
|
||||
crate::ingest_progress::IngestEvent::Completed {
|
||||
counts: crate::ingest_progress::AggregateCounts {
|
||||
scanned: scanned_count,
|
||||
new: new_count,
|
||||
updated: updated_count,
|
||||
skipped: skipped_count,
|
||||
errors: error_count,
|
||||
chunks_indexed,
|
||||
embeddings_indexed,
|
||||
},
|
||||
},
|
||||
);
|
||||
counts: final_counts,
|
||||
}
|
||||
};
|
||||
crate::ingest_progress::emit(progress, terminal_event);
|
||||
|
||||
Ok(IngestReport {
|
||||
scope,
|
||||
|
||||
126
crates/kebab-app/tests/ingest_cancel.rs
Normal file
126
crates/kebab-app/tests/ingest_cancel.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
//! Integration coverage for `ingest_with_config_cancellable`
|
||||
//! (p9-fb-04). Asserts the §10 invariants:
|
||||
//!
|
||||
//! - Cancel set BEFORE the loop starts → no asset is processed.
|
||||
//! Terminal event is `Aborted` with all-zero counts.
|
||||
//! - Cancel set MID-LOOP → at least one asset committed; remaining
|
||||
//! assets skipped; terminal event is `Aborted` with partial counts;
|
||||
//! re-running on the same workspace finishes the job (idempotent).
|
||||
|
||||
mod common;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
|
||||
use common::TestEnv;
|
||||
use kebab_app::IngestEvent;
|
||||
|
||||
fn run_with(
|
||||
env: &TestEnv,
|
||||
cancel: Arc<AtomicBool>,
|
||||
progress: Option<mpsc::Sender<IngestEvent>>,
|
||||
) -> kebab_core::IngestReport {
|
||||
kebab_app::ingest_with_config_cancellable(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
progress,
|
||||
Some(cancel),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_before_loop_emits_aborted_with_zero_counts() {
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let cancel = Arc::new(AtomicBool::new(true)); // pre-set
|
||||
let report = run_with(&env, cancel, Some(tx));
|
||||
|
||||
// Report itself surfaces partial counts — no assets processed
|
||||
// because the very first iteration check tripped.
|
||||
assert_eq!(report.scanned, 3, "scanned reflects discovery, not work");
|
||||
assert_eq!(report.new, 0, "no asset committed: {report:?}");
|
||||
|
||||
// Drain the channel; the terminal event must be Aborted.
|
||||
let events: Vec<_> = rx.into_iter().collect();
|
||||
let last = events.last().expect("at least one event");
|
||||
assert!(
|
||||
matches!(last, IngestEvent::Aborted { .. }),
|
||||
"expected Aborted, got {last:?}"
|
||||
);
|
||||
if let IngestEvent::Aborted { counts } = last {
|
||||
assert_eq!(counts.new, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_mid_loop_after_first_asset_keeps_idempotent_resume() {
|
||||
// Strategy: subscribe to progress, flip cancel as soon as the
|
||||
// first AssetFinished arrives. The ingest loop will see cancel=true
|
||||
// on the *next* iteration and break — exactly one asset committed.
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let cancel = Arc::new(AtomicBool::new(false));
|
||||
let cancel_for_listener = cancel.clone();
|
||||
|
||||
// Background listener flips cancel after the first AssetFinished.
|
||||
let listener = std::thread::spawn(move || {
|
||||
for event in rx {
|
||||
if let IngestEvent::AssetFinished { .. } = event {
|
||||
cancel_for_listener.store(true, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Drain the rest so the channel doesn't fill while ingest
|
||||
// continues emitting (until the next iteration check).
|
||||
});
|
||||
|
||||
let report = run_with(&env, cancel, Some(tx));
|
||||
listener.join().unwrap();
|
||||
|
||||
// cancel-mid is timing-dependent: the listener flips cancel
|
||||
// after the first AssetFinished, but the loop may have started
|
||||
// 1 more asset by the time the next iteration check runs.
|
||||
// 0 (race won by listener), 1 (first only), or 2 (one extra
|
||||
// slipped in) are all valid outcomes; report.new == 3 means
|
||||
// cancel never propagated and is the only failure mode.
|
||||
assert!(report.new < 3, "loop should have broken: {report:?}");
|
||||
|
||||
// Idempotent re-ingest finishes the job.
|
||||
let r2 = kebab_app::ingest_with_config(env.config.clone(), env.scope(), true).unwrap();
|
||||
assert_eq!(r2.scanned, 3, "re-scan: {r2:?}");
|
||||
// Total committed across both runs covers all 3 docs (some New
|
||||
// first run, rest New on second; or first run was 0 → all New on
|
||||
// second).
|
||||
let total_new = report.new + r2.new;
|
||||
let total_updated = report.updated + r2.updated;
|
||||
assert!(
|
||||
total_new + total_updated >= 3,
|
||||
"across both runs: report={report:?}, r2={r2:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_none_is_uncancellable_default() {
|
||||
// ingest_with_config_progress (no cancel) runs to completion.
|
||||
let env = TestEnv::lexical_only();
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let report = kebab_app::ingest_with_config_progress(
|
||||
env.config.clone(),
|
||||
env.scope(),
|
||||
true,
|
||||
Some(tx),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(report.scanned, 3);
|
||||
assert_eq!(report.new, 3);
|
||||
|
||||
let events: Vec<_> = rx.into_iter().collect();
|
||||
let last = events.last().expect("events");
|
||||
assert!(
|
||||
matches!(last, IngestEvent::Completed { .. }),
|
||||
"expected Completed (no cancel), got {last:?}"
|
||||
);
|
||||
}
|
||||
@@ -36,6 +36,10 @@ clap = { version = "4", features = ["derive"] }
|
||||
# - timestamp formatting (RFC 3339) 은 time crate.
|
||||
indicatif = "0.17"
|
||||
time = { workspace = true }
|
||||
# p9-fb-04: SIGINT handler for `kebab ingest` cooperative cancel.
|
||||
# `ctrlc` registers a single cross-platform handler; we count signal
|
||||
# arrivals to implement spec §10's "second Ctrl-C is a hard exit".
|
||||
ctrlc = "3"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
|
||||
83
crates/kebab-cli/src/cancel.rs
Normal file
83
crates/kebab-cli/src/cancel.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
//! `kebab ingest` SIGINT (Ctrl-C) handler — flips a shared
|
||||
//! `Arc<AtomicBool>` so `kebab_app::ingest_with_config_cancellable`
|
||||
//! can break at the next step boundary.
|
||||
//!
|
||||
//! Per spec §10: the second Ctrl-C is a hard exit (130 = SIGINT
|
||||
//! conventional). We count signal arrivals via a private atomic and
|
||||
//! call `std::process::exit` on the second arrival — past the point
|
||||
//! where the user has signalled both "let me out gracefully" and
|
||||
//! "no, really, get me out now". This sidesteps the indicatif
|
||||
//! cleanup path; the terminal may be left in a slightly odd state
|
||||
//! after a hard exit, which is the acceptable tradeoff for "really
|
||||
//! exit now".
|
||||
//!
|
||||
//! `ctrlc` is the only cross-platform SIGINT helper that doesn't
|
||||
//! drag in a tokio runtime; it registers a single OS-level handler
|
||||
//! per process. Because the handler is process-global, calling
|
||||
//! `install` more than once per `kebab` invocation is forbidden
|
||||
//! (would clobber the previous handler) — `Cmd::Ingest` is the only
|
||||
//! caller today, but a future `kebab eval run` etc. would need to
|
||||
//! either share the same atomic or deliberately re-install before
|
||||
//! its run begins.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
|
||||
|
||||
/// Install a SIGINT handler that:
|
||||
/// - on first signal: sets `cancel.store(true)` so the cooperative
|
||||
/// cancel loop in `kebab_app::ingest_with_config_cancellable`
|
||||
/// breaks at its next step boundary.
|
||||
/// - on second signal: hard-exits with code 130 (SIGINT
|
||||
/// convention).
|
||||
///
|
||||
/// Returns the same `Arc<AtomicBool>` for the caller to thread
|
||||
/// through to the facade. Errors only on duplicate install; first
|
||||
/// caller wins.
|
||||
pub fn install_sigint_cancel() -> anyhow::Result<Arc<AtomicBool>> {
|
||||
let cancel = Arc::new(AtomicBool::new(false));
|
||||
let cancel_for_handler = cancel.clone();
|
||||
// Per-process count of received SIGINTs. Static so the closure
|
||||
// owns no extra state; first signal flips cancel, second exits.
|
||||
//
|
||||
// Process-lifetime: never reset. ctrlc::set_handler rejects
|
||||
// multi-install with `Err(MultipleHandlers)`, so this counter
|
||||
// is effectively single-use per `kebab` invocation. A future
|
||||
// command that needs its own cancel token (e.g. `kebab eval
|
||||
// run --with-cancel`) must factor the install path into a
|
||||
// helper that takes the token as an arg and shares it across
|
||||
// callers — not call `install_sigint_cancel` twice.
|
||||
static SIGNAL_COUNT: AtomicU8 = AtomicU8::new(0);
|
||||
ctrlc::set_handler(move || {
|
||||
let prev = SIGNAL_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
if prev == 0 {
|
||||
cancel_for_handler.store(true, Ordering::Relaxed);
|
||||
// Helpful hint on stderr — the run loop will surface
|
||||
// its own "aborting…" line once the cancel propagates.
|
||||
let _ = std::io::Write::write_all(
|
||||
&mut std::io::stderr().lock(),
|
||||
b"\nreceived Ctrl-C; aborting after current asset (press again to force quit)\n",
|
||||
);
|
||||
} else {
|
||||
// Second signal → bail. 130 is the canonical SIGINT
|
||||
// exit code (128 + signal number).
|
||||
std::process::exit(130);
|
||||
}
|
||||
})?;
|
||||
Ok(cancel)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
// The handler is process-global and can only be installed once
|
||||
// per binary invocation (ctrlc constraint), so unit-testing the
|
||||
// happy path here is brittle — see `tests/ingest_cancel_cli.rs`
|
||||
// for the integration coverage that runs the bin in a fresh
|
||||
// subprocess.
|
||||
|
||||
#[test]
|
||||
fn cancel_module_compiles() {
|
||||
// Trivial sanity — confirm the module compiles in dev profile
|
||||
// (the install function is exercised by the CLI integration
|
||||
// test, not directly here).
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ use clap::{Parser, Subcommand};
|
||||
|
||||
use kebab_app::doctor_signal::{DoctorUnhealthy, NoHitSignal, RefusalSignal};
|
||||
|
||||
mod cancel;
|
||||
mod progress;
|
||||
mod wire;
|
||||
|
||||
@@ -296,11 +297,17 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
|
||||
progress::ProgressDisplay::new(mode).run(rx)
|
||||
});
|
||||
|
||||
let ingest_result = kebab_app::ingest_with_config_progress(
|
||||
// p9-fb-04: register a Ctrl-C handler that flips the same
|
||||
// AtomicBool the facade polls at each step boundary. The
|
||||
// *second* Ctrl-C is a hard exit (handled inside `cancel`).
|
||||
let cancel_token = cancel::install_sigint_cancel()?;
|
||||
|
||||
let ingest_result = kebab_app::ingest_with_config_cancellable(
|
||||
cfg,
|
||||
scope,
|
||||
*summary_only,
|
||||
Some(tx),
|
||||
Some(cancel_token),
|
||||
);
|
||||
|
||||
// Join the display thread *before* surfacing the ingest
|
||||
|
||||
@@ -166,25 +166,23 @@ impl Default for InspectState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Background-ingest state — owned by p9-fb-03.
|
||||
/// Background-ingest state — owned by p9-fb-03 + extended by
|
||||
/// p9-fb-04 (cancel).
|
||||
///
|
||||
/// The TUI lets the user fire `kebab ingest` from inside the shell
|
||||
/// without blocking the event loop. Pressing `r` on the Library pane
|
||||
/// spawns a worker thread that calls
|
||||
/// `kebab_app::ingest_with_config_progress(.., Some(tx))`; the run
|
||||
/// loop drains `rx` once per frame and updates the visible status
|
||||
/// bar. When the worker thread joins (Sender dropped → `recv()` Err),
|
||||
/// the final aggregate counts stay on screen for a few seconds and
|
||||
/// then the slot clears.
|
||||
/// `kebab_app::ingest_with_config_cancellable(.., Some(tx), Some(cancel))`;
|
||||
/// the run loop drains `rx` once per frame and updates the visible
|
||||
/// status bar. When the worker thread joins (Sender dropped →
|
||||
/// `recv()` Err), the final aggregate counts stay on screen for a
|
||||
/// few seconds and then the slot clears.
|
||||
///
|
||||
/// `p9-fb-04` adds the cancel surface — at that point this struct
|
||||
/// gains a real `(cancel_tx, cancel_rx)` pair (the receiver moved
|
||||
/// into the worker thread alongside the progress sender). We do
|
||||
/// NOT pre-define a `cancel_tx` slot here because doing so without
|
||||
/// a matching receiver-bound worker would yield a dead channel
|
||||
/// (`send` returning `Err(SendError)` forever) — empty slot that
|
||||
/// pretends to be a future-compat shim is worse than no slot
|
||||
/// (CLAUDE.md "backward-compat shim 금지").
|
||||
/// `cancel` is the same `Arc<AtomicBool>` the worker polls at each
|
||||
/// step boundary. The `Esc` / `Ctrl-C` key (only while ingest is
|
||||
/// in flight) flips it via `cancel.store(true, Ordering::Relaxed)`
|
||||
/// — the worker breaks at its next iteration check, emits
|
||||
/// `IngestEvent::Aborted { counts: <partial> }`, and joins.
|
||||
pub struct IngestState {
|
||||
pub rx: std::sync::mpsc::Receiver<kebab_app::IngestEvent>,
|
||||
pub counts: kebab_app::AggregateCounts,
|
||||
@@ -201,6 +199,9 @@ pub struct IngestState {
|
||||
/// Worker thread handle. `take()`n at clear time so the join
|
||||
/// happens after the user has had time to read the final line.
|
||||
pub thread: Option<std::thread::JoinHandle<anyhow::Result<kebab_core::IngestReport>>>,
|
||||
/// p9-fb-04: shared cancel token. `Esc` / `Ctrl-C` flip it; the
|
||||
/// worker thread polls it at each asset-loop boundary.
|
||||
pub cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
/// Seconds the final ingest status line stays on screen after a run
|
||||
|
||||
@@ -10,12 +10,13 @@
|
||||
//! seconds (`TERMINAL_LINE_HOLD_SECS`) and then `tick_clear` returns
|
||||
//! true so the run loop can drop the slot.
|
||||
//!
|
||||
//! Cancel surface (Esc / Ctrl-C) lands in `p9-fb-04`; that task
|
||||
//! adds a `(cancel_tx, cancel_rx)` pair to `IngestState` and
|
||||
//! threads the receiver through `kebab_app::ingest_with_config_cancellable`.
|
||||
//! This task does NOT pre-allocate the channel — see the comment on
|
||||
//! `IngestState` for the rationale.
|
||||
//! Cancel (p9-fb-04) is wired by sharing an `Arc<AtomicBool>`
|
||||
//! between the worker thread (polled at each asset-loop boundary
|
||||
//! inside `kebab_app::ingest_with_config_cancellable`) and the TUI
|
||||
//! key handler (`Esc` / `Ctrl-C` flips it via `cancel_running_ingest`).
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
@@ -39,9 +40,17 @@ pub fn start_ingest(app: &mut App) -> anyhow::Result<()> {
|
||||
exclude: cfg.workspace.exclude.clone(),
|
||||
};
|
||||
let (tx, rx) = mpsc::channel::<IngestEvent>();
|
||||
let cancel = Arc::new(AtomicBool::new(false));
|
||||
let cancel_for_worker = cancel.clone();
|
||||
let cfg_for_thread = cfg;
|
||||
let thread = thread::spawn(move || {
|
||||
kebab_app::ingest_with_config_progress(cfg_for_thread, scope, true, Some(tx))
|
||||
kebab_app::ingest_with_config_cancellable(
|
||||
cfg_for_thread,
|
||||
scope,
|
||||
true,
|
||||
Some(tx),
|
||||
Some(cancel_for_worker),
|
||||
)
|
||||
});
|
||||
app.ingest_state = Some(IngestState {
|
||||
rx,
|
||||
@@ -52,10 +61,26 @@ pub fn start_ingest(app: &mut App) -> anyhow::Result<()> {
|
||||
terminal_at: None,
|
||||
aborted: false,
|
||||
thread: Some(thread),
|
||||
cancel,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flip the cancel token of an in-flight ingest. Returns `true` if a
|
||||
/// run was actually in flight (and thus the signal will reach the
|
||||
/// worker), `false` if there was nothing to cancel — the caller
|
||||
/// (key handler) can decide whether to swallow the keypress or let
|
||||
/// the original pane action run.
|
||||
pub fn cancel_running_ingest(app: &App) -> bool {
|
||||
match app.ingest_state.as_ref() {
|
||||
Some(state) if state.terminal_at.is_none() => {
|
||||
state.cancel.store(true, Ordering::Relaxed);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain whatever progress events have arrived since the last tick.
|
||||
/// Non-blocking. Caller (the run loop) calls this once per frame.
|
||||
///
|
||||
@@ -201,6 +226,7 @@ mod tests {
|
||||
terminal_at: None,
|
||||
aborted: false,
|
||||
thread: None,
|
||||
cancel: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -355,4 +381,33 @@ mod tests {
|
||||
let s = fresh_state();
|
||||
assert!(!ready_to_clear(&s));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_running_ingest_returns_false_when_no_state() {
|
||||
let cfg = kebab_config::Config::defaults();
|
||||
let app = App::new(cfg).unwrap();
|
||||
assert!(!cancel_running_ingest(&app));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_running_ingest_flips_token_when_in_flight() {
|
||||
let cfg = kebab_config::Config::defaults();
|
||||
let mut app = App::new(cfg).unwrap();
|
||||
app.ingest_state = Some(fresh_state());
|
||||
let token = app.ingest_state.as_ref().unwrap().cancel.clone();
|
||||
assert!(!token.load(Ordering::Relaxed));
|
||||
assert!(cancel_running_ingest(&app));
|
||||
assert!(token.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel_running_ingest_returns_false_when_terminal_already_seen() {
|
||||
let cfg = kebab_config::Config::defaults();
|
||||
let mut app = App::new(cfg).unwrap();
|
||||
let mut s = fresh_state();
|
||||
s.terminal_at = Some(std::time::Instant::now());
|
||||
app.ingest_state = Some(s);
|
||||
// No worker to cancel — already terminated.
|
||||
assert!(!cancel_running_ingest(&app));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,9 @@ pub use app::{
|
||||
};
|
||||
pub use ask::{handle_key_ask, render_ask};
|
||||
pub use error_popup::{ErrorOverlay, render_error_overlay};
|
||||
pub use ingest_progress::{drain_progress, ready_to_clear, start_ingest, status_line};
|
||||
pub use ingest_progress::{
|
||||
cancel_running_ingest, drain_progress, ready_to_clear, start_ingest, status_line,
|
||||
};
|
||||
pub use inspect::{enter_inspect, handle_key_inspect, render_inspect};
|
||||
pub use library::{handle_key_library, render_library};
|
||||
pub use search::{build_jump_command, handle_key_search, jump_to_citation, render_search};
|
||||
|
||||
@@ -239,6 +239,18 @@ pub fn handle_key_library(state: &mut App, key: KeyEvent) -> KeyOutcome {
|
||||
return handle_filter_edit_key(state, key);
|
||||
}
|
||||
|
||||
// p9-fb-04: Esc / Ctrl-C while ingest is in flight flips the
|
||||
// worker's cancel token (instead of triggering the quit path).
|
||||
// Done BEFORE the `inner` borrow so we can re-borrow `state`.
|
||||
let is_cancel_chord = match (key.code, key.modifiers) {
|
||||
(KeyCode::Esc, _) => true,
|
||||
(KeyCode::Char('c'), m) => m.contains(KeyModifiers::CONTROL),
|
||||
_ => false,
|
||||
};
|
||||
if is_cancel_chord && crate::ingest_progress::cancel_running_ingest(state) {
|
||||
return KeyOutcome::Continue;
|
||||
}
|
||||
|
||||
let inner = &mut state.library.inner;
|
||||
let pending_g = std::mem::take(&mut inner.pending_g);
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ phase: P9
|
||||
component: kebab-app + kebab-cli + kebab-tui
|
||||
task_id: p9-fb-04
|
||||
title: "Cooperative ingest cancellation (Ctrl-C / Esc)"
|
||||
status: planned
|
||||
status: in_progress
|
||||
depends_on: [p9-fb-01]
|
||||
unblocks: []
|
||||
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
|
||||
|
||||
Reference in New Issue
Block a user