diff --git a/crates/kebab-cli/tests/common/mod.rs b/crates/kebab-cli/tests/common/mod.rs
index 7d66076..8926bd2 100644
--- a/crates/kebab-cli/tests/common/mod.rs
+++ b/crates/kebab-cli/tests/common/mod.rs
@@ -126,6 +126,54 @@ pub fn ingest(cfg: &Path, workspace: &Path) {
     );
 }
 
+/// p9-fb-33: run `kebab --config <cfg> ask --stream --mode lexical <query>`
+/// and return the child's captured `(stdout, stderr)`, each decoded
+/// lossily as UTF-8.
+///
+/// Lexical mode skips embeddings (matches
+/// `wire_ask_stale.rs::run_ask_lexical`). The exit status is not
+/// inspected; callers assert on the returned pair.
+///
+/// # Panics
+///
+/// Panics if spawning or waiting on the `kebab` child fails.
+pub fn run_ask_stream(cfg: &Path, query: &str) -> (String, String) {
+    let out = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        .arg("--config")
+        // Pass the path as an `OsStr` so a non-UTF-8 path cannot
+        // panic the helper before the binary starts.
+        .arg(cfg)
+        .args(["ask", "--stream", "--mode", "lexical", query])
+        .output()
+        .expect("kebab ask --stream");
+    (
+        String::from_utf8_lossy(&out.stdout).into_owned(),
+        String::from_utf8_lossy(&out.stderr).into_owned(),
+    )
+}
+
+/// p9-fb-33: run `kebab --config <cfg> --json ask --mode lexical <query>`
+/// (no `--stream`) and return the child's stdout, decoded lossily as
+/// UTF-8; stderr and the exit status are discarded.
+///
+/// Used by `wire_ask_stream::non_stream_path_unchanged` to confirm
+/// the non-streaming JSON path still emits a single `answer.v1` line
+/// on stdout. Mirrors `wire_ask_stale.rs::run_ask_lexical(json=true)`
+/// minus the `Output` indirection.
+///
+/// # Panics
+///
+/// Panics if spawning or waiting on the `kebab` child fails.
+pub fn run_ask_json(cfg: &Path, query: &str) -> String {
+    let out = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        .arg("--config")
+        .arg(cfg)
+        .args(["--json", "ask", "--mode", "lexical", query])
+        .output()
+        .expect("kebab ask --json");
+    String::from_utf8_lossy(&out.stdout).into_owned()
+}
+
 /// Rewrite `documents.updated_at` for one workspace path to
 /// `now - days_ago` (RFC3339 UTC). Mirrors
 /// `kebab-app/tests/common/mod.rs::backdate_document_updated_at`.
diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs
new file mode 100644
index 0000000..4ba341b
--- /dev/null
+++ b/crates/kebab-cli/tests/wire_ask_stream.rs
@@ -0,0 +1,220 @@
+//! p9-fb-33: CLI streaming surface — stderr ndjson `answer_event.v1`
+//! events while the answer streams; final stdout line is the existing
+//! `answer.v1` (backwards compat with the non-`--stream` path).
+//!
+//! These end-to-end checks exercise `kebab ask --stream`, which
+//! requires a real Ollama on `127.0.0.1:11434` (same constraint as
+//! `wire_ask_stale.rs` + `kebab-app/tests/ask_smoke.rs`). All three
+//! tests are therefore `#[ignore]` by default — run with
+//! `cargo test -p kebab-cli --test wire_ask_stream -- --ignored`
+//! against a live Ollama with `gemma4:e4b` pulled.
+//!
+//! The `BrokenPipe → cancel` test (Task 7 of the fb-33 plan) verifies
+//! that closing the stderr reader propagates SendError through the
+//! pipeline so the child terminates instead of hanging. That's the
+//! main thing the integration test layer can prove that unit tests
+//! can't — pipeline cancel is a cross-process concern.
+//!
+//! Shared TempDir / ingest helpers live in `tests/common/mod.rs`.
+
+mod common;
+
+use std::fs;
+use std::path::Path;
+
+use serde_json::Value;
+
+/// Config literal that `relax_score_gate` rewrites (the default
+/// 0.30 score gate).
+const DEFAULT_SCORE_GATE: &str = "score_gate = 0.30";
+
+/// Replacement literal: a gate of 0.0.
+const RELAXED_SCORE_GATE: &str = "score_gate = 0.0";
+
+/// Drop `[rag].score_gate` to ~0 in the test config so the
+/// score-gate refusal path doesn't short-circuit the LLM call.
+/// Lexical retrieval against a one-doc corpus produces tiny fusion
+/// scores (well below the default 0.30 gate); the pipeline would
+/// take the `refuse_score_gate` early-return — which does not emit
+/// a `Final` event — making the streaming-event ordering assertion
+/// vacuous. Lower the gate so the LLM actually runs.
+///
+/// # Panics
+///
+/// Panics if the config cannot be read or written, or if it does not
+/// contain the `score_gate = 0.30` literal: `str::replace` would then
+/// be a silent no-op and the gate would stay in place.
+fn relax_score_gate(cfg: &Path) {
+    let original = fs::read_to_string(cfg).expect("read config.toml");
+    assert!(
+        original.contains(DEFAULT_SCORE_GATE),
+        "config.toml has no `{DEFAULT_SCORE_GATE}` line to relax"
+    );
+    let relaxed = original.replace(DEFAULT_SCORE_GATE, RELAXED_SCORE_GATE);
+    fs::write(cfg, relaxed).expect("write relaxed config.toml");
+}
+
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434"]
+fn stream_emits_ndjson_events_on_stderr() {
+    let dir = tempfile::tempdir().unwrap();
+    let (cfg, workspace, _data) =
+        common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+    relax_score_gate(&cfg);
+    fs::write(
+        workspace.join("a.md"),
+        "# T\n\nrust ownership is a memory model.\n",
+    )
+    .unwrap();
+    common::ingest(&cfg, &workspace);
+
+    let (stdout, stderr) = common::run_ask_stream(&cfg, "ownership");
+
+    // stderr: every non-empty line should parse as JSON with
+    // schema_version == "answer_event.v1" and a recognized kind.
+    let mut kinds: Vec<String> = Vec::new();
+    for line in stderr.lines().filter(|l| !l.trim().is_empty()) {
+        let v: Value = serde_json::from_str(line)
+            .unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}"));
+        assert_eq!(v["schema_version"], "answer_event.v1");
+        let kind = v["kind"].as_str().expect("kind").to_string();
+        assert!(
+            matches!(kind.as_str(), "retrieval_done" | "token" | "final"),
+            "unexpected kind: {kind}"
+        );
+        assert!(v["ts"].is_string(), "ts must be RFC3339 string");
+        kinds.push(kind);
+    }
+
+    // First event must be retrieval_done. Last must be final.
+    // (If the LLM refused mid-stream we still expect the worker to
+    // emit a Final event — the pipeline always closes the stream.)
+    assert_eq!(
+        kinds.first().map(String::as_str),
+        Some("retrieval_done"),
+        "first event must be retrieval_done, all kinds: {kinds:?}"
+    );
+    assert_eq!(
+        kinds.last().map(String::as_str),
+        Some("final"),
+        "last event must be final, all kinds: {kinds:?}"
+    );
+
+    // stdout: last line is answer.v1 (backwards compat with the
+    // non-streaming path — same wire shape, just emitted after the
+    // ndjson event stream rather than instead of it).
+    let final_line = stdout
+        .lines()
+        .next_back()
+        .expect("stdout has at least one line");
+    let answer: Value =
+        serde_json::from_str(final_line).expect("stdout final line = answer.v1");
+    assert_eq!(answer["schema_version"], "answer.v1");
+}
+
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434"]
+fn non_stream_path_unchanged() {
+    // Verify that the non-streaming JSON path (no `--stream`) still
+    // emits a single `answer.v1` line on stdout — fb-33 must not
+    // perturb the existing wire surface.
+    let dir = tempfile::tempdir().unwrap();
+    let (cfg, workspace, _data) =
+        common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+    relax_score_gate(&cfg);
+    fs::write(
+        workspace.join("a.md"),
+        "# T\n\nrust ownership is a memory model.\n",
+    )
+    .unwrap();
+    common::ingest(&cfg, &workspace);
+
+    let stdout = common::run_ask_json(&cfg, "ownership");
+    let v: Value = serde_json::from_str(stdout.trim())
+        .unwrap_or_else(|e| panic!("expected answer.v1, got {stdout:?}: {e}"));
+    assert_eq!(v["schema_version"], "answer.v1");
+}
+
+// p9-fb-33 (Task 7): BrokenPipe → cancel propagation. Spawn the
+// binary, read the first stderr line (retrieval_done), drop the
+// reader. The pipeline's next `Token` send returns SendError, the
+// cancel branch fires, and the child exits instead of blocking
+// forever. The key invariant is *liveness*, so the wait is bounded
+// by `CANCEL_DEADLINE`: a child that never exits is killed and the
+// test fails rather than hanging the suite. Don't assert exit code:
+// refusal is exit 1, but the child may also exit 0 if the LLM
+// happened to finish before cancel propagated.
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434 + writes to a closed pipe"]
+fn stream_cancels_when_stderr_closes() {
+    use std::io::{BufRead, BufReader};
+    use std::process::{Command, Stdio};
+    use std::thread;
+    use std::time::{Duration, Instant};
+
+    // Upper bound on how long the child may keep running after the
+    // pipe closes. Generous on purpose: it only notices the closed
+    // pipe on its next stderr write.
+    const CANCEL_DEADLINE: Duration = Duration::from_secs(120);
+    const POLL_INTERVAL: Duration = Duration::from_millis(50);
+
+    let dir = tempfile::tempdir().unwrap();
+    let (cfg, workspace, _data) =
+        common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+    relax_score_gate(&cfg);
+    fs::write(
+        workspace.join("a.md"),
+        "# T\n\nrust ownership is a memory model. it tracks lifetimes.\n",
+    )
+    .unwrap();
+    common::ingest(&cfg, &workspace);
+
+    let mut child = Command::new(env!("CARGO_BIN_EXE_kebab"))
+        .arg("--config")
+        .arg(&cfg)
+        .args(["ask", "--stream", "--mode", "lexical", "ownership"])
+        // stdout is never read by this test; discard it so an unread
+        // pipe can never stall the child.
+        .stdout(Stdio::null())
+        .stderr(Stdio::piped())
+        .spawn()
+        .expect("spawn kebab");
+
+    {
+        let stderr = child.stderr.take().expect("stderr piped");
+        let mut reader = BufReader::new(stderr);
+        let mut first = String::new();
+        reader
+            .read_line(&mut first)
+            .expect("read first stderr line");
+        // Parse rather than substring-match so the check does not
+        // depend on the serializer's key spacing.
+        let event: Value = serde_json::from_str(first.trim_end())
+            .unwrap_or_else(|e| panic!("non-JSON first stderr line: {first:?}: {e}"));
+        assert_eq!(
+            event["kind"], "retrieval_done",
+            "first event must be retrieval_done, got {first:?}"
+        );
+        // Drop the reader → child's stderr write end will see
+        // BrokenPipe on the next write → main thread drops rx →
+        // worker's pipeline.send returns SendError → cancel.
+    }
+
+    let deadline = Instant::now() + CANCEL_DEADLINE;
+    let status = loop {
+        if let Some(status) = child.try_wait().expect("poll child status") {
+            break status;
+        }
+        if Instant::now() >= deadline {
+            // Reap the stuck child before failing so it is not leaked.
+            let _ = child.kill();
+            let _ = child.wait();
+            panic!("child still running {CANCEL_DEADLINE:?} after stderr closed");
+        }
+        thread::sleep(POLL_INTERVAL);
+    };
+    // Don't assert specific exit code — refusal is exit 1, but child
+    // may also exit 0 if the LLM finished before cancel propagated.
+    // The load-bearing assertion is that the child exited at all.
+    let _ = status;
+}