From 29629e6786d7009f47538be0a45f914a6d6863c7 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:03:41 +0900 Subject: [PATCH] feat(cli): kebab ask --stream emits ndjson on stderr (fb-33) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background-thread driver runs ask_with_config; main thread drains the receiver, serializes each StreamEvent to ndjson on stderr. BrokenPipe → drop receiver → pipeline SendError → cancel + LlmStreamAborted refusal. Final stdout line is the existing answer.v1 (ingest_progress.v1 backwards-compat pattern). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-cli/src/main.rs | 179 ++++++++++++++++++++++++++--------- crates/kebab-cli/src/wire.rs | 19 ++++ 2 files changed, 153 insertions(+), 45 deletions(-) diff --git a/crates/kebab-cli/src/main.rs b/crates/kebab-cli/src/main.rs index 4614bee..f3df832 100644 --- a/crates/kebab-cli/src/main.rs +++ b/crates/kebab-cli/src/main.rs @@ -153,6 +153,12 @@ enum Cmd { /// (e.g. `kebab-rust-async-2026-05`). #[arg(long, value_name = "ID")] session: Option, + + /// p9-fb-33: emit ndjson `answer_event.v1` events on stderr + /// while streaming. Final stdout line is the existing + /// `answer.v1`. Off by default to preserve final-only behavior. + #[arg(long)] + stream: bool, }, /// Wipe XDG data dirs (and optionally the Lance vector store) so the @@ -578,55 +584,138 @@ fn run(cli: &Cli) -> anyhow::Result<()> { show_citations, hide_citations, session, + stream, } => { let cfg = kebab_config::Config::load(cli.config.as_deref())?; - let opts = kebab_app::AskOpts { - k: *k, - explain: *explain, - mode: (*mode).into(), - temperature: *temperature, - seed: *seed, - // CLI ask is non-streaming today (the answer prints all at - // once on completion). The TUI ask pane (P9-3) is what - // wires up a real `mpsc::Sender` here. - stream_sink: None, - // p9-fb-18: when `--session` is set, the facade - // (`ask_with_session_with_config`) loads prior turns - // from SQLite and stuffs them into AskOpts.history - // before calling `ask_with_history`. Single-shot path - // (no `--session`) keeps the empty defaults. - history: Vec::new(), - conversation_id: None, - turn_index: None, - }; - let ans = match session.as_deref() { - Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?, - None => kebab_app::ask_with_config(cfg, query, opts)?, - }; - if cli.json { - println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?); - } else { - println!("{}", ans.answer); - // p9-fb-20: print the citation block after the - // answer body when --hide-citations is not set - // (--show-citations is the default). Skipped on - // refusal-with-zero-citations to avoid an empty - // `근거:` header. - let print_citations = *show_citations && !*hide_citations; - if print_citations && !ans.citations.is_empty() { - // p9-fb-32: yellow `[stale]` prefix on TTY (mirrors - // the search renderer's pattern in `Cmd::Search`). - use std::io::IsTerminal; - let color = std::io::stdout().is_terminal(); - let mut out = std::io::stdout().lock(); - render_ask_plain_citations(&mut out, &ans, color)?; + if *stream { + // p9-fb-33: streaming branch. Background thread runs + // ask_with_config (which calls into the rag pipeline); + // main thread drains the receiver and writes + // `answer_event.v1` ndjson to stderr. On BrokenPipe + // (downstream consumer closed), drop the receiver so + // the worker's next `send` returns SendError → + // pipeline cancels with LlmStreamAborted. Final stdout + // line is the existing `answer.v1` (mirrors + // ingest_progress.v1 + ingest_report.v1 split). + use std::io::Write; + use std::sync::mpsc; + + let (tx, rx) = mpsc::channel::(); + let opts = kebab_app::AskOpts { + k: *k, + explain: *explain, + mode: (*mode).into(), + temperature: *temperature, + seed: *seed, + stream_sink: Some(tx), + history: Vec::new(), + conversation_id: None, + turn_index: None, + }; + let cfg2 = cfg.clone(); + let q = query.clone(); + let session2 = session.clone(); + let handle = std::thread::spawn( + move || -> anyhow::Result { + match session2.as_deref() { + Some(sid) => kebab_app::ask_with_session_with_config( + cfg2, sid, &q, opts, + ), + None => kebab_app::ask_with_config(cfg2, &q, opts), + } + }, + ); + + // Drain receiver, write ndjson to stderr until + // completion or BrokenPipe. + let mut cancelled_pipe = false; + { + let mut stderr = std::io::stderr().lock(); + for ev in &rx { + let now = time::OffsetDateTime::now_utc(); + let v = wire::wire_answer_event(&ev, now); + let line = serde_json::to_string(&v)?; + if let Err(e) = writeln!(stderr, "{line}") { + if e.kind() == std::io::ErrorKind::BrokenPipe { + cancelled_pipe = true; + break; + } + return Err(e.into()); + } + } } + if cancelled_pipe { + // Dropping the receiver signals to the worker — + // the next `send` returns SendError, which the + // pipeline interprets as a cancel. + drop(rx); + } + + let result = handle + .join() + .map_err(|_| anyhow::anyhow!("ask worker panicked"))?; + let ans = result?; + + // Final stdout line — answer.v1 for backwards + // compat. BrokenPipe on stdout is silent (caller + // already gone). + let final_json = serde_json::to_string(&wire::wire_answer(&ans))?; + let _ = writeln!(std::io::stdout().lock(), "{final_json}"); + + if !ans.grounded { + return Err(RefusalSignal.into()); + } + Ok(()) + } else { + let opts = kebab_app::AskOpts { + k: *k, + explain: *explain, + mode: (*mode).into(), + temperature: *temperature, + seed: *seed, + // CLI ask is non-streaming by default (the answer + // prints all at once on completion). `--stream` + // takes the branch above; the TUI ask pane (P9-3) + // wires up its own `mpsc::Sender`. + stream_sink: None, + // p9-fb-18: when `--session` is set, the facade + // (`ask_with_session_with_config`) loads prior turns + // from SQLite and stuffs them into AskOpts.history + // before calling `ask_with_history`. Single-shot path + // (no `--session`) keeps the empty defaults. + history: Vec::new(), + conversation_id: None, + turn_index: None, + }; + let ans = match session.as_deref() { + Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?, + None => kebab_app::ask_with_config(cfg, query, opts)?, + }; + if cli.json { + println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?); + } else { + println!("{}", ans.answer); + // p9-fb-20: print the citation block after the + // answer body when --hide-citations is not set + // (--show-citations is the default). Skipped on + // refusal-with-zero-citations to avoid an empty + // `근거:` header. + let print_citations = *show_citations && !*hide_citations; + if print_citations && !ans.citations.is_empty() { + // p9-fb-32: yellow `[stale]` prefix on TTY (mirrors + // the search renderer's pattern in `Cmd::Search`). + use std::io::IsTerminal; + let color = std::io::stdout().is_terminal(); + let mut out = std::io::stdout().lock(); + render_ask_plain_citations(&mut out, &ans, color)?; + } + } + // Refusal → exit 1. + if !ans.grounded { + return Err(RefusalSignal.into()); + } + Ok(()) } - // Refusal → exit 1. - if !ans.grounded { - return Err(RefusalSignal.into()); - } - Ok(()) } Cmd::Reset { diff --git a/crates/kebab-cli/src/wire.rs b/crates/kebab-cli/src/wire.rs index 8fd58e7..e1e35d3 100644 --- a/crates/kebab-cli/src/wire.rs +++ b/crates/kebab-cli/src/wire.rs @@ -87,6 +87,25 @@ pub fn wire_answer(a: &Answer) -> Value { tag_object(v, "answer.v1") } +/// p9-fb-33: tag a [`StreamEvent`] as `answer_event.v1` ndjson. +/// +/// The timestamp is added at emit time (caller fills `ts`), since the +/// pipeline doesn't carry one in the in-process enum — mirrors the +/// `wire_ingest_progress` pattern (§2 ingest_progress.v1). +pub fn wire_answer_event( + ev: &kebab_app::StreamEvent, + ts: time::OffsetDateTime, +) -> Value { + let mut v = serde_json::to_value(ev).expect("StreamEvent serializes"); + let ts_str = ts + .format(&time::format_description::well_known::Rfc3339) + .expect("OffsetDateTime formats as RFC3339"); + if let Value::Object(ref mut map) = v { + map.insert("ts".to_string(), Value::String(ts_str)); + } + tag_object(v, "answer_event.v1") +} + /// Idempotent pass-through for [`DoctorReport`] — the type already carries /// `schema_version: "doctor.v1"` (struct-field convention, the one /// exception called out in the module doc above). This helper exists so