diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs index 4ba341b..98c995a 100644 --- a/crates/kebab-cli/tests/wire_ask_stream.rs +++ b/crates/kebab-cli/tests/wire_ask_stream.rs @@ -73,8 +73,12 @@ fn stream_emits_ndjson_events_on_stderr() { } // First event must be retrieval_done. Last must be final. - // (If the LLM refused mid-stream we still expect the worker to - // emit a Final event — the pipeline always closes the stream.) + // Note: this test only exercises the LLM-running path which always + // closes with `final`. score-gate / no-chunks refusal paths emit + // only `retrieval_done` and skip `final` — that's why the test uses + // `relax_score_gate()` above to force the LLM path. See + // `stream_score_gate_refusal_emits_only_retrieval_done` for the + // refusal-path coverage. assert_eq!( kinds.first().map(String::as_str), Some("retrieval_done"), @@ -184,3 +188,54 @@ fn stream_cancels_when_stderr_closes() { // The load-bearing assertion is that wait() returned at all. let _ = status; } + +// p9-fb-33 (PR #124 round 1, item 4): score-gate refusal path — +// thin doc + unrelated query trips the default 0.30 score gate +// before the LLM runs. The pipeline emits only `retrieval_done` +// on stderr (no `token`, no `final`); stdout still carries the +// canonical `answer.v1` with `grounded=false`. +#[test] +#[ignore = "requires real Ollama on 127.0.0.1:11434"] +fn stream_score_gate_refusal_emits_only_retrieval_done() { + let dir = tempfile::tempdir().unwrap(); + let (cfg, workspace, _data) = + common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b"); + // Intentionally NO relax_score_gate — keep the default 0.30 + // so the thin-doc + unrelated-query combo trips refusal. + fs::write( + workspace.join("a.md"), + "# Title\n\nrust is a language.\n", + ) + .unwrap(); + common::ingest(&cfg, &workspace); + + let (stdout, stderr) = + common::run_ask_stream(&cfg, "completely unrelated topic about cooking pasta"); + + let kinds: Vec = stderr + .lines() + .filter(|l| !l.trim().is_empty()) + .filter_map(|l| serde_json::from_str::(l).ok()) + .filter_map(|v| v["kind"].as_str().map(String::from)) + .collect(); + + // Refusal path: only retrieval_done, no token, no final. + assert!( + kinds.iter().all(|k| k == "retrieval_done"), + "refusal path must emit only retrieval_done, got {kinds:?}" + ); + assert!( + !kinds.is_empty(), + "expected at least one retrieval_done event, got empty stderr" + ); + + // Stdout still has answer.v1 with grounded=false. + let final_line = stdout + .lines() + .last() + .expect("stdout has at least one line"); + let answer: Value = + serde_json::from_str(final_line).expect("answer.v1"); + assert_eq!(answer["schema_version"], "answer.v1"); + assert_eq!(answer["grounded"], false); +} diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs index 21e7844..bf70900 100644 --- a/crates/kebab-rag/src/pipeline.rs +++ b/crates/kebab-rag/src/pipeline.rs @@ -12,9 +12,12 @@ //! ~4 chars / token, matching the kb-chunk convention). //! 4. Render the `rag-v1` prompt (system + user) verbatim per design. //! 5. Generate via `LanguageModel::generate_stream`. The token loop runs -//! on the calling thread; `opts.stream_sink` (if any) gets each -//! token forwarded synchronously and a dropped receiver does not -//! abort generation. +//! on the calling thread; `opts.stream_sink` (if any) emits +//! `StreamEvent::RetrievalDone` once after retrieve+stale-stamp, +//! `StreamEvent::Token` per LM chunk, and `StreamEvent::Final` on +//! success. A dropped receiver triggers cancel: SendError on Token +//! breaks the LM loop + records `RefusalReason::LlmStreamAborted` +//! in the persisted Answer (p9-fb-33). //! 6. Citation extract — STRICT regex `\[#(\d{1,3})\]`, no false //! positives from prose `[1]` / `vec![1]` / Markdown link refs. //! 7. Citation validate — every extracted marker must map to a packed @@ -83,11 +86,12 @@ type PackedContext = (String, Vec, usize); /// `RefusalReason::LlmStreamAborted`). #[derive(Clone, Debug, serde::Serialize)] #[serde(tag = "kind", rename_all = "snake_case")] -// `Final.answer` carries a full `Answer` (~320B) and is the largest -// variant; `Token` is the hot path. Size mismatch is unavoidable -// without boxing the wire-shape, which would force every consumer -// (TUI / CLI / future MCP) to deref. The sink is short-lived (one -// per ask) so the per-event overhead is not material. +// p9-fb-33: clippy flags Final.answer (~320B) as the heavy variant. +// In practice RetrievalDone.hits (Vec, k≤10×~1KB each) +// dominates per-emit cost, but it fires once. Boxing either would +// force every consumer (TUI, CLI ndjson driver, future MCP) to +// deref through a Box for marginal win on a short-lived per-ask +// channel. Keep both unboxed. #[allow(clippy::large_enum_variant)] pub enum StreamEvent { RetrievalDone { diff --git a/crates/kebab-rag/tests/streaming_events.rs b/crates/kebab-rag/tests/streaming_events.rs index 05be1b0..daa908d 100644 --- a/crates/kebab-rag/tests/streaming_events.rs +++ b/crates/kebab-rag/tests/streaming_events.rs @@ -129,3 +129,89 @@ fn ask_records_llm_stream_aborted_when_receiver_drops() { // Persistence still happens on cancel — the row is the audit trail. assert_eq!(env.count_answers(), 1, "answers row written on cancel"); } + +/// p9-fb-33 (PR #124 round 1, item 5): pin the "no Final on cancel" +/// invariant. Uses a barrier-gated LM so the test can observe the +/// `RetrievalDone` event before any `Token`/`Final` lands in the +/// channel — then drops `rx` to force SendError on the next `Token`. +/// The pipeline's cancel branch must avoid emitting `Final` and +/// record `RefusalReason::LlmStreamAborted`. +struct BlockingLm { + inner: MockLanguageModel, + /// Pipeline thread waits on this before yielding any token. + /// Test thread releases it after observing `RetrievalDone`. + gate: Arc, +} + +impl LanguageModel for BlockingLm { + fn model_ref(&self) -> kebab_core::ModelRef { + self.inner.model_ref() + } + fn context_tokens(&self) -> usize { + self.inner.context_tokens() + } + fn generate_stream( + &self, + req: kebab_core::GenerateRequest, + ) -> anyhow::Result> + Send>> { + // Block until the test signals — guarantees `RetrievalDone` + // arrives at the receiver before any `Token` is queued. + self.gate.wait(); + self.inner.generate_stream(req) + } +} + +#[test] +fn ask_emits_no_final_when_cancelled_mid_stream() { + use std::sync::Barrier; + + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]); + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever: Arc = Arc::new(MockRetriever::new(hits)); + + let gate = Arc::new(Barrier::new(2)); + let lm: Arc = Arc::new(BlockingLm { + inner: MockLanguageModel { + model_id: TEST_LM_ID.to_string(), + provider: "mock".to_string(), + context_tokens: 32_768, + canned_response: "apples are red. [#1]".to_string(), + canned_finish: FinishReason::Stop, + canned_usage: TokenUsage { + prompt_tokens: 10, + completion_tokens: 5, + latency_ms: 7, + }, + }, + gate: Arc::clone(&gate), + }); + let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone()); + + let (tx, rx) = mpsc::channel::(); + let opts = opts_with_sink(tx); + let handle = std::thread::spawn(move || pipeline.ask("apples", opts)); + + // Receive RetrievalDone first — pipeline emits this before + // calling generate_stream (where the LM blocks on the gate). + let first = rx.recv().expect("RetrievalDone must arrive"); + assert!( + matches!(first, StreamEvent::RetrievalDone { .. }), + "first event must be RetrievalDone, got {first:?}", + ); + + // Drop rx now, BEFORE releasing the gate. Once the LM unblocks + // and the pipeline tries to send the first Token, it'll get + // SendError → cancel branch. + drop(rx); + gate.wait(); + + let ans = handle.join().expect("ask thread").unwrap(); + + // Cancel was observed: no Final emitted, refusal recorded. + assert!(!ans.grounded); + assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted)); + assert_eq!(env.count_answers(), 1, "answers row written on cancel"); +} diff --git a/docs/wire-schema/v1/answer_event.schema.json b/docs/wire-schema/v1/answer_event.schema.json index 80a46f9..8581d08 100644 --- a/docs/wire-schema/v1/answer_event.schema.json +++ b/docs/wire-schema/v1/answer_event.schema.json @@ -2,7 +2,7 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://kb.local/wire/v1/answer_event.schema.json", "title": "AnswerEvent v1", - "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", + "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. The success path closes with `final`; refusal paths (score_gate / no_chunks) emit only `retrieval_done` and rely on the stdout `answer.v1` line for the canonical signal. Cancel paths (BrokenPipe) may emit any prefix and stop. Final stdout line is always `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", "type": "object", "required": ["schema_version", "kind", "ts"], "properties": { diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md index 24fcb67..b8a785c 100644 --- a/tasks/HOTFIXES.md +++ b/tasks/HOTFIXES.md @@ -14,6 +14,19 @@ historical contract that was implemented; this file accumulates the deltas so phase 5+ readers can find the live behavior without diffing git history. +## 2026-05-09 — p9-fb-33: AskOpts.stream_sink type widened to StreamEvent + +**무엇이 바뀌었나**: `kebab_rag::AskOpts.stream_sink` 의 타입이 `Option>` 에서 `Option>` 로 변경됨. `kebab_app::StreamEvent` 가 새 re-export. + +**Spec contract 와의 관계**: spec §Domain API change 에서 명시한 internal API breaking. consumer = TUI worker 한 곳 (이번 PR 에서 같이 갱신). 외부 consumer 없음. + +**의식적 결정**: +- single sink 로 retrieval / token / final 세 stage 를 모두 운반하기 위한 필수 타입 변경. +- 기존 `Sender` 으로는 retrieval / final 단계를 표현할 방법이 없음. +- internal API 라 wire schema 와 다름 — `answer_event.v1` 는 신규 schema (additive minor at wire layer). + +**영향 받는 consumer**: `kebab-tui::ask::spawn_ask_worker` (PR #124 에서 동시 갱신). 외부 통합 없음. + ## 2026-05-09 — p9-fb-32: search_hit.v1 / citation.v1 required-field expansion **무엇이 바뀌었나**: `search_hit.v1` 과 `citation.v1` 의 `required` 배열에 `indexed_at` (RFC3339) + `stale` (bool) 두 필드가 추가됨. `schema_version` 은 그대로 (`search_hit.v1` / `citation.v1`).