feat(cli): kebab ask --stream emits ndjson on stderr (fb-33)
Background-thread driver runs ask_with_config; main thread drains the receiver, serializes each StreamEvent to ndjson on stderr. BrokenPipe → drop receiver → pipeline SendError → cancel + LlmStreamAborted refusal. Final stdout line is the existing answer.v1 (ingest_progress.v1 backwards-compat pattern). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -153,6 +153,12 @@ enum Cmd {
|
||||
/// (e.g. `kebab-rust-async-2026-05`).
|
||||
#[arg(long, value_name = "ID")]
|
||||
session: Option<String>,
|
||||
|
||||
/// p9-fb-33: emit ndjson `answer_event.v1` events on stderr
|
||||
/// while streaming. Final stdout line is the existing
|
||||
/// `answer.v1`. Off by default to preserve final-only behavior.
|
||||
#[arg(long)]
|
||||
stream: bool,
|
||||
},
|
||||
|
||||
/// Wipe XDG data dirs (and optionally the Lance vector store) so the
|
||||
@@ -578,55 +584,138 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
|
||||
show_citations,
|
||||
hide_citations,
|
||||
session,
|
||||
stream,
|
||||
} => {
|
||||
let cfg = kebab_config::Config::load(cli.config.as_deref())?;
|
||||
let opts = kebab_app::AskOpts {
|
||||
k: *k,
|
||||
explain: *explain,
|
||||
mode: (*mode).into(),
|
||||
temperature: *temperature,
|
||||
seed: *seed,
|
||||
// CLI ask is non-streaming today (the answer prints all at
|
||||
// once on completion). The TUI ask pane (P9-3) is what
|
||||
// wires up a real `mpsc::Sender` here.
|
||||
stream_sink: None,
|
||||
// p9-fb-18: when `--session` is set, the facade
|
||||
// (`ask_with_session_with_config`) loads prior turns
|
||||
// from SQLite and stuffs them into AskOpts.history
|
||||
// before calling `ask_with_history`. Single-shot path
|
||||
// (no `--session`) keeps the empty defaults.
|
||||
history: Vec::new(),
|
||||
conversation_id: None,
|
||||
turn_index: None,
|
||||
};
|
||||
let ans = match session.as_deref() {
|
||||
Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?,
|
||||
None => kebab_app::ask_with_config(cfg, query, opts)?,
|
||||
};
|
||||
if cli.json {
|
||||
println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?);
|
||||
} else {
|
||||
println!("{}", ans.answer);
|
||||
// p9-fb-20: print the citation block after the
|
||||
// answer body when --hide-citations is not set
|
||||
// (--show-citations is the default). Skipped on
|
||||
// refusal-with-zero-citations to avoid an empty
|
||||
// `근거:` header.
|
||||
let print_citations = *show_citations && !*hide_citations;
|
||||
if print_citations && !ans.citations.is_empty() {
|
||||
// p9-fb-32: yellow `[stale]` prefix on TTY (mirrors
|
||||
// the search renderer's pattern in `Cmd::Search`).
|
||||
use std::io::IsTerminal;
|
||||
let color = std::io::stdout().is_terminal();
|
||||
let mut out = std::io::stdout().lock();
|
||||
render_ask_plain_citations(&mut out, &ans, color)?;
|
||||
if *stream {
|
||||
// p9-fb-33: streaming branch. Background thread runs
|
||||
// ask_with_config (which calls into the rag pipeline);
|
||||
// main thread drains the receiver and writes
|
||||
// `answer_event.v1` ndjson to stderr. On BrokenPipe
|
||||
// (downstream consumer closed), drop the receiver so
|
||||
// the worker's next `send` returns SendError →
|
||||
// pipeline cancels with LlmStreamAborted. Final stdout
|
||||
// line is the existing `answer.v1` (mirrors
|
||||
// ingest_progress.v1 + ingest_report.v1 split).
|
||||
use std::io::Write;
|
||||
use std::sync::mpsc;
|
||||
|
||||
let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>();
|
||||
let opts = kebab_app::AskOpts {
|
||||
k: *k,
|
||||
explain: *explain,
|
||||
mode: (*mode).into(),
|
||||
temperature: *temperature,
|
||||
seed: *seed,
|
||||
stream_sink: Some(tx),
|
||||
history: Vec::new(),
|
||||
conversation_id: None,
|
||||
turn_index: None,
|
||||
};
|
||||
let cfg2 = cfg.clone();
|
||||
let q = query.clone();
|
||||
let session2 = session.clone();
|
||||
let handle = std::thread::spawn(
|
||||
move || -> anyhow::Result<kebab_core::Answer> {
|
||||
match session2.as_deref() {
|
||||
Some(sid) => kebab_app::ask_with_session_with_config(
|
||||
cfg2, sid, &q, opts,
|
||||
),
|
||||
None => kebab_app::ask_with_config(cfg2, &q, opts),
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Drain receiver, write ndjson to stderr until
|
||||
// completion or BrokenPipe.
|
||||
let mut cancelled_pipe = false;
|
||||
{
|
||||
let mut stderr = std::io::stderr().lock();
|
||||
for ev in &rx {
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
let v = wire::wire_answer_event(&ev, now);
|
||||
let line = serde_json::to_string(&v)?;
|
||||
if let Err(e) = writeln!(stderr, "{line}") {
|
||||
if e.kind() == std::io::ErrorKind::BrokenPipe {
|
||||
cancelled_pipe = true;
|
||||
break;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
if cancelled_pipe {
|
||||
// Dropping the receiver signals to the worker —
|
||||
// the next `send` returns SendError, which the
|
||||
// pipeline interprets as a cancel.
|
||||
drop(rx);
|
||||
}
|
||||
|
||||
let result = handle
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("ask worker panicked"))?;
|
||||
let ans = result?;
|
||||
|
||||
// Final stdout line — answer.v1 for backwards
|
||||
// compat. BrokenPipe on stdout is silent (caller
|
||||
// already gone).
|
||||
let final_json = serde_json::to_string(&wire::wire_answer(&ans))?;
|
||||
let _ = writeln!(std::io::stdout().lock(), "{final_json}");
|
||||
|
||||
if !ans.grounded {
|
||||
return Err(RefusalSignal.into());
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
let opts = kebab_app::AskOpts {
|
||||
k: *k,
|
||||
explain: *explain,
|
||||
mode: (*mode).into(),
|
||||
temperature: *temperature,
|
||||
seed: *seed,
|
||||
// CLI ask is non-streaming by default (the answer
|
||||
// prints all at once on completion). `--stream`
|
||||
// takes the branch above; the TUI ask pane (P9-3)
|
||||
// wires up its own `mpsc::Sender`.
|
||||
stream_sink: None,
|
||||
// p9-fb-18: when `--session` is set, the facade
|
||||
// (`ask_with_session_with_config`) loads prior turns
|
||||
// from SQLite and stuffs them into AskOpts.history
|
||||
// before calling `ask_with_history`. Single-shot path
|
||||
// (no `--session`) keeps the empty defaults.
|
||||
history: Vec::new(),
|
||||
conversation_id: None,
|
||||
turn_index: None,
|
||||
};
|
||||
let ans = match session.as_deref() {
|
||||
Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?,
|
||||
None => kebab_app::ask_with_config(cfg, query, opts)?,
|
||||
};
|
||||
if cli.json {
|
||||
println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?);
|
||||
} else {
|
||||
println!("{}", ans.answer);
|
||||
// p9-fb-20: print the citation block after the
|
||||
// answer body when --hide-citations is not set
|
||||
// (--show-citations is the default). Skipped on
|
||||
// refusal-with-zero-citations to avoid an empty
|
||||
// `근거:` header.
|
||||
let print_citations = *show_citations && !*hide_citations;
|
||||
if print_citations && !ans.citations.is_empty() {
|
||||
// p9-fb-32: yellow `[stale]` prefix on TTY (mirrors
|
||||
// the search renderer's pattern in `Cmd::Search`).
|
||||
use std::io::IsTerminal;
|
||||
let color = std::io::stdout().is_terminal();
|
||||
let mut out = std::io::stdout().lock();
|
||||
render_ask_plain_citations(&mut out, &ans, color)?;
|
||||
}
|
||||
}
|
||||
// Refusal → exit 1.
|
||||
if !ans.grounded {
|
||||
return Err(RefusalSignal.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// Refusal → exit 1.
|
||||
if !ans.grounded {
|
||||
return Err(RefusalSignal.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Cmd::Reset {
|
||||
|
||||
@@ -87,6 +87,25 @@ pub fn wire_answer(a: &Answer) -> Value {
|
||||
tag_object(v, "answer.v1")
|
||||
}
|
||||
|
||||
/// p9-fb-33: tag a [`StreamEvent`] as `answer_event.v1` ndjson.
|
||||
///
|
||||
/// The timestamp is added at emit time (caller fills `ts`), since the
|
||||
/// pipeline doesn't carry one in the in-process enum — mirrors the
|
||||
/// `wire_ingest_progress` pattern (§2 ingest_progress.v1).
|
||||
pub fn wire_answer_event(
|
||||
ev: &kebab_app::StreamEvent,
|
||||
ts: time::OffsetDateTime,
|
||||
) -> Value {
|
||||
let mut v = serde_json::to_value(ev).expect("StreamEvent serializes");
|
||||
let ts_str = ts
|
||||
.format(&time::format_description::well_known::Rfc3339)
|
||||
.expect("OffsetDateTime formats as RFC3339");
|
||||
if let Value::Object(ref mut map) = v {
|
||||
map.insert("ts".to_string(), Value::String(ts_str));
|
||||
}
|
||||
tag_object(v, "answer_event.v1")
|
||||
}
|
||||
|
||||
/// Idempotent pass-through for [`DoctorReport`] — the type already carries
|
||||
/// `schema_version: "doctor.v1"` (struct-field convention, the one
|
||||
/// exception called out in the module doc above). This helper exists so
|
||||
|
||||
Reference in New Issue
Block a user