From 39bf0de94906889f3f096c4659caa369dbd40dad Mon Sep 17 00:00:00 2001
From: th-kim0823
Date: Sat, 9 May 2026 15:14:00 +0900
Subject: [PATCH] =?UTF-8?q?test(cli):=20wire=5Fask=5Fstream=20=E2=80=94=20?=
=?UTF-8?q?stderr=20ndjson=20+=20stdout=20final=20+=20BrokenPipe=20cancel?=
=?UTF-8?q?=20(fb-33)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Three Ollama-gated integration tests covering:
- stderr lines parse as answer_event.v1 (retrieval_done first,
final last, all carry RFC3339 ts).
- stdout final line is answer.v1 (backwards compat).
- non-stream path (--json without --stream) unchanged.
- BrokenPipe stderr → child terminates cleanly via cancel
propagation through pipeline SendError.
Co-Authored-By: Claude Opus 4.7 (1M context)
---
crates/kebab-cli/tests/common/mod.rs | 47 ++++++
crates/kebab-cli/tests/wire_ask_stream.rs | 186 ++++++++++++++++++++++
2 files changed, 233 insertions(+)
create mode 100644 crates/kebab-cli/tests/wire_ask_stream.rs
diff --git a/crates/kebab-cli/tests/common/mod.rs b/crates/kebab-cli/tests/common/mod.rs
index 7d66076..8926bd2 100644
--- a/crates/kebab-cli/tests/common/mod.rs
+++ b/crates/kebab-cli/tests/common/mod.rs
@@ -126,6 +126,53 @@ pub fn ingest(cfg: &Path, workspace: &Path) {
);
}
+/// p9-fb-33: run `kebab ask --stream --mode lexical <query>` to
+/// completion and capture stdout + stderr. Lexical mode skips
+/// embeddings (matches `wire_ask_stale.rs::run_ask_lexical`).
+///
+/// Returns `(stdout, stderr)` decoded lossily as UTF-8. The exit
+/// status is not inspected; the caller asserts on the text.
+///
+/// # Panics
+///
+/// Panics if the `kebab` binary cannot be run.
+pub fn run_ask_stream(cfg: &Path, query: &str) -> (String, String) {
+    let out = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        // Pass the path as an OsStr so a non-UTF-8 dir cannot panic.
+        .arg("--config")
+        .arg(cfg)
+        .args(["ask", "--stream", "--mode", "lexical", query])
+        .output()
+        .expect("kebab ask --stream");
+    (
+        String::from_utf8_lossy(&out.stdout).into_owned(),
+        String::from_utf8_lossy(&out.stderr).into_owned(),
+    )
+}
+
+/// p9-fb-33: run `kebab --json ask --mode lexical <query>` (no
+/// `--stream`) and return its stdout. Used by
+/// `wire_ask_stream::non_stream_path_unchanged` to confirm the
+/// non-streaming JSON path still emits a single `answer.v1` line.
+/// Mirrors `wire_ask_stale.rs::run_ask_lexical(json=true)` minus
+/// the `Output` indirection.
+///
+/// stderr and the exit status are discarded; stdout is lossy UTF-8.
+///
+/// # Panics
+///
+/// Panics if the `kebab` binary cannot be run.
+pub fn run_ask_json(cfg: &Path, query: &str) -> String {
+    let out = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        // Pass the path as an OsStr so a non-UTF-8 dir cannot panic.
+        .arg("--config")
+        .arg(cfg)
+        .args(["--json", "ask", "--mode", "lexical", query])
+        .output()
+        .expect("kebab ask --json");
+    String::from_utf8_lossy(&out.stdout).into_owned()
+}
+
/// Rewrite `documents.updated_at` for one workspace path to
/// `now - days_ago` (RFC3339 UTC). Mirrors
/// `kebab-app/tests/common/mod.rs::backdate_document_updated_at`.
diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs
new file mode 100644
index 0000000..4ba341b
--- /dev/null
+++ b/crates/kebab-cli/tests/wire_ask_stream.rs
@@ -0,0 +1,186 @@
+//! p9-fb-33: CLI streaming surface — stderr ndjson `answer_event.v1`
+//! events while the answer streams; final stdout line is the existing
+//! `answer.v1` (backwards compat with the non-`--stream` path).
+//!
+//! These end-to-end checks exercise `kebab ask --stream`, which
+//! requires a real Ollama on `127.0.0.1:11434` (same constraint as
+//! `wire_ask_stale.rs` + `kebab-app/tests/ask_smoke.rs`). All three
+//! tests are therefore `#[ignore]` by default — run with
+//! `cargo test -p kebab-cli --test wire_ask_stream -- --ignored`
+//! against a live Ollama with `gemma4:e4b` pulled.
+//!
+//! The `BrokenPipe → cancel` test (Task 7 of the fb-33 plan) verifies
+//! that closing the stderr reader propagates SendError through the
+//! pipeline so the child terminates instead of hanging. That's the
+//! main thing the integration test layer can prove that unit tests
+//! can't — pipeline cancel is a cross-process concern.
+//!
+//! Shared TempDir / ingest helpers live in `tests/common/mod.rs`.
+
+mod common;
+
+use std::fs;
+use std::path::Path;
+
+use serde_json::Value;
+
+/// Set `[rag].score_gate` to 0 so the score-gate refusal can't
+/// short-circuit the LLM call: lexical retrieval over a one-doc corpus
+/// scores well below the default 0.30 gate, and that early return emits
+/// no `Final` event, leaving the event-ordering assertion vacuous.
+/// Panics if the default gate line is missing (no silent no-op).
+fn relax_score_gate(cfg: &Path) {
+    const DEFAULT_GATE: &str = "score_gate = 0.30";
+    let body = fs::read_to_string(cfg).expect("read config.toml");
+    assert!(body.contains(DEFAULT_GATE), "no `{DEFAULT_GATE}` in config.toml");
+    let relaxed = body.replace(DEFAULT_GATE, "score_gate = 0.0");
+    fs::write(cfg, relaxed).expect("write relaxed config.toml");
+}
+
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434"]
+fn stream_emits_ndjson_events_on_stderr() {
+    let dir = tempfile::tempdir().unwrap();
+    let (cfg, workspace, _data) =
+        common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+    relax_score_gate(&cfg);
+    fs::write(
+        workspace.join("a.md"),
+        "# T\n\nrust ownership is a memory model.\n",
+    )
+    .unwrap();
+    common::ingest(&cfg, &workspace);
+
+    let (stdout, stderr) = common::run_ask_stream(&cfg, "ownership");
+
+    // stderr: every non-empty line must parse as JSON with
+    // schema_version == "answer_event.v1", a recognized kind, and a
+    // string `ts`. Kinds are collected in arrival order.
+    let mut kinds: Vec<String> = Vec::new();
+    for line in stderr.lines().filter(|l| !l.trim().is_empty()) {
+        let v: Value = serde_json::from_str(line)
+            .unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}"));
+        assert_eq!(v["schema_version"], "answer_event.v1");
+        let kind = v["kind"].as_str().expect("kind").to_string();
+        assert!(
+            matches!(kind.as_str(), "retrieval_done" | "token" | "final"),
+            "unexpected kind: {kind}"
+        );
+        // Only string-ness is checked here; the RFC3339 format itself
+        // is not validated by this test.
+        assert!(v["ts"].is_string(), "ts must be RFC3339 string");
+        kinds.push(kind);
+    }
+
+    // First event must be retrieval_done. Last must be final.
+    // (If the LLM refused mid-stream we still expect the worker to
+    // emit a Final event — the pipeline always closes the stream.)
+    assert_eq!(
+        kinds.first().map(String::as_str),
+        Some("retrieval_done"),
+        "first event must be retrieval_done, all kinds: {kinds:?}"
+    );
+    assert_eq!(
+        kinds.last().map(String::as_str),
+        Some("final"),
+        "last event must be final, all kinds: {kinds:?}"
+    );
+
+    // stdout: last line is answer.v1 (backwards compat with the
+    // non-streaming path — same wire shape, just emitted after the
+    // ndjson event stream rather than instead of it).
+    let final_line = stdout
+        .lines()
+        .last()
+        .expect("stdout has at least one line");
+    let answer: Value =
+        serde_json::from_str(final_line).expect("stdout final line = answer.v1");
+    assert_eq!(answer["schema_version"], "answer.v1");
+}
+
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434"]
+fn non_stream_path_unchanged() {
+    // Regression guard for fb-33: without `--stream`, `--json` must
+    // still yield a single `answer.v1` document on stdout.
+    let tmp = tempfile::tempdir().unwrap();
+    let (config, ws, _data) =
+        common::write_config_with_llm_model(tmp.path(), 30, "gemma4:e4b");
+    relax_score_gate(&config);
+    let doc = ws.join("a.md");
+    fs::write(&doc, "# T\n\nrust ownership is a memory model.\n").unwrap();
+    common::ingest(&config, &ws);
+
+    // The whole of stdout (trimmed) must parse as one JSON value;
+    // any extra non-whitespace output makes the parse fail.
+    let raw = common::run_ask_json(&config, "ownership");
+    let parsed: Value = match serde_json::from_str(raw.trim()) {
+        Ok(value) => value,
+        Err(e) => panic!("expected answer.v1, got {raw:?}: {e}"),
+    };
+    assert_eq!(parsed["schema_version"], "answer.v1");
+}
+
+// p9-fb-33 (Task 7): BrokenPipe → cancel propagation. Spawn the
+// binary, read the first stderr line (retrieval_done), drop the
+// reader. The pipeline's next `Token` send returns SendError, the
+// cancel branch fires, and the child exits. The key invariant is
+// *liveness*, so the wait is bounded by a deadline: a child that never
+// exits is killed and fails the test instead of hanging the run.
+// Exit code is not asserted: refusal is exit 1, but the child may also
+// exit 0 if the LLM happened to finish before cancel propagated.
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434 + writes to a closed pipe"]
+fn stream_cancels_when_stderr_closes() {
+    use std::io::{BufRead, BufReader};
+    use std::process::{Command, Stdio};
+    use std::time::{Duration, Instant};
+
+    // Generous upper bound on exit-after-cancel (allows for model load).
+    const EXIT_DEADLINE: Duration = Duration::from_secs(120);
+
+    let dir = tempfile::tempdir().unwrap();
+    let (cfg, workspace, _data) =
+        common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+    relax_score_gate(&cfg);
+    fs::write(
+        workspace.join("a.md"),
+        "# T\n\nrust ownership is a memory model. it tracks lifetimes.\n",
+    )
+    .unwrap();
+    common::ingest(&cfg, &workspace);
+
+    let mut child = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        .arg("--config")
+        .arg(&cfg)
+        .args(["ask", "--stream", "--mode", "lexical", "ownership"])
+        // stdout is unread: discard it so a full pipe can't block the child.
+        .stdout(Stdio::null())
+        .stderr(Stdio::piped())
+        .spawn()
+        .expect("spawn kebab");
+
+    {
+        let stderr = child.stderr.take().expect("stderr piped");
+        let mut first = String::new();
+        BufReader::new(stderr)
+            .read_line(&mut first)
+            .expect("read first stderr line");
+        assert!(
+            first.contains("\"kind\":\"retrieval_done\""),
+            "first event must be retrieval_done, got {first:?}"
+        );
+        // Reader is gone by here → child's next stderr write sees
+        // BrokenPipe → main thread drops rx → worker's send fails.
+    }
+
+    let started = Instant::now();
+    while child.try_wait().expect("poll child").is_none() {
+        if started.elapsed() > EXIT_DEADLINE {
+            let _ = child.kill();
+            let _ = child.wait();
+            panic!("child still running {EXIT_DEADLINE:?} after stderr closed");
+        }
+        std::thread::sleep(Duration::from_millis(50));
+    }
+}