fix(fb-33): address PR #124 round 1 review

- pipeline: refresh module docstring step 5 to reflect new cancel
  semantics (RetrievalDone/Token/Final + LlmStreamAborted)
- wire schema: spell out refusal-path behavior in answer_event.v1
  description (only retrieval_done emitted; no final)
- test: factual comment on relax_score_gate-using test corrected
- test: new Ollama-gated stream_score_gate_refusal_emits_only_retrieval_done
- test: new ask_emits_no_final_when_cancelled_mid_stream pinning
  the no-Final invariant on cancel
- pipeline: large_enum_variant comment broadened to acknowledge
  RetrievalDone.hits as the dominant per-emit cost
- HOTFIXES: log AskOpts.stream_sink internal API break per spec
  contract policy

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
th-kim0823
2026-05-09 15:46:04 +09:00
parent e1c6b7055a
commit a082b78f8e
5 changed files with 169 additions and 11 deletions

View File

@@ -73,8 +73,12 @@ fn stream_emits_ndjson_events_on_stderr() {
}
// First event must be retrieval_done. Last must be final.
// (If the LLM refused mid-stream we still expect the worker to
// emit a Final event — the pipeline always closes the stream.)
// Note: this test only exercises the LLM-running path which always
// closes with `final`. score-gate / no-chunks refusal paths emit
// only `retrieval_done` and skip `final` — that's why the test uses
// `relax_score_gate()` above to force the LLM path. See
// `stream_score_gate_refusal_emits_only_retrieval_done` for the
// refusal-path coverage.
assert_eq!(
kinds.first().map(String::as_str),
Some("retrieval_done"),
@@ -184,3 +188,54 @@ fn stream_cancels_when_stderr_closes() {
// The load-bearing assertion is that wait() returned at all.
let _ = status;
}
// p9-fb-33 (PR #124 round 1, item 4): score-gate refusal path —
// thin doc + unrelated query trips the default 0.30 score gate
// before the LLM runs. The pipeline emits only `retrieval_done`
// on stderr (no `token`, no `final`); stdout still carries the
// canonical `answer.v1` with `grounded=false`.
#[test]
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
fn stream_score_gate_refusal_emits_only_retrieval_done() {
let dir = tempfile::tempdir().unwrap();
let (cfg, workspace, _data) =
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
// Intentionally NO relax_score_gate — keep the default 0.30
// so the thin-doc + unrelated-query combo trips refusal.
fs::write(
workspace.join("a.md"),
"# Title\n\nrust is a language.\n",
)
.unwrap();
common::ingest(&cfg, &workspace);
let (stdout, stderr) =
common::run_ask_stream(&cfg, "completely unrelated topic about cooking pasta");
let kinds: Vec<String> = stderr
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str::<Value>(l).ok())
.filter_map(|v| v["kind"].as_str().map(String::from))
.collect();
// Refusal path: only retrieval_done, no token, no final.
assert!(
kinds.iter().all(|k| k == "retrieval_done"),
"refusal path must emit only retrieval_done, got {kinds:?}"
);
assert!(
!kinds.is_empty(),
"expected at least one retrieval_done event, got empty stderr"
);
// Stdout still has answer.v1 with grounded=false.
let final_line = stdout
.lines()
.last()
.expect("stdout has at least one line");
let answer: Value =
serde_json::from_str(final_line).expect("answer.v1");
assert_eq!(answer["schema_version"], "answer.v1");
assert_eq!(answer["grounded"], false);
}

View File

@@ -12,9 +12,12 @@
//! ~4 chars / token, matching the kb-chunk convention).
//! 4. Render the `rag-v1` prompt (system + user) verbatim per design.
//! 5. Generate via `LanguageModel::generate_stream`. The token loop runs
//! on the calling thread; `opts.stream_sink` (if any) gets each
//! token forwarded synchronously and a dropped receiver does not
//! abort generation.
//! on the calling thread; `opts.stream_sink` (if any) emits
//! `StreamEvent::RetrievalDone` once after retrieve+stale-stamp,
//! `StreamEvent::Token` per LM chunk, and `StreamEvent::Final` on
//! success. A dropped receiver triggers cancel: SendError on Token
//! breaks the LM loop + records `RefusalReason::LlmStreamAborted`
//! in the persisted Answer (p9-fb-33).
//! 6. Citation extract — STRICT regex `\[#(\d{1,3})\]`, no false
//! positives from prose `[1]` / `vec![1]` / Markdown link refs.
//! 7. Citation validate — every extracted marker must map to a packed
@@ -83,11 +86,12 @@ type PackedContext = (String, Vec<PackedCitation>, usize);
/// `RefusalReason::LlmStreamAborted`).
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
// `Final.answer` carries a full `Answer` (~320B) and is the largest
// variant; `Token` is the hot path. Size mismatch is unavoidable
// without boxing the wire-shape, which would force every consumer
// (TUI / CLI / future MCP) to deref. The sink is short-lived (one
// per ask) so the per-event overhead is not material.
// p9-fb-33: clippy flags Final.answer (~320B) as the heavy variant.
// In practice RetrievalDone.hits (Vec<SearchHit>, k≤10×~1KB each)
// dominates per-emit cost, but it fires once. Boxing either would
// force every consumer (TUI, CLI ndjson driver, future MCP) to
// deref through a Box for marginal win on a short-lived per-ask
// channel. Keep both unboxed.
#[allow(clippy::large_enum_variant)]
pub enum StreamEvent {
RetrievalDone {

View File

@@ -129,3 +129,89 @@ fn ask_records_llm_stream_aborted_when_receiver_drops() {
// Persistence still happens on cancel — the row is the audit trail.
assert_eq!(env.count_answers(), 1, "answers row written on cancel");
}
/// p9-fb-33 (PR #124 round 1, item 5): pin the "no Final on cancel"
/// invariant. Uses a barrier-gated LM so the test can observe the
/// `RetrievalDone` event before any `Token`/`Final` lands in the
/// channel — then drops `rx` to force SendError on the next `Token`.
/// The pipeline's cancel branch must avoid emitting `Final` and
/// record `RefusalReason::LlmStreamAborted`.
struct BlockingLm {
inner: MockLanguageModel,
/// Pipeline thread waits on this before yielding any token.
/// Test thread releases it after observing `RetrievalDone`.
gate: Arc<std::sync::Barrier>,
}
impl LanguageModel for BlockingLm {
fn model_ref(&self) -> kebab_core::ModelRef {
self.inner.model_ref()
}
fn context_tokens(&self) -> usize {
self.inner.context_tokens()
}
fn generate_stream(
&self,
req: kebab_core::GenerateRequest,
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<TokenChunk>> + Send>> {
// Block until the test signals — guarantees `RetrievalDone`
// arrives at the receiver before any `Token` is queued.
self.gate.wait();
self.inner.generate_stream(req)
}
}
#[test]
fn ask_emits_no_final_when_cancelled_mid_stream() {
use std::sync::Barrier;
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever: Arc<dyn Retriever> = Arc::new(MockRetriever::new(hits));
let gate = Arc::new(Barrier::new(2));
let lm: Arc<dyn LanguageModel> = Arc::new(BlockingLm {
inner: MockLanguageModel {
model_id: TEST_LM_ID.to_string(),
provider: "mock".to_string(),
context_tokens: 32_768,
canned_response: "apples are red. [#1]".to_string(),
canned_finish: FinishReason::Stop,
canned_usage: TokenUsage {
prompt_tokens: 10,
completion_tokens: 5,
latency_ms: 7,
},
},
gate: Arc::clone(&gate),
});
let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
let (tx, rx) = mpsc::channel::<StreamEvent>();
let opts = opts_with_sink(tx);
let handle = std::thread::spawn(move || pipeline.ask("apples", opts));
// Receive RetrievalDone first — pipeline emits this before
// calling generate_stream (where the LM blocks on the gate).
let first = rx.recv().expect("RetrievalDone must arrive");
assert!(
matches!(first, StreamEvent::RetrievalDone { .. }),
"first event must be RetrievalDone, got {first:?}",
);
// Drop rx now, BEFORE releasing the gate. Once the LM unblocks
// and the pipeline tries to send the first Token, it'll get
// SendError → cancel branch.
drop(rx);
gate.wait();
let ans = handle.join().expect("ask thread").unwrap();
// Cancel was observed: no Final emitted, refusal recorded.
assert!(!ans.grounded);
assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted));
assert_eq!(env.count_answers(), 1, "answers row written on cancel");
}

View File

@@ -2,7 +2,7 @@
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://kb.local/wire/v1/answer_event.schema.json",
"title": "AnswerEvent v1",
"description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).",
"description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. The success path closes with `final`; refusal paths (score_gate / no_chunks) emit only `retrieval_done` and rely on the stdout `answer.v1` line for the canonical signal. Cancel paths (BrokenPipe) may emit any prefix and stop. Final stdout line is always `answer.v1` for backwards compat (see ingest_progress.v1 precedent).",
"type": "object",
"required": ["schema_version", "kind", "ts"],
"properties": {

View File

@@ -14,6 +14,19 @@ historical contract that was implemented; this file accumulates the
deltas so phase 5+ readers can find the live behavior without diffing
git history.
## 2026-05-09 — p9-fb-33: AskOpts.stream_sink type widened to StreamEvent
**무엇이 바뀌었나**: `kebab_rag::AskOpts.stream_sink` 의 타입이 `Option<mpsc::Sender<String>>` 에서 `Option<mpsc::Sender<StreamEvent>>` 로 변경됨. `kebab_app::StreamEvent` 가 새 re-export.
**Spec contract 와의 관계**: spec §Domain API change 에서 명시한 internal API breaking. consumer = TUI worker 한 곳 (이번 PR 에서 같이 갱신). 외부 consumer 없음.
**의식적 결정**:
- single sink 로 retrieval / token / final 세 stage 를 모두 운반하기 위한 필수 타입 변경.
- 기존 `Sender<String>` 으로는 retrieval / final 단계를 표현할 방법이 없음.
- internal API 라 wire schema 와 다름 — `answer_event.v1` 는 신규 schema (additive minor at wire layer).
**영향 받는 consumer**: `kebab-tui::ask::spawn_ask_worker` (PR #124 에서 동시 갱신). 외부 통합 없음.
## 2026-05-09 — p9-fb-32: search_hit.v1 / citation.v1 required-field expansion
**무엇이 바뀌었나**: `search_hit.v1``citation.v1``required` 배열에 `indexed_at` (RFC3339) + `stale` (bool) 두 필드가 추가됨. `schema_version` 은 그대로 (`search_hit.v1` / `citation.v1`).