Three Ollama-gated integration tests covering: - stderr lines parse as answer_event.v1 (retrieval_done first, final last, all carry RFC3339 ts). - stdout final line is answer.v1 (backwards compat). - non-stream path (--json without --stream) unchanged. - BrokenPipe stderr → child terminates cleanly via cancel propagation through pipeline SendError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
187 lines
7.1 KiB
Rust
187 lines
7.1 KiB
Rust
//! p9-fb-33: CLI streaming surface — stderr ndjson `answer_event.v1`
|
|
//! events while the answer streams; final stdout line is the existing
|
|
//! `answer.v1` (backwards compat with the non-`--stream` path).
|
|
//!
|
|
//! These end-to-end checks exercise `kebab ask --stream`, which
|
|
//! requires a real Ollama on `127.0.0.1:11434` (same constraint as
|
|
//! `wire_ask_stale.rs` + `kebab-app/tests/ask_smoke.rs`). All three
|
|
//! tests are therefore `#[ignore]` by default — run with
|
|
//! `cargo test -p kebab-cli --test wire_ask_stream -- --ignored`
|
|
//! against a live Ollama with `gemma4:e4b` pulled.
|
|
//!
|
|
//! The `BrokenPipe → cancel` test (Task 7 of the fb-33 plan) verifies
|
|
//! that closing the stderr reader propagates SendError through the
|
|
//! pipeline so the child terminates instead of hanging. That's the
|
|
//! main thing the integration test layer can prove that unit tests
|
|
//! can't — pipeline cancel is a cross-process concern.
|
|
//!
|
|
//! Shared TempDir / ingest helpers live in `tests/common/mod.rs`.
|
|
|
|
mod common;
|
|
|
|
use std::fs;
|
|
use std::path::Path;
|
|
|
|
use serde_json::Value;
|
|
|
|
/// Drop `[rag].score_gate` to ~0 in the test config so the
|
|
/// score-gate refusal path doesn't short-circuit the LLM call.
|
|
/// Lexical retrieval against a one-doc corpus produces tiny fusion
|
|
/// scores (well below the default 0.30 gate); the pipeline would
|
|
/// take the `refuse_score_gate` early-return — which does not emit
|
|
/// a `Final` event — making the streaming-event ordering assertion
|
|
/// vacuous. Lower the gate so the LLM actually runs.
|
|
fn relax_score_gate(cfg: &Path) {
|
|
let body = fs::read_to_string(cfg).expect("read config.toml");
|
|
let body = body.replace("score_gate = 0.30", "score_gate = 0.0");
|
|
fs::write(cfg, body).expect("write relaxed config.toml");
|
|
}
|
|
|
|
#[test]
|
|
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
|
|
fn stream_emits_ndjson_events_on_stderr() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let (cfg, workspace, _data) =
|
|
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
|
|
relax_score_gate(&cfg);
|
|
fs::write(
|
|
workspace.join("a.md"),
|
|
"# T\n\nrust ownership is a memory model.\n",
|
|
)
|
|
.unwrap();
|
|
common::ingest(&cfg, &workspace);
|
|
|
|
let (stdout, stderr) = common::run_ask_stream(&cfg, "ownership");
|
|
|
|
// stderr: every non-empty line should parse as JSON with
|
|
// schema_version == "answer_event.v1" and a recognized kind.
|
|
let mut kinds: Vec<String> = vec![];
|
|
for line in stderr.lines() {
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
let v: Value = serde_json::from_str(line)
|
|
.unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}"));
|
|
assert_eq!(v["schema_version"], "answer_event.v1");
|
|
let kind = v["kind"].as_str().expect("kind").to_string();
|
|
assert!(
|
|
matches!(kind.as_str(), "retrieval_done" | "token" | "final"),
|
|
"unexpected kind: {kind}"
|
|
);
|
|
assert!(v["ts"].is_string(), "ts must be RFC3339 string");
|
|
kinds.push(kind);
|
|
}
|
|
|
|
// First event must be retrieval_done. Last must be final.
|
|
// (If the LLM refused mid-stream we still expect the worker to
|
|
// emit a Final event — the pipeline always closes the stream.)
|
|
assert_eq!(
|
|
kinds.first().map(String::as_str),
|
|
Some("retrieval_done"),
|
|
"first event must be retrieval_done, all kinds: {kinds:?}"
|
|
);
|
|
assert_eq!(
|
|
kinds.last().map(String::as_str),
|
|
Some("final"),
|
|
"last event must be final, all kinds: {kinds:?}"
|
|
);
|
|
|
|
// stdout: last line is answer.v1 (backwards compat with the
|
|
// non-streaming path — same wire shape, just emitted after the
|
|
// ndjson event stream rather than instead of it).
|
|
let final_line = stdout
|
|
.lines()
|
|
.last()
|
|
.expect("stdout has at least one line");
|
|
let answer: Value =
|
|
serde_json::from_str(final_line).expect("stdout final line = answer.v1");
|
|
assert_eq!(answer["schema_version"], "answer.v1");
|
|
}
|
|
|
|
#[test]
|
|
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
|
|
fn non_stream_path_unchanged() {
|
|
// Verify that the non-streaming JSON path (no `--stream`) still
|
|
// emits a single `answer.v1` line on stdout — fb-33 must not
|
|
// perturb the existing wire surface.
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let (cfg, workspace, _data) =
|
|
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
|
|
relax_score_gate(&cfg);
|
|
fs::write(
|
|
workspace.join("a.md"),
|
|
"# T\n\nrust ownership is a memory model.\n",
|
|
)
|
|
.unwrap();
|
|
common::ingest(&cfg, &workspace);
|
|
|
|
let stdout = common::run_ask_json(&cfg, "ownership");
|
|
let v: Value = serde_json::from_str(stdout.trim())
|
|
.unwrap_or_else(|e| panic!("expected answer.v1, got {stdout:?}: {e}"));
|
|
assert_eq!(v["schema_version"], "answer.v1");
|
|
}
|
|
|
|
// p9-fb-33 (Task 7): BrokenPipe → cancel propagation. Spawn the
|
|
// binary, read the first stderr line (retrieval_done), drop the
|
|
// reader. The pipeline's next `Token` send returns SendError, the
|
|
// cancel branch fires, child.wait() returns instead of blocking
|
|
// forever. The key invariant is *liveness* — that `wait()` returns
|
|
// in bounded time. Don't assert exit code: refusal is exit 1, but
|
|
// the child may also exit 0 if the LLM happened to finish before
|
|
// cancel propagated.
|
|
#[test]
|
|
#[ignore = "requires real Ollama on 127.0.0.1:11434 + writes to a closed pipe"]
|
|
fn stream_cancels_when_stderr_closes() {
|
|
use std::io::{BufRead, BufReader};
|
|
use std::process::{Command, Stdio};
|
|
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let (cfg, workspace, _data) =
|
|
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
|
|
relax_score_gate(&cfg);
|
|
fs::write(
|
|
workspace.join("a.md"),
|
|
"# T\n\nrust ownership is a memory model. it tracks lifetimes.\n",
|
|
)
|
|
.unwrap();
|
|
common::ingest(&cfg, &workspace);
|
|
|
|
let bin = env!("CARGO_BIN_EXE_kebab");
|
|
let mut child = Command::new(bin)
|
|
.args([
|
|
"--config",
|
|
cfg.to_str().unwrap(),
|
|
"ask",
|
|
"--stream",
|
|
"--mode",
|
|
"lexical",
|
|
"ownership",
|
|
])
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
.expect("spawn kebab");
|
|
|
|
{
|
|
let stderr = child.stderr.take().expect("stderr piped");
|
|
let mut reader = BufReader::new(stderr);
|
|
let mut first = String::new();
|
|
reader
|
|
.read_line(&mut first)
|
|
.expect("read first stderr line");
|
|
assert!(
|
|
first.contains("\"kind\":\"retrieval_done\""),
|
|
"first event must be retrieval_done, got {first:?}"
|
|
);
|
|
// Drop the reader → child's stderr write end will see
|
|
// BrokenPipe on the next write → main thread drops rx →
|
|
// worker's pipeline.send returns SendError → cancel.
|
|
}
|
|
|
|
let status = child.wait().expect("child completes after cancel");
|
|
// Don't assert specific exit code — refusal is exit 1, but child
|
|
// may also exit 0 if the LLM finished before cancel propagated.
|
|
// The load-bearing assertion is that wait() returned at all.
|
|
let _ = status;
|
|
}
|