From a082b78f8e180aef3b00bd26c79b71675e7fd201 Mon Sep 17 00:00:00 2001
From: th-kim0823
Date: Sat, 9 May 2026 15:46:04 +0900
Subject: [PATCH] fix(fb-33): address PR #124 round 1 review
- pipeline: refresh module docstring step 5 to reflect new cancel
semantics (RetrievalDone/Token/Final + LlmStreamAborted)
- wire schema: spell out refusal-path behavior in answer_event.v1
description (only retrieval_done emitted; no final)
- test: factual comment on relax_score_gate-using test corrected
- test: new Ollama-gated stream_score_gate_refusal_emits_only_retrieval_done
- test: new ask_emits_no_final_when_cancelled_mid_stream pinning
the no-Final invariant on cancel
- pipeline: large_enum_variant comment broadened to acknowledge
RetrievalDone.hits as the dominant per-emit cost
- HOTFIXES: log AskOpts.stream_sink internal API break per spec
contract policy
Co-Authored-By: Claude Opus 4.7 (1M context)
---
crates/kebab-cli/tests/wire_ask_stream.rs | 59 +++++++++++++-
crates/kebab-rag/src/pipeline.rs | 20 +++--
crates/kebab-rag/tests/streaming_events.rs | 86 ++++++++++++++++++++
docs/wire-schema/v1/answer_event.schema.json | 2 +-
tasks/HOTFIXES.md | 13 +++
5 files changed, 169 insertions(+), 11 deletions(-)
diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs
index 4ba341b..98c995a 100644
--- a/crates/kebab-cli/tests/wire_ask_stream.rs
+++ b/crates/kebab-cli/tests/wire_ask_stream.rs
@@ -73,8 +73,12 @@ fn stream_emits_ndjson_events_on_stderr() {
}
// First event must be retrieval_done. Last must be final.
- // (If the LLM refused mid-stream we still expect the worker to
- // emit a Final event — the pipeline always closes the stream.)
+ // Note: this test only exercises the LLM-running path which always
+ // closes with `final`. score-gate / no-chunks refusal paths emit
+ // only `retrieval_done` and skip `final` — that's why the test uses
+ // `relax_score_gate()` above to force the LLM path. See
+ // `stream_score_gate_refusal_emits_only_retrieval_done` for the
+ // refusal-path coverage.
assert_eq!(
kinds.first().map(String::as_str),
Some("retrieval_done"),
@@ -184,3 +188,54 @@ fn stream_cancels_when_stderr_closes() {
// The load-bearing assertion is that wait() returned at all.
let _ = status;
}
+
+// p9-fb-33 (PR #124 round 1, item 4): score-gate refusal path —
+// thin doc + unrelated query trips the default 0.30 score gate
+// before the LLM runs. The pipeline emits only `retrieval_done`
+// on stderr (no `token`, no `final`); stdout still carries the
+// canonical `answer.v1` with `grounded=false`.
+#[test]
+#[ignore = "requires real Ollama on 127.0.0.1:11434"]
+fn stream_score_gate_refusal_emits_only_retrieval_done() {
+ let dir = tempfile::tempdir().unwrap();
+ let (cfg, workspace, _data) =
+ common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
+ // Intentionally NO relax_score_gate — keep the default 0.30
+ // so the thin-doc + unrelated-query combo trips refusal.
+ fs::write(
+ workspace.join("a.md"),
+ "# Title\n\nrust is a language.\n",
+ )
+ .unwrap();
+ common::ingest(&cfg, &workspace);
+
+ let (stdout, stderr) =
+ common::run_ask_stream(&cfg, "completely unrelated topic about cooking pasta");
+
+ let kinds: Vec = stderr
+ .lines()
+ .filter(|l| !l.trim().is_empty())
+ .filter_map(|l| serde_json::from_str::(l).ok())
+ .filter_map(|v| v["kind"].as_str().map(String::from))
+ .collect();
+
+ // Refusal path: only retrieval_done, no token, no final.
+ assert!(
+ kinds.iter().all(|k| k == "retrieval_done"),
+ "refusal path must emit only retrieval_done, got {kinds:?}"
+ );
+ assert!(
+ !kinds.is_empty(),
+ "expected at least one retrieval_done event, got empty stderr"
+ );
+
+ // Stdout still has answer.v1 with grounded=false.
+ let final_line = stdout
+ .lines()
+ .last()
+ .expect("stdout has at least one line");
+ let answer: Value =
+ serde_json::from_str(final_line).expect("answer.v1");
+ assert_eq!(answer["schema_version"], "answer.v1");
+ assert_eq!(answer["grounded"], false);
+}
diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs
index 21e7844..bf70900 100644
--- a/crates/kebab-rag/src/pipeline.rs
+++ b/crates/kebab-rag/src/pipeline.rs
@@ -12,9 +12,12 @@
//! ~4 chars / token, matching the kb-chunk convention).
//! 4. Render the `rag-v1` prompt (system + user) verbatim per design.
//! 5. Generate via `LanguageModel::generate_stream`. The token loop runs
-//! on the calling thread; `opts.stream_sink` (if any) gets each
-//! token forwarded synchronously and a dropped receiver does not
-//! abort generation.
+//! on the calling thread; `opts.stream_sink` (if any) emits
+//! `StreamEvent::RetrievalDone` once after retrieve+stale-stamp,
+//! `StreamEvent::Token` per LM chunk, and `StreamEvent::Final` on
+//! success. A dropped receiver triggers cancel: SendError on Token
+//! breaks the LM loop + records `RefusalReason::LlmStreamAborted`
+//! in the persisted Answer (p9-fb-33).
//! 6. Citation extract — STRICT regex `\[#(\d{1,3})\]`, no false
//! positives from prose `[1]` / `vec![1]` / Markdown link refs.
//! 7. Citation validate — every extracted marker must map to a packed
@@ -83,11 +86,12 @@ type PackedContext = (String, Vec, usize);
/// `RefusalReason::LlmStreamAborted`).
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
-// `Final.answer` carries a full `Answer` (~320B) and is the largest
-// variant; `Token` is the hot path. Size mismatch is unavoidable
-// without boxing the wire-shape, which would force every consumer
-// (TUI / CLI / future MCP) to deref. The sink is short-lived (one
-// per ask) so the per-event overhead is not material.
+// p9-fb-33: clippy flags Final.answer (~320B) as the heavy variant.
+// In practice RetrievalDone.hits (Vec, k≤10×~1KB each)
+// dominates per-emit cost, but it fires once. Boxing either would
+// force every consumer (TUI, CLI ndjson driver, future MCP) to
+// deref through a Box for marginal win on a short-lived per-ask
+// channel. Keep both unboxed.
#[allow(clippy::large_enum_variant)]
pub enum StreamEvent {
RetrievalDone {
diff --git a/crates/kebab-rag/tests/streaming_events.rs b/crates/kebab-rag/tests/streaming_events.rs
index 05be1b0..daa908d 100644
--- a/crates/kebab-rag/tests/streaming_events.rs
+++ b/crates/kebab-rag/tests/streaming_events.rs
@@ -129,3 +129,89 @@ fn ask_records_llm_stream_aborted_when_receiver_drops() {
// Persistence still happens on cancel — the row is the audit trail.
assert_eq!(env.count_answers(), 1, "answers row written on cancel");
}
+
+/// p9-fb-33 (PR #124 round 1, item 5): pin the "no Final on cancel"
+/// invariant. Uses a barrier-gated LM so the test can observe the
+/// `RetrievalDone` event before any `Token`/`Final` lands in the
+/// channel — then drops `rx` to force SendError on the next `Token`.
+/// The pipeline's cancel branch must avoid emitting `Final` and
+/// record `RefusalReason::LlmStreamAborted`.
+struct BlockingLm {
+ inner: MockLanguageModel,
+ /// Pipeline thread waits on this before yielding any token.
+ /// Test thread releases it after observing `RetrievalDone`.
+ gate: Arc,
+}
+
+impl LanguageModel for BlockingLm {
+ fn model_ref(&self) -> kebab_core::ModelRef {
+ self.inner.model_ref()
+ }
+ fn context_tokens(&self) -> usize {
+ self.inner.context_tokens()
+ }
+ fn generate_stream(
+ &self,
+ req: kebab_core::GenerateRequest,
+ ) -> anyhow::Result> + Send>> {
+ // Block until the test signals — guarantees `RetrievalDone`
+ // arrives at the receiver before any `Token` is queued.
+ self.gate.wait();
+ self.inner.generate_stream(req)
+ }
+}
+
+#[test]
+fn ask_emits_no_final_when_cancelled_mid_stream() {
+ use std::sync::Barrier;
+
+ let env = RagEnv::new();
+ let cid = id32("c1");
+ let did = id32("d1");
+ env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]);
+ let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
+ let retriever: Arc = Arc::new(MockRetriever::new(hits));
+
+ let gate = Arc::new(Barrier::new(2));
+ let lm: Arc = Arc::new(BlockingLm {
+ inner: MockLanguageModel {
+ model_id: TEST_LM_ID.to_string(),
+ provider: "mock".to_string(),
+ context_tokens: 32_768,
+ canned_response: "apples are red. [#1]".to_string(),
+ canned_finish: FinishReason::Stop,
+ canned_usage: TokenUsage {
+ prompt_tokens: 10,
+ completion_tokens: 5,
+ latency_ms: 7,
+ },
+ },
+ gate: Arc::clone(&gate),
+ });
+ let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
+
+ let (tx, rx) = mpsc::channel::();
+ let opts = opts_with_sink(tx);
+ let handle = std::thread::spawn(move || pipeline.ask("apples", opts));
+
+ // Receive RetrievalDone first — pipeline emits this before
+ // calling generate_stream (where the LM blocks on the gate).
+ let first = rx.recv().expect("RetrievalDone must arrive");
+ assert!(
+ matches!(first, StreamEvent::RetrievalDone { .. }),
+ "first event must be RetrievalDone, got {first:?}",
+ );
+
+ // Drop rx now, BEFORE releasing the gate. Once the LM unblocks
+ // and the pipeline tries to send the first Token, it'll get
+ // SendError → cancel branch.
+ drop(rx);
+ gate.wait();
+
+ let ans = handle.join().expect("ask thread").unwrap();
+
+ // Cancel was observed: no Final emitted, refusal recorded.
+ assert!(!ans.grounded);
+ assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted));
+ assert_eq!(env.count_answers(), 1, "answers row written on cancel");
+}
diff --git a/docs/wire-schema/v1/answer_event.schema.json b/docs/wire-schema/v1/answer_event.schema.json
index 80a46f9..8581d08 100644
--- a/docs/wire-schema/v1/answer_event.schema.json
+++ b/docs/wire-schema/v1/answer_event.schema.json
@@ -2,7 +2,7 @@
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://kb.local/wire/v1/answer_event.schema.json",
"title": "AnswerEvent v1",
- "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).",
+ "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. The success path closes with `final`; refusal paths (score_gate / no_chunks) emit only `retrieval_done` and rely on the stdout `answer.v1` line for the canonical signal. Cancel paths (BrokenPipe) may emit any prefix and stop. Final stdout line is always `answer.v1` for backwards compat (see ingest_progress.v1 precedent).",
"type": "object",
"required": ["schema_version", "kind", "ts"],
"properties": {
diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md
index 24fcb67..b8a785c 100644
--- a/tasks/HOTFIXES.md
+++ b/tasks/HOTFIXES.md
@@ -14,6 +14,19 @@ historical contract that was implemented; this file accumulates the
deltas so phase 5+ readers can find the live behavior without diffing
git history.
+## 2026-05-09 — p9-fb-33: AskOpts.stream_sink type widened to StreamEvent
+
+**무엇이 바뀌었나**: `kebab_rag::AskOpts.stream_sink` 의 타입이 `Option>` 에서 `Option>` 로 변경됨. `kebab_app::StreamEvent` 가 새 re-export.
+
+**Spec contract 와의 관계**: spec §Domain API change 에서 명시한 internal API breaking. consumer = TUI worker 한 곳 (이번 PR 에서 같이 갱신). 외부 consumer 없음.
+
+**의식적 결정**:
+- single sink 로 retrieval / token / final 세 stage 를 모두 운반하기 위한 필수 타입 변경.
+- 기존 `Sender` 으로는 retrieval / final 단계를 표현할 방법이 없음.
+- internal API 라 wire schema 와 다름 — `answer_event.v1` 는 신규 schema (additive minor at wire layer).
+
+**영향 받는 consumer**: `kebab-tui::ask::spawn_ask_worker` (PR #124 에서 동시 갱신). 외부 통합 없음.
+
## 2026-05-09 — p9-fb-32: search_hit.v1 / citation.v1 required-field expansion
**무엇이 바뀌었나**: `search_hit.v1` 과 `citation.v1` 의 `required` 배열에 `indexed_at` (RFC3339) + `stale` (bool) 두 필드가 추가됨. `schema_version` 은 그대로 (`search_hit.v1` / `citation.v1`).