From 4949775c8b65a246a105238561f9a03eb84a1e20 Mon Sep 17 00:00:00 2001
From: th-kim0823
Date: Sat, 9 May 2026 14:10:08 +0900
Subject: [PATCH 01/11] =?UTF-8?q?spec(fb-33):=20streaming=20ask=20(ndjson?=
 =?UTF-8?q?=20delta)=20=E2=80=94=20design?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

RagPipeline emits the retrieval / per-token / final stages into a sink
through a 3-variant StreamEvent enum (RetrievalDone / Token / Final).
The CLI `kebab ask --stream` writes ndjson events to stderr, and the final
stdout line stays the existing answer.v1 (the ingest_progress.v1 pattern).
Cancel = stdout closed → SendError → LLM stream break + partial answer
recorded with RefusalReason::LlmStreamAborted. MCP streaming is left to a
separate v0.5+ review (out of scope).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 ...026-05-09-p9-fb-33-streaming-ask-design.md | 253 ++++++++++++++++++
 1 file changed, 253 insertions(+)
 create mode 100644 docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md

diff --git a/docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md b/docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md
new file mode 100644
index 0000000..def093e
--- /dev/null
+++ b/docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md
@@ -0,0 +1,253 @@
+---
+title: "p9-fb-33 — Streaming ask (ndjson delta) design"
+phase: P9
+component: kebab-rag + kebab-cli + kebab-tui + wire-schema
+task_id: p9-fb-33
+status: design
+target_version: 0.5.0
+contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
+contract_sections: [§7 RAG, §10 UX, wire-schema answer.v1]
+date: 2026-05-09
+---
+
+# p9-fb-33 — Streaming ask (ndjson delta)
+
+## Goal
+
+`kebab ask --stream`: so that an agent can consume LLM tokens as soon as they arrive, emit three stages of ndjson events (retrieval / token / final) on stderr, while the last stdout line stays the existing `answer.v1`. The CLI surface comes first; streaming for MCP `kebab__ask` is left to a separate v0.5+ review (not in scope for this spec).
+
+## Behavior contract
+
+### Stream event taxonomy
+
+Confined to 3 variants.
`kind` discriminator + `ts` timestamp + per-variant payload.
+
+1. **`retrieval_done`**: emitted once, right after the pipeline's retrieve + stale-stamp. Payload: `hits: search_hit.v1[]` (including fb-32's `indexed_at` / `stale`).
+2. **`token`**: emitted each time an LLM `TokenChunk::Token` arrives. Payload: `delta: string` + `turn_index: integer | null` (matches `Answer.turn_index` for multi-turn ask).
+3. **`final`**: emitted once, after all tokens are received and citation extract / validate has completed. Payload: `answer: answer.v1` (the whole v1 schema).
+
+terminal event = `final`. Every ask ends either with `final` or (on the cancel path) with zero terminal events; in the latter case the ndjson stream is simply cut off midway.
+
+### CLI flag
+
+`kebab ask --stream` (boolean flag, default off). Independent of `--json`:
+
+| flag combination | stderr | stdout |
+|----------|--------|--------|
+| (none) | (none) | plain text answer + evidence block |
+| `--json` | (none) | `answer.v1` once |
+| `--stream` | ndjson `answer_event.v1` events | `answer.v1` once (final stdout line) |
+| `--stream --json` | same (stream is dominant) | same |
+
+backwards-compat: without `--stream`, all existing behavior is preserved.
+
+### Output stream
+
+- ndjson event → **stderr**. One event per line, `serde_json::to_string` + `writeln!`.
+- final `answer.v1` → **stdout**. Existing final-only consumers that parse only stdout stay compatible.
+- Precedent: `ingest_progress.v1` uses the stderr ndjson + stdout `ingest_report.v1` final pattern.
+
+### Cancel semantics
+
+When the stdout/stderr of `kebab ask --stream` is closed from outside (e.g. SIGPIPE on the agent side / `head -c 1` / connection close):
+
+1. `writeln!(stderr, ...)` on the CLI main thread returns `io::ErrorKind::BrokenPipe`.
+2. The CLI discards the receiver (rx drop).
+3. `pipeline.ask` on the background thread gets a `SendError` from `stream_sink.send(StreamEvent::Token { .. })`.
+4. Pipeline token loop: it currently swallows the result with `let _ = sink.send(t)`, but this task adds a cancel branch. On `SendError` it `break`s out of the LLM stream, sets `finish_reason = FinishReason::Cancelled`, fills the Answer with `RefusalReason::LlmStreamAborted`, and records the partial answer + cancel reason in the `answers` table.
+5. The CLI joins the background thread → an Answer that states the cancel reason is returned → the CLI exits. stdout is already closed, so the BrokenPipe from the attempted final answer.v1 write is ignored.
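
The producer side of the cancel steps above can be sketched with nothing but `std::sync::mpsc`. This is a minimal illustration, not the pipeline's real code: the `FinishReason` and `StreamEvent` types below are simplified stand-ins, and `generate`, `run_with_dropped_receiver`, and `run_with_live_receiver` are hypothetical helper names. It relies only on the documented behavior that `Sender::send` returns an error once the receiver has been dropped.

```rust
use std::sync::mpsc::{self, Sender};

/// Simplified stand-in for the pipeline's finish reason (illustrative only).
#[derive(Debug, PartialEq)]
enum FinishReason {
    Stop,
    Cancelled,
}

/// Simplified stand-in for the streaming event (only the token variant).
#[derive(Debug)]
enum StreamEvent {
    Token { delta: String },
}

/// Forwards tokens into `sink` and stops at the first failed send.
/// Returns the text accumulated so far together with the finish reason.
fn generate(tokens: &[&str], sink: &Sender<StreamEvent>) -> (String, FinishReason) {
    let mut acc = String::new();
    for t in tokens {
        // Accumulate first, then forward, mirroring the order in the spec.
        acc.push_str(t);
        // `send` fails only when the receiver has been dropped.
        if sink.send(StreamEvent::Token { delta: t.to_string() }).is_err() {
            return (acc, FinishReason::Cancelled);
        }
    }
    (acc, FinishReason::Stop)
}

/// Runs `generate` against a receiver that is dropped before generation starts.
fn run_with_dropped_receiver() -> (String, FinishReason) {
    let (tx, rx) = mpsc::channel::<StreamEvent>();
    drop(rx);
    generate(&["a", "b", "c"], &tx)
}

/// Runs `generate` against a live receiver and counts the delivered events.
fn run_with_live_receiver() -> (String, FinishReason, usize) {
    let (tx, rx) = mpsc::channel::<StreamEvent>();
    let (text, reason) = generate(&["a", "b", "c"], &tx);
    drop(tx); // close the channel so `rx.iter()` terminates
    (text, reason, rx.iter().count())
}
```

With the receiver gone, generation stops after the first token and the partial text is still available to persist, which is the property the cancel branch depends on.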
+
+Only `io::ErrorKind::BrokenPipe` is treated as cancel. Any other IoError is fatal: emit `error.v1` on stderr + exit 2.
+
+### Wire schema delta
+
+New file `docs/wire-schema/v1/answer_event.schema.json`:
+
+```json
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://kb.local/wire/v1/answer_event.schema.json",
+  "title": "AnswerEvent v1",
+  "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr. Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat.",
+  "type": "object",
+  "required": ["schema_version", "kind", "ts"],
+  "properties": {
+    "schema_version": { "const": "answer_event.v1" },
+    "kind": { "enum": ["retrieval_done", "token", "final"] },
+    "ts": { "type": "string", "format": "date-time" },
+    "hits": { "type": "array", "description": "retrieval_done: search_hit.v1[]" },
+    "delta": { "type": "string", "description": "token: incremental string chunk" },
+    "turn_index": { "type": ["integer", "null"], "minimum": 0,
+                    "description": "token: matches Answer.turn_index" },
+    "answer": { "type": "object", "description": "final: complete answer.v1 payload" }
+  }
+}
+```
+
+The existing `answer.v1` / `search_hit.v1` / `citation.v1` are unchanged.
+
+### Domain API change
+
+`kebab-rag::pipeline`:
+
+```rust
+#[derive(Clone, Debug)]
+pub enum StreamEvent {
+    RetrievalDone { hits: Vec<SearchHit> },
+    // Integer width assumed (u32); it must match `Answer.turn_index`.
+    Token { delta: String, turn_index: Option<u32> },
+    Final { answer: Answer },
+}
+
+pub struct AskOpts {
+    // ... existing fields
+    /// p9-fb-33: was `Option<Sender<String>>`. Now carries discriminated
+    /// events so callers can distinguish retrieval / per-token / final.
+    pub stream_sink: Option<std::sync::mpsc::Sender<StreamEvent>>,
+}
+```
+
+- Internal API breaking change. Consumers = TUI worker + MCP (which passes no sink). Only the TUI is updated.
+- Non-streaming consumers (`stream_sink: None`) are unaffected.
+
+## Allowed / forbidden dependencies
+
+Each crate keeps its existing deps. `mpsc::Sender` is std. No new deps.
+
+- `kebab-core` does not define `StreamEvent` (domain types are kept separate from wire conversion, and StreamEvent is the pipeline's communication channel, so kebab-rag is the appropriate home).
+- `kebab-cli` adds the wire conversion code (`wire::wire_answer_event(&StreamEvent) -> Value`), following the existing pattern in `kebab-cli/src/wire.rs`.
+- UI crates (kebab-tui) must not call the retriever / store directly; they go through the `kebab-app` facade only.
+
+## Components
+
+### kebab-rag::pipeline
+
+- New `enum StreamEvent` definition (`pub`).
+- `AskOpts.stream_sink` type change.
+- `RagPipeline::ask`:
+  - Right after retrieve + stale-stamp, emit `if let Some(sink) = &opts.stream_sink { let _ = sink.send(StreamEvent::RetrievalDone { hits: hits.clone() }); }`. On cancel, break out immediately (in that case the LLM is not even called).
+  - token loop: `sink.send(StreamEvent::Token { delta: t, turn_index: opts.turn_index })`. SendError → cancel branch.
+  - At the end, emit `Final { answer: built_answer.clone() }`.
+- cancel branch:
+  ```rust
+  // p9-fb-33: SendError → caller (CLI) closed the receiver,
+  // probably due to BrokenPipe on stdout. Stop generation, mark
+  // refusal, persist partial answer.
+  if matches!(send_result, Err(_)) {
+      finish_reason = FinishReason::Cancelled;
+      break;
+  }
+  ```
+  When finish_reason = Cancelled: grounded=false + RefusalReason::LlmStreamAborted.
+
+### kebab-app
+
+- `AskOpts` is only re-exported (already public). `StreamEvent` also gets a `pub use`.
+- `App::ask` / `ask_with_session` are unchanged (opts pass through).
+
+### kebab-cli
+
+- Add `#[arg(long)] stream: bool` to `Cmd::Ask`.
+- `--stream` branch:
+  ```rust
+  if !stream {
+      // existing final-only path (with or without --json)
+  } else {
+      let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>();
+      let cfg2 = cfg.clone();
+      let q = query.clone();
+      let opts2 = AskOpts { stream_sink: Some(tx), ..opts };
+      let handle = std::thread::spawn(move || {
+          kebab_app::ask_with_config(cfg2, &q, opts2)
+      });
+      let mut stderr = std::io::stderr().lock();
+      let mut cancelled = false;
+      for ev in rx {
+          let v = wire::wire_answer_event(&ev);
+          let line = serde_json::to_string(&v)?;
+          if let Err(e) = writeln!(stderr, "{line}") {
+              if e.kind() == std::io::ErrorKind::BrokenPipe {
+                  // Leaving the loop drops `rx`; the pipeline sees SendError.
+                  cancelled = true;
+                  break;
+              }
+              return Err(e.into());
+          }
+      }
+      drop(stderr);
+      let result = handle.join().expect("ask thread panic");
+      let ans = result?;
+      // final stdout line
+      let mut stdout = std::io::stdout().lock();
+      let _ = writeln!(stdout, "{}", serde_json::to_string(&wire::wire_answer(&ans))?);
+      // exit 1 on cancel or refusal
+      if !ans.grounded { return Err(RefusalSignal.into()); }
+      Ok(())
+  }
+  ```
+- Add `wire::wire_answer_event(&StreamEvent) -> Value`: discriminated by variant, tagged with schema_version.
+
+### kebab-tui
+
+- The `Sender<String>` the ask worker used to receive becomes `Sender<StreamEvent>`.
+- Receive loop on the worker thread:
+  - `StreamEvent::Token { delta, .. }` → the existing token accumulation path, unchanged.
+  - `StreamEvent::RetrievalDone { hits }` → ignored in the minimal version (citations are shown after final arrives; examined in fb-22).
+  - `StreamEvent::Final { answer }` → already received through the `App::ask` return value, so it can be ignored (or used as a sanity check).
+- No snapshot impact (the token concat result is identical).
+
+### kebab-mcp
+
+No change. Keep `stream_sink: None`. Adopting rmcp progress notifications will be considered later, in v0.5+.
+
+## Test plan
+
+| kind | description |
+|------|-------------|
+| unit (kebab-rag) | `StreamEvent` serde round-trip: RetrievalDone / Token / Final each as one ndjson line |
+| unit (kebab-rag) | pipeline.ask + MockLm + sink: emit order = `RetrievalDone` once → `Token`* → `Final` once |
+| unit (kebab-rag) | sink SendError (rx drop) → LLM loop breaks immediately + Answer.refusal_reason = `LlmStreamAborted` + answers row recorded |
+| unit (kebab-rag) | Final.answer.citations is a subset of the RetrievalDone hits (hits the LLM did not cite with a marker are still included in RetrievalDone) |
+| integration (kebab-cli) | `kebab ask --stream` stderr is valid ndjson: schema_version/kind/ts all well-formed |
+| integration (kebab-cli) | the last stdout line of `kebab ask --stream --json` is a complete `answer.v1` |
+| integration (kebab-cli) | `kebab ask --json` (no --stream) behaves unchanged: stdout final-only |
+| integration (kebab-cli) | simulate stdout close (`kebab ask --stream \| head -c 1`) → process exits cleanly + refusal_reason of the answers row = LlmStreamAborted |
+| integration (wire-schema) | validate against answer_event.schema.json: RetrievalDone/Token/Final samples |
+| integration (kebab-tui) | all existing ask snapshots pass (token concat result identical) |
+
+LLM dependency: pipeline unit tests use MockLm (the existing `CountingLm` pattern in `crates/kebab-rag/tests/common/mod.rs`). CLI integration tests need Ollama → gated with `#[ignore]`.
+
+## Implementation steps (high-level)
+
+1. New wire schema `answer_event.schema.json`.
+2. Define the `kebab-rag::pipeline::StreamEvent` enum + change the `AskOpts.stream_sink` type.
+3. `RagPipeline::ask`:
+   - Add the RetrievalDone emit.
+   - token loop: SendError from sink.send → cancel branch.
+   - Add the Final emit.
+4. Update `kebab-app` re-exports.
+5. `kebab-tui` worker: switch `Sender<String>` → `Sender<StreamEvent>`.
+6. `kebab-cli`:
+   - `--stream` flag.
+   - `wire::wire_answer_event` helper.
+   - background thread + main thread receive loop.
+7. Unit + integration tests.
+8. README + SMOKE: `--stream` usage examples.
+9. tasks/INDEX.md / spec status flip.
+10. `integrations/claude-code/kebab/SKILL.md`: one paragraph on how an agent consumes the ndjson stream.
+
+## Risks / notes
+
+- **TUI sink type is breaking**: only one place to change. The existing token accumulation path only needs to match `StreamEvent::Token { delta, ..
}` to keep the same behavior. No snapshot impact.
+- **Answer clone for the `Final` event**: a cost on the streaming path only. Non-streaming callers are unaffected.
+- **BrokenPipe vs general IoError**: only `io::ErrorKind::BrokenPipe` means cancel. Anything else is `error.v1` stderr emit + exit 2.
+- **ndjson line framing**: serde_json::to_string + writeln! is sufficient. serde escapes embedded newlines.
+- **partial markdown safety**: out of scope. The agent's responsibility.
+- **multi-turn turn_index**: the interaction between streaming and fb-15 multi-turn. Restarting streaming on each new turn is the natural fit (`Token.turn_index` is consistent within each ask call).
+
+## Documentation updates (same PR as the implementation)
+
+- `README.md`: one line for `--stream` in Quick start or the command table.
+- `docs/SMOKE.md`: `kebab ask --stream` walkthrough (run example + one paragraph on the pattern for an agent parsing stderr).
+- `tasks/p9/p9-fb-33-streaming-ask.md`: `status: open → completed`, add design/plan links.
+- `tasks/INDEX.md`: mark the fb-33 row ✅.
+- `integrations/claude-code/kebab/SKILL.md`: mention `--stream` (CLI fallback section).
+- `tasks/HOTFIXES.md`: decision log for the internal API break (AskOpts.stream_sink type change) (optional, if questions come up after merge).

From 0ca9b1d5c3719a70e6e01618dce81f17d5afd94a Mon Sep 17 00:00:00 2001
From: th-kim0823
Date: Sat, 9 May 2026 14:16:42 +0900
Subject: [PATCH 02/11] plan(fb-33): streaming ask implementation plan

10 tasks: StreamEvent enum + AskOpts switch (kebab-core), pipeline
emits + cancel branch (kebab-rag), kebab-app re-exports, TUI worker
adapt, wire schema answer_event.v1, CLI --stream flag + ndjson stderr
driver + BrokenPipe cancel, integration tests (Ollama-gated),
workspace+clippy gate, docs, smoke+PR.
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../2026-05-09-p9-fb-33-streaming-ask.md | 1360 +++++++++++++++++
 1 file changed, 1360 insertions(+)
 create mode 100644 docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md

diff --git a/docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md b/docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md
new file mode 100644
index 0000000..49bcedd
--- /dev/null
+++ b/docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md
@@ -0,0 +1,1360 @@
+# p9-fb-33 — Streaming Ask Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Add `kebab ask --stream` that emits ndjson `answer_event.v1` events on stderr (RetrievalDone → Token* → Final) while keeping the final stdout line as the existing `answer.v1`. Cancel via stdout close → LLM stream break + `RefusalReason::LlmStreamAborted`.
+
+**Architecture:** Pipeline-internal `enum StreamEvent` carries discriminated events. `AskOpts.stream_sink` switches type from `Sender<String>` to `Sender<StreamEvent>` (internal API breaking — TUI worker is the only consumer). CLI `--stream` flag spawns a background thread that runs `ask_with_config`; main thread drains the receiver, writes ndjson to stderr, and triggers cancel via `BrokenPipe` → channel drop → pipeline `SendError` break.
+
+**Tech Stack:** Rust 2024, std::sync::mpsc, std::thread, time crate (RFC3339), serde, JSON Schema (answer_event.v1).
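
The driver shape in the Architecture note (background producer, main thread draining into a writer, `BrokenPipe` treated as cancel) can be sketched with the standard library alone. Everything named here is hypothetical: `ClosingWriter` is a test double standing in for a closed stderr, `drain` and `run` are illustrative helpers, and events are pre-serialized strings rather than real `StreamEvent` values.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Test double: accepts `ok_writes` write calls, then reports a closed pipe.
struct ClosingWriter {
    ok_writes: usize,
    lines: Vec<String>,
}

impl Write for ClosingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.ok_writes == 0 {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed"));
        }
        self.ok_writes -= 1;
        self.lines.push(String::from_utf8_lossy(buf).into_owned());
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes each received line plus a newline to `out`.
/// Returns `Ok(true)` when the writer reports `BrokenPipe` (treated as cancel).
fn drain<W: Write>(rx: mpsc::Receiver<String>, out: &mut W) -> io::Result<bool> {
    for line in rx.iter() {
        // Frame first so each event reaches the writer as a single write call.
        let framed = format!("{line}\n");
        if let Err(e) = out.write_all(framed.as_bytes()) {
            if e.kind() == io::ErrorKind::BrokenPipe {
                return Ok(true); // `rx` is dropped here, so later sends fail
            }
            return Err(e);
        }
    }
    Ok(false)
}

/// Spawns a producer that sends `n` lines and stops at the first failed send.
/// Returns (cancelled, lines written, lines the producer managed to send).
fn run(n: usize, ok_writes: usize) -> (bool, usize, usize) {
    let (tx, rx) = mpsc::channel::<String>();
    let producer = thread::spawn(move || {
        let mut sent = 0usize;
        for i in 0..n {
            if tx.send(format!("{{\"kind\":\"token\",\"delta\":\"{i}\"}}")).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });
    let mut out = ClosingWriter { ok_writes, lines: Vec::new() };
    let cancelled = drain(rx, &mut out).expect("only BrokenPipe is simulated");
    let sent = producer.join().expect("producer panicked");
    (cancelled, out.lines.len(), sent)
}
```

Because the channel is unbounded, the producer may finish sending before the writer fails, so the number of successful sends on the cancel path is not deterministic; only the written-line count and the cancel flag are.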
+
+**Spec:** `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md`
+
+---
+
+## File Structure
+
+| File | Responsibility | Action |
+|------|----------------|--------|
+| `crates/kebab-rag/src/pipeline.rs` | Add `enum StreamEvent`, switch `AskOpts.stream_sink` type, emit RetrievalDone/Token/Final, cancel branch on SendError | modify |
+| `crates/kebab-app/src/lib.rs` | Re-export `StreamEvent` alongside existing `AskOpts` | modify |
+| `crates/kebab-cli/src/main.rs` | New `--stream` flag on `Cmd::Ask`, background-thread driver, ndjson stderr writer, BrokenPipe handling | modify |
+| `crates/kebab-cli/src/wire.rs` | New `wire_answer_event(&StreamEvent) -> Value` helper tagging `schema_version: "answer_event.v1"` | modify |
+| `crates/kebab-tui/src/ask.rs` | Switch worker `Sender<String>` → `Sender<StreamEvent>`; `drain_stream` matches on `Token { delta }` | modify |
+| `crates/kebab-tui/src/app.rs:217` | `pub rx: Option<mpsc::Receiver<String>>` → `Option<mpsc::Receiver<StreamEvent>>` | modify |
+| `docs/wire-schema/v1/answer_event.schema.json` | NEW — discriminated ndjson schema | create |
+| `crates/kebab-rag/tests/streaming_events.rs` | Unit/integration: order invariants + cancel + serde round-trip | create |
+| `crates/kebab-cli/tests/wire_ask_stream.rs` | Integration: stderr ndjson + stdout final answer.v1 + BrokenPipe cancel | create |
+| `crates/kebab-cli/tests/common/mod.rs` | Reuse existing helpers (`write_config_with_llm_model`, `ingest`, `backdate_updated_at`); add `run_ask_stream` if needed | modify |
+| `README.md` | Quick start mention `--stream` | modify |
+| `docs/SMOKE.md` | Walkthrough paragraph for streaming + cancel | modify |
+| `tasks/p9/p9-fb-33-streaming-ask.md` | Status flip + design/plan links | modify |
+| `tasks/INDEX.md` | fb-33 row → ✅ | modify |
+| `integrations/claude-code/kebab/SKILL.md` | One-line CLI fallback note about `--stream` | modify |
+
+---
+
+## Pre-flight
+
+- [ ] **Step 0.1: Branch off main**
+
+```bash
+git checkout main
+git pull
+git checkout -b feat/fb-33-streaming-ask
+```
+
+- [ ] **Step 0.2: Confirm spec branch is reachable (or already on main)**
+
+```bash
+git log --oneline spec/fb-33-streaming-ask -1
+```
+
+Expected: shows `4949775 spec(fb-33): streaming ask (ndjson delta) — design`. If the spec PR has not yet merged into main, `git merge spec/fb-33-streaming-ask` so the spec doc lives on this branch too.
+
+---
+
+## Task 1: Define `StreamEvent` enum + switch sink type
+
+**Files:**
+- Modify: `crates/kebab-rag/src/pipeline.rs`
+
+- [ ] **Step 1.1: Write the failing serde test**
+
+Append to `crates/kebab-rag/src/pipeline.rs` `#[cfg(test)] mod compute_stale_mirror_tests` block (or create a new sibling `mod stream_event_serde_tests`):
+
+```rust
+#[cfg(test)]
+mod stream_event_serde_tests {
+    use super::*;
+    use kebab_core::{
+        AnswerCitation, AnswerRetrievalSummary, ChunkId, ChunkerVersion, Citation,
+        DocumentId, IndexVersion, ModelRef, RetrievalDetail, SearchHit, SearchMode,
+        TokenUsage, TraceId,
+    };
+    use kebab_core::asset::WorkspacePath;
+    use kebab_core::versions::PromptTemplateVersion;
+    use time::macros::datetime;
+
+    fn mk_hit() -> SearchHit {
+        SearchHit {
+            rank: 1,
+            chunk_id: ChunkId("c1".into()),
+            doc_id: DocumentId("d1".into()),
+            doc_path: WorkspacePath::new("a.md".into()).unwrap(),
+            heading_path: vec!["H".into()],
+            section_label: None,
+            snippet: "s".into(),
+            citation: Citation::Line {
+                path: WorkspacePath::new("a.md".into()).unwrap(),
+                start: 1,
+                end: 1,
+                section: None,
+            },
+            retrieval: RetrievalDetail {
+                method: SearchMode::Lexical,
+                fusion_score: 0.5,
+                lexical_score: Some(0.5),
+                vector_score: None,
+                lexical_rank: Some(1),
+                vector_rank: None,
+            },
+            index_version: IndexVersion("v1".into()),
+            embedding_model: None,
+            chunker_version: ChunkerVersion("c@1".into()),
+            indexed_at: datetime!(2026-05-09 12:00:00 UTC),
+            stale: false,
+        }
+    }
+
+    #[test]
+    fn stream_event_token_serializes_with_kind_discriminator() {
+        let ev = StreamEvent::Token { delta: "안녕".into(), turn_index: Some(0) };
+        let v =
            serde_json::to_value(&ev).unwrap();
+        assert_eq!(v["kind"], "token");
+        assert_eq!(v["delta"], "안녕");
+        assert_eq!(v["turn_index"], 0);
+    }
+
+    #[test]
+    fn stream_event_retrieval_done_serializes_hits() {
+        let ev = StreamEvent::RetrievalDone { hits: vec![mk_hit()] };
+        let v = serde_json::to_value(&ev).unwrap();
+        assert_eq!(v["kind"], "retrieval_done");
+        assert_eq!(v["hits"].as_array().unwrap().len(), 1);
+    }
+
+    #[test]
+    fn stream_event_final_serializes_answer() {
+        let answer = Answer {
+            answer: "x".into(),
+            citations: vec![],
+            grounded: true,
+            refusal_reason: None,
+            model: ModelRef { id: "m".into(), provider: "p".into(), dimensions: None },
+            embedding: None,
+            prompt_template_version: PromptTemplateVersion("rag-v1".into()),
+            retrieval: AnswerRetrievalSummary {
+                trace_id: TraceId("t".into()),
+                mode: SearchMode::Hybrid,
+                k: 10, score_gate: 0.3, top_score: 0.5,
+                chunks_returned: 1, chunks_used: 1,
+            },
+            usage: TokenUsage { prompt_tokens: 0, completion_tokens: 0, latency_ms: 0 },
+            created_at: datetime!(2026-05-09 12:00:00 UTC),
+            conversation_id: None,
+            turn_index: None,
+        };
+        let ev = StreamEvent::Final { answer };
+        let v = serde_json::to_value(&ev).unwrap();
+        assert_eq!(v["kind"], "final");
+        assert!(v["answer"].is_object());
+    }
+}
+```
+
+- [ ] **Step 1.2: Run test — verify failure**
+
+```bash
+cargo test -p kebab-rag --lib stream_event_serde_tests
+```
+
+Expected: FAIL — `cannot find type StreamEvent in scope`.
+
+- [ ] **Step 1.3: Define the enum + switch AskOpts.stream_sink type**
+
+In `crates/kebab-rag/src/pipeline.rs`, near the existing `PackedCitation` definition (around line 47-62), add:
+
+```rust
+/// p9-fb-33: streaming events the pipeline forwards into
+/// [`AskOpts::stream_sink`] when present. Discriminated on `kind`
+/// to match the wire `answer_event.v1` schema. Three variants:
+///
+/// - `RetrievalDone` — emitted once after retrieval + stale-stamp.
+/// - `Token` — emitted per `TokenChunk::Token` from the LM.
+/// - `Final` — emitted once after the full Answer is built (before
+///   persistence). Always the terminal event on the success path.
+///
+/// On caller-side cancel (receiver dropped), the pipeline observes
+/// the `SendError` from the next `Token` send and breaks the LM
+/// loop — see `RagPipeline::ask` cancel branch. In that case
+/// `Final` is NOT emitted (the answer still gets persisted with
+/// `RefusalReason::LlmStreamAborted`).
+#[derive(Clone, Debug, serde::Serialize)]
+#[serde(tag = "kind", rename_all = "snake_case")]
+pub enum StreamEvent {
+    RetrievalDone {
+        hits: Vec<SearchHit>,
+    },
+    Token {
+        delta: String,
+        // Integer width assumed (u32); it must match `Answer.turn_index`.
+        turn_index: Option<u32>,
+    },
+    Final {
+        answer: Answer,
+    },
+}
+```
+
+`Answer` and `SearchHit` are already imported at the top of the file. The derive uses the fully qualified `serde::Serialize` path, so no extra `use` is needed as long as `serde` (with its `derive` feature) is a dependency of `kebab-rag`; check `Cargo.toml` (`serde_json` is already a dep).
+
+Switch `AskOpts.stream_sink` (around line 99):
+
+```rust
+    /// Optional sink: every staged event (`RetrievalDone`, `Token`,
+    /// `Final`) is forwarded synchronously. A dropped receiver
+    /// triggers cancel — see `RagPipeline::ask` for the break path.
+    pub stream_sink: Option<std::sync::mpsc::Sender<StreamEvent>>,
+```
+
+- [ ] **Step 1.4: Run tests — verify pass**
+
+```bash
+cargo test -p kebab-rag --lib stream_event_serde_tests
+```
+
+Expected: 3 PASS.
+
+The rest of the workspace will fail to compile because:
+- `crates/kebab-rag/src/pipeline.rs::ask` uses `sink.send(t)` where `t: String`.
+- `crates/kebab-tui/src/ask.rs` declares `mpsc::channel::<String>()` and `Receiver<String>`.
+- `crates/kebab-app/...` exposes `AskOpts` with the old type.
+
+That is **expected**. Tasks 2-5 fix the call sites. Note that the first item lives inside `kebab-rag` itself, so the Step 1.4 run only builds once that `sink.send(t)` call is adapted (Step 2.4); if the crate does not compile here, apply that change first and re-run.
+
+- [ ] **Step 1.5: Commit**
+
+```bash
+git add crates/kebab-rag/src/pipeline.rs
+git commit -m "$(cat <<'EOF'
+feat(rag): StreamEvent enum + switch AskOpts.stream_sink (fb-33)
+
+3-variant discriminated enum (RetrievalDone / Token / Final).
+AskOpts.stream_sink now carries StreamEvent.
Other crates fail
+to compile until subsequent tasks adapt their call sites.
+
+Co-Authored-By: Claude Opus 4.7 (1M context)
+EOF
+)"
+```
+
+---
+
+## Task 2: Pipeline emits RetrievalDone + Token + Final + cancel branch
+
+**Files:**
+- Modify: `crates/kebab-rag/src/pipeline.rs`
+
+- [ ] **Step 2.1: Write the failing test for ordering invariant**
+
+Create `crates/kebab-rag/tests/streaming_events.rs`:
+
+```rust
+//! p9-fb-33: pipeline-level streaming behavior — order invariants,
+//! cancel propagation, refusal flagging.
+
+mod common;
+
+use kebab_core::{Answer, FinishReason, RefusalReason, SearchMode, TokenChunk, TokenUsage};
+use kebab_rag::{AskOpts, RagPipeline, StreamEvent};
+use std::sync::mpsc;
+
+#[test]
+fn ask_emits_retrieval_then_tokens_then_final() {
+    let env = common::RagEnv::new();
+    env.seed_one_doc("a.md", "apples are red.");
+    let (tx, rx) = mpsc::channel::<StreamEvent>();
+    let opts = AskOpts {
+        k: 3,
+        explain: false,
+        mode: SearchMode::Lexical,
+        temperature: None,
+        seed: None,
+        stream_sink: Some(tx),
+        history: vec![],
+        conversation_id: None,
+        turn_index: None,
+    };
+    let _ans = env.pipeline().ask("apples", opts).unwrap();
+    let events: Vec<StreamEvent> = rx.iter().collect();
+
+    // First event must be RetrievalDone.
+    assert!(matches!(events.first(), Some(StreamEvent::RetrievalDone { .. })),
+        "first event must be RetrievalDone, got {:?}", events.first());
+
+    // Last event must be Final.
+    assert!(matches!(events.last(), Some(StreamEvent::Final { .. })),
+        "last event must be Final, got {:?}", events.last());
+
+    // Everything in between is Token.
+    for ev in &events[1..events.len() - 1] {
+        assert!(matches!(ev, StreamEvent::Token { .. }),
+            "middle events must be Token, got {:?}", ev);
+    }
+}
+```
+
+`common::RagEnv` and `seed_one_doc` already exist in `crates/kebab-rag/tests/common/mod.rs` (Task 7's `mk_hit_with_indexed_at` plus the existing `RagEnv` scaffold from earlier tests). Reuse them.
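
The ordering invariant this test asserts can also be written as a single slice pattern. The sketch below is a standalone illustration, with a hypothetical payload-free `Kind` enum in place of the real `StreamEvent` and a hypothetical `is_well_ordered` helper; it is not part of the plan's test file.

```rust
/// Payload-free stand-in for the three event kinds (illustrative only).
#[derive(Debug, Clone, PartialEq)]
enum Kind {
    RetrievalDone,
    Token,
    Final,
}

/// Returns true when `kinds` is exactly: one `RetrievalDone`,
/// then zero or more `Token`, then one `Final`.
fn is_well_ordered(kinds: &[Kind]) -> bool {
    match kinds {
        // The slice pattern needs at least two elements, so an empty or
        // single-event stream is rejected without any index arithmetic.
        [Kind::RetrievalDone, middle @ .., Kind::Final] => {
            middle.iter().all(|k| *k == Kind::Token)
        }
        _ => false,
    }
}
```

One practical difference from indexing with `events[1..events.len() - 1]` is that the pattern form cannot panic on a short event list; it just returns `false`.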
+
+If the test scaffold's existing `MockRetriever` / `CountingLm` doesn't trigger the LLM-citation path naturally for the seeded text, adapt — the goal is just to drive a non-empty token stream past the score-gate. Look at existing `kebab-rag/tests/pipeline.rs` (`grounded_citations_inherit_indexed_at_and_stale_from_hit`) for a working setup.
+
+- [ ] **Step 2.2: Run test — verify it fails**
+
+```bash
+cargo test -p kebab-rag --test streaming_events ask_emits_retrieval_then_tokens_then_final
+```
+
+Expected: FAIL. If the pipeline still sends `String`, the crate does not compile (type mismatch against `Sender<StreamEvent>`); otherwise `events` only contains tokens (no RetrievalDone, no Final).
+
+- [ ] **Step 2.3: Add RetrievalDone emit after retrieval + stale-stamp**
+
+In `crates/kebab-rag/src/pipeline.rs::ask`, immediately AFTER the staleness stamping loop (around line 205, after `for h in &mut hits { h.stale = ... }`):
+
+```rust
+        // p9-fb-33: emit retrieval_done as soon as the hit list is
+        // ready (post stale-stamp so consumers see the same `stale`
+        // values the App-level wire path emits). Cancel is best-effort
+        // here — if the caller already dropped the receiver we just
+        // skip and let the LLM-loop SendError handle it consistently.
+        if let Some(sink) = &opts.stream_sink {
+            let _ = sink.send(StreamEvent::RetrievalDone {
+                hits: hits.clone(),
+            });
+        }
+```
+
+- [ ] **Step 2.4: Switch token send to StreamEvent::Token + add cancel branch**
+
+Replace the existing token loop body (around lines 304-325).
The current code is:
+
+```rust
+        for item in stream {
+            let chunk = item.context("kb-rag: stream item")?;
+            match chunk {
+                TokenChunk::Token(t) => {
+                    acc.push_str(&t);
+                    if let Some(sink) = &opts.stream_sink {
+                        let _ = sink.send(t);
+                    }
+                }
+                TokenChunk::Done {
+                    finish_reason: fr,
+                    usage: u,
+                } => {
+                    finish_reason = fr;
+                    usage = u;
+                    break;
+                }
+            }
+        }
+```
+
+Replace with:
+
+```rust
+        let mut cancelled = false;
+        for item in stream {
+            let chunk = item.context("kb-rag: stream item")?;
+            match chunk {
+                TokenChunk::Token(t) => {
+                    acc.push_str(&t);
+                    if let Some(sink) = &opts.stream_sink {
+                        // p9-fb-33: SendError → caller dropped the
+                        // receiver (probably a closed stdout downstream).
+                        // Stop generation, mark the answer cancelled so
+                        // the persistence path records refusal_reason =
+                        // LlmStreamAborted.
+                        if sink
+                            .send(StreamEvent::Token {
+                                delta: t,
+                                turn_index: opts.turn_index,
+                            })
+                            .is_err()
+                        {
+                            cancelled = true;
+                            break;
+                        }
+                    }
+                }
+                TokenChunk::Done {
+                    finish_reason: fr,
+                    usage: u,
+                } => {
+                    finish_reason = fr;
+                    usage = u;
+                    break;
+                }
+            }
+        }
+        if cancelled {
+            finish_reason = FinishReason::Cancelled;
+        }
+```
+
+`FinishReason::Cancelled` should already exist (it's used for `LlmStreamAborted` per the spec). To check:
+
+```bash
+grep -n "Cancelled\|FinishReason" crates/kebab-core/src/answer.rs crates/kebab-core/src/llm.rs 2>/dev/null
+```
+
+If absent in the existing enum, add it to the `FinishReason` enum in `kebab-core` (likely `crates/kebab-core/src/llm.rs`):
+
+```rust
+pub enum FinishReason {
+    Stop,
+    Length,
+    Cancelled, // p9-fb-33
+}
+```
+
+- [ ] **Step 2.5: Honor cancel in refusal logic + emit Final on success**
+
+After the existing grounded/refusal computation block (around lines 348-359), add a cancel check:
+
+```rust
+        // p9-fb-33: cancel takes priority over LlmSelfJudge — the
+        // caller bailed mid-stream, so the recorded reason should
+        // reflect that, not "model didn't cite".
+        let (grounded, refusal_reason) = if matches!(finish_reason, FinishReason::Cancelled) {
+            (false, Some(RefusalReason::LlmStreamAborted))
+        } else if grounded {
+            (grounded, None)
+        } else {
+            (grounded, Some(RefusalReason::LlmSelfJudge))
+        };
+```
+
+(The tuple destructure shadows the earlier `grounded` binding and still reads it, so keep the existing `let grounded = ...` and delete only the old `let refusal_reason = ...` binding, which becomes dead code. Keep the existing `let cited_set` and downstream logic.)
+
+After the `Answer { ... }` literal is built (around line 422), and BEFORE the persistence step (line 437), emit Final ONLY when the run wasn't cancelled:
+
+```rust
+        // p9-fb-33: emit final on the success path. On cancel we
+        // skip Final — the receiver is gone and persistence still
+        // records the partial answer below.
+        if !cancelled
+            && let Some(sink) = &opts.stream_sink
+        {
+            let _ = sink.send(StreamEvent::Final {
+                answer: answer.clone(),
+            });
+        }
+```
+
+(`answer.clone()` is the price of streaming. Non-streaming callers pay nothing — `opts.stream_sink` is `None` and the `if let` short-circuits.)
+
+- [ ] **Step 2.6: Run test — verify pass**
+
+```bash
+cargo test -p kebab-rag --test streaming_events ask_emits_retrieval_then_tokens_then_final
+```
+
+Expected: PASS.
+
+- [ ] **Step 2.7: Add cancel-propagation test**
+
+Append to `crates/kebab-rag/tests/streaming_events.rs`:
+
+```rust
+#[test]
+fn ask_records_llm_stream_aborted_when_receiver_drops() {
+    let env = common::RagEnv::new();
+    env.seed_one_doc("a.md", "apples are red.");
+    let (tx, rx) = mpsc::channel::<StreamEvent>();
+    let opts = AskOpts {
+        k: 3,
+        explain: false,
+        mode: SearchMode::Lexical,
+        temperature: None,
+        seed: None,
+        stream_sink: Some(tx),
+        history: vec![],
+        conversation_id: None,
+        turn_index: None,
+    };
+    // Drop the receiver immediately so the first Token send fails.
+    drop(rx);
+    let ans = env.pipeline().ask("apples", opts).unwrap();
+    assert!(!ans.grounded);
+    assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted));
+}
+```
+
+- [ ] **Step 2.8: Run cancel test — verify pass**
+
+```bash
+cargo test -p kebab-rag --test streaming_events ask_records_llm_stream_aborted
+```
+
+Expected: PASS.
+
+- [ ] **Step 2.9: Run full kebab-rag suite**
+
+```bash
+cargo test -p kebab-rag
+```
+
+Expected: all PASS. Existing pipeline tests should still pass — they don't pass a `stream_sink`, so the new emit code is a no-op for them.
+
+- [ ] **Step 2.10: Commit**
+
+```bash
+git add crates/kebab-rag/src/pipeline.rs crates/kebab-rag/tests/streaming_events.rs crates/kebab-core/src/llm.rs
+git commit -m "$(cat <<'EOF'
+feat(rag): pipeline emits StreamEvent + cancel on SendError (fb-33)
+
+RetrievalDone after retrieve+stale-stamp, Token per LM chunk
+(SendError → break, FinishReason::Cancelled, RefusalReason::
+LlmStreamAborted), Final on success. answers row still persists
+on cancel for audit. Adds FinishReason::Cancelled if absent.
+
+Co-Authored-By: Claude Opus 4.7 (1M context)
+EOF
+)"
+```
+
+---
+
+## Task 3: kebab-app re-exports + adapt TUI worker
+
+**Files:**
+- Modify: `crates/kebab-app/src/lib.rs`
+- Modify: `crates/kebab-tui/src/app.rs`
+- Modify: `crates/kebab-tui/src/ask.rs`
+
+- [ ] **Step 3.1: Add `pub use` for StreamEvent in kebab-app**
+
+In `crates/kebab-app/src/lib.rs`, find the existing `pub use kebab_rag::AskOpts` (or equivalent re-export) and replace it with:
+
+```rust
+pub use kebab_rag::{AskOpts, StreamEvent};
+```
+
+If the existing line already covers `AskOpts`, just add `StreamEvent` to that brace list.
+
+- [ ] **Step 3.2: Switch TUI Receiver type**
+
+Edit `crates/kebab-tui/src/app.rs:217`:
+
+```diff
+-    pub rx: Option<mpsc::Receiver<String>>,
++    pub rx: Option<mpsc::Receiver<kebab_app::StreamEvent>>,
+```
+
+- [ ] **Step 3.3: Switch worker channel type**
+
+Edit `crates/kebab-tui/src/ask.rs::spawn_ask_worker` (around line 486):
+
+```diff
+-    let (tx, rx) = mpsc::channel::<String>();
++    let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>();
+```
+
+- [ ] **Step 3.4: Update drain_stream to match Token only**
+
+Edit `crates/kebab-tui/src/ask.rs::drain_stream` (around line 542):
+
+```rust
+pub(crate) fn drain_stream(state: &mut App) {
+    let Some(s) = state.ask.as_mut() else { return };
+    if let Some(rx) = &s.rx {
+        for ev in rx.try_iter() {
+            match ev {
+                kebab_app::StreamEvent::Token { delta, .. } => {
+                    s.partial.push_str(&delta);
+                }
+                // p9-fb-33: TUI ignores RetrievalDone (citation
+                // panel renders after completion via `last_answer`)
+                // and Final (the worker thread's join already
+                // delivers the canonical Answer in poll_worker).
+                kebab_app::StreamEvent::RetrievalDone { .. }
+                | kebab_app::StreamEvent::Final { .. } => {}
+            }
+        }
+    }
+}
+```
+
+- [ ] **Step 3.5: Build the TUI crate**
+
+```bash
+cargo build -p kebab-tui
+```
+
+Expected: clean build. If there are leftover `Receiver<String>` references in test code, fix them — same `StreamEvent` swap.
+
+- [ ] **Step 3.6: Run TUI test suite**
+
+```bash
+cargo test -p kebab-tui
+```
+
+Expected: all PASS. Existing snapshot/string assertion tests check rendered output (Q/A blocks, citations) — token concat behavior is unchanged, so output is byte-identical.
+
+If a test directly constructs `mpsc::channel::<String>()` for `pub rx` (e.g. a unit test that injects fake tokens), it needs the same swap. Adjust each call site to send `StreamEvent::Token { delta: "...".into(), turn_index: None }` instead of bare strings.
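
For a test that injects fake events, the non-blocking drain can be exercised in isolation. The sketch below is a hypothetical reduction: this `StreamEvent` carries simplified payloads (`hit_count`, `text`) instead of the real `SearchHit` / `Answer` types, and `drain_pending` and `demo` are illustrative names rather than functions from `kebab-tui`.

```rust
use std::sync::mpsc::{self, Receiver};

/// Simplified stand-in for the streaming event (payloads are illustrative).
#[derive(Debug)]
enum StreamEvent {
    RetrievalDone { hit_count: usize },
    Token { delta: String },
    Final { text: String },
}

/// Appends every pending `Token` delta to `partial` without blocking.
/// `try_iter` yields only what is already queued, so a render loop can
/// call this once per frame.
fn drain_pending(rx: &Receiver<StreamEvent>, partial: &mut String) {
    for ev in rx.try_iter() {
        match ev {
            StreamEvent::Token { delta } => partial.push_str(&delta),
            // Non-token events leave the accumulated text untouched.
            StreamEvent::RetrievalDone { .. } | StreamEvent::Final { .. } => {}
        }
    }
}

/// Queues one event of each kind around two tokens, then drains once.
fn demo() -> String {
    let (tx, rx) = mpsc::channel();
    tx.send(StreamEvent::RetrievalDone { hit_count: 2 }).unwrap();
    tx.send(StreamEvent::Token { delta: "he".into() }).unwrap();
    tx.send(StreamEvent::Token { delta: "llo".into() }).unwrap();
    tx.send(StreamEvent::Final { text: "hello".into() }).unwrap();
    let mut partial = String::new();
    drain_pending(&rx, &mut partial);
    partial
}
```

Since only `Token` deltas are appended, the concatenated text matches what a plain `String` channel would have produced, which is why rendered snapshots are expected to stay unchanged.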
+ +- [ ] **Step 3.7: Commit** + +```bash +git add crates/kebab-app/src/lib.rs crates/kebab-tui/ +git commit -m "$(cat <<'EOF' +feat(tui): adapt ask worker to StreamEvent sink (fb-33) + +Worker channel now carries kebab_app::StreamEvent. drain_stream +matches on Token { delta }; RetrievalDone and Final are ignored +(citations render from last_answer, Final is redundant with +worker join). app::AskState.rx type widened to match. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +## Task 4: Wire schema — `answer_event.v1` + +**Files:** +- Create: `docs/wire-schema/v1/answer_event.schema.json` + +- [ ] **Step 4.1: Write the schema file** + +Create `docs/wire-schema/v1/answer_event.schema.json`: + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://kb.local/wire/v1/answer_event.schema.json", + "title": "AnswerEvent v1", + "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", + "type": "object", + "required": ["schema_version", "kind", "ts"], + "properties": { + "schema_version": { "const": "answer_event.v1" }, + "kind": { "enum": ["retrieval_done", "token", "final"] }, + "ts": { "type": "string", "format": "date-time" }, + "hits": { "type": "array", "description": "retrieval_done: search_hit.v1[]" }, + "delta": { "type": "string", "description": "token: incremental string chunk" }, + "turn_index": { "type": ["integer", "null"], "minimum": 0, "description": "token: matches Answer.turn_index" }, + "answer": { "type": "object", "description": "final: complete answer.v1 payload" } + } +} +``` + +- [ ] **Step 4.2: Verify the schema file is valid JSON** + +```bash +python3 -c "import json; json.load(open('docs/wire-schema/v1/answer_event.schema.json'))" +``` + +Expected: silent success. 
+ +- [ ] **Step 4.3: Commit** + +```bash +git add docs/wire-schema/v1/answer_event.schema.json +git commit -m "$(cat <<'EOF' +feat(wire): answer_event.v1 schema (fb-33) + +Discriminated ndjson event for `kebab ask --stream`. Mirrors +the ingest_progress.v1 pattern (stderr stream + stdout final +answer.v1 for backwards compat). + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +## Task 5: CLI `--stream` flag + wire helper + background-thread driver + +**Files:** +- Modify: `crates/kebab-cli/src/wire.rs` +- Modify: `crates/kebab-cli/src/main.rs` + +- [ ] **Step 5.1: Find the existing wire DTO pattern** + +```bash +grep -n "tag_object\|wire_answer\|wire_search_hit\|schema_version" crates/kebab-cli/src/wire.rs | head -20 +``` + +The existing pattern uses a `tag_object(value, schema_version)` helper. Our new helper follows the same shape. + +- [ ] **Step 5.2: Add `wire_answer_event` helper** + +Edit `crates/kebab-cli/src/wire.rs`. Append after the existing `wire_answer` function: + +```rust +/// p9-fb-33: tag a `StreamEvent` as `answer_event.v1` ndjson. The +/// timestamp is added at emit time (caller fills `ts`), since the +/// pipeline doesn't carry one in the in-process enum. +pub fn wire_answer_event(ev: &kebab_app::StreamEvent, ts: time::OffsetDateTime) -> Value { + let mut v = serde_json::to_value(ev).expect("StreamEvent serializes"); + let ts_str = ts + .format(&time::format_description::well_known::Rfc3339) + .expect("OffsetDateTime formats as RFC3339"); + if let Value::Object(ref mut map) = v { + map.insert("ts".to_string(), Value::String(ts_str)); + } + tag_object(v, "answer_event.v1") +} +``` + +`time` is already a dep on `kebab-cli` from fb-32. If not, add it to `[dependencies]`: + +```bash +grep -n "^time" crates/kebab-cli/Cargo.toml +``` + +If absent: `time = { workspace = true, features = ["formatting", "macros"] }`. 
+
+- [ ] **Step 5.3: Add the `--stream` clap flag**
+
+Edit the `Cmd::Ask` variant definition in `crates/kebab-cli/src/main.rs`:
+
+```bash
+grep -n "Ask {" crates/kebab-cli/src/main.rs | head -5
+```
+
+Find the `Ask { .. }` enum variant (the clap subcommand definition, with fields like `query`, `k`, `mode`, `explain`, etc.). Add:
+
+```rust
+        /// p9-fb-33: emit ndjson answer_event.v1 events on stderr
+        /// while streaming. Final stdout line is the existing
+        /// answer.v1. Off by default to preserve final-only behavior.
+        #[arg(long)]
+        stream: bool,
+```
+
+Update the `Cmd::Ask { ... }` destructure binding inside the match arm (around line 571, `Cmd::Ask { query, k, mode, explain, ..., session }`) to include `stream`.
+
+- [ ] **Step 5.4: Implement the stream branch**
+
+Replace the existing `Cmd::Ask` match arm body. The current body (lines 571-630) has a single non-streaming path. Add a `--stream` branch:
+
+```rust
+        Cmd::Ask {
+            query,
+            k,
+            mode,
+            explain,
+            temperature,
+            seed,
+            show_citations,
+            hide_citations,
+            session,
+            stream,
+        } => {
+            let cfg = kebab_config::Config::load(cli.config.as_deref())?;
+            if *stream {
+                use std::sync::mpsc;
+                let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>();
+                let opts = kebab_app::AskOpts {
+                    k: *k,
+                    explain: *explain,
+                    mode: (*mode).into(),
+                    temperature: *temperature,
+                    seed: *seed,
+                    stream_sink: Some(tx),
+                    history: Vec::new(),
+                    conversation_id: None,
+                    turn_index: None,
+                };
+                let cfg2 = cfg.clone();
+                let q = query.clone();
+                let session2 = session.clone();
+                let handle = std::thread::spawn(move || -> anyhow::Result<kebab_core::Answer> {
+                    match session2.as_deref() {
+                        Some(sid) => kebab_app::ask_with_session_with_config(cfg2, sid, &q, opts),
+                        None => kebab_app::ask_with_config(cfg2, &q, opts),
+                    }
+                });
+
+                // Drain receiver, write ndjson to stderr until completion
+                // or BrokenPipe. Drop rx on BrokenPipe so the worker's
+                // send returns SendError and the pipeline cancels.
+                let mut cancelled_pipe = false;
+                {
+                    let mut stderr = std::io::stderr().lock();
+                    for ev in &rx {
+                        let now = time::OffsetDateTime::now_utc();
+                        let v = wire::wire_answer_event(&ev, now);
+                        let line = serde_json::to_string(&v)?;
+                        if let Err(e) = writeln!(stderr, "{line}") {
+                            if e.kind() == std::io::ErrorKind::BrokenPipe {
+                                cancelled_pipe = true;
+                                break;
+                            }
+                            return Err(e.into());
+                        }
+                    }
+                }
+                if cancelled_pipe {
+                    drop(rx); // signal to worker: next send returns SendError
+                }
+
+                let result = handle
+                    .join()
+                    .map_err(|_| anyhow::anyhow!("ask worker panicked"))?;
+                let ans = result?;
+
+                // Final stdout line: answer.v1 for backwards compat.
+                // BrokenPipe on stdout is silent (caller already gone).
+                let final_json = serde_json::to_string(&wire::wire_answer(&ans))?;
+                let _ = writeln!(std::io::stdout().lock(), "{final_json}");
+
+                if !ans.grounded {
+                    return Err(RefusalSignal.into());
+                }
+                Ok(())
+            } else {
+                // Existing non-streaming path, unchanged from
+                // lines 583-629 in the prior version.
+                let opts = kebab_app::AskOpts {
+                    k: *k,
+                    explain: *explain,
+                    mode: (*mode).into(),
+                    temperature: *temperature,
+                    seed: *seed,
+                    stream_sink: None,
+                    history: Vec::new(),
+                    conversation_id: None,
+                    turn_index: None,
+                };
+                let ans = match session.as_deref() {
+                    Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?,
+                    None => kebab_app::ask_with_config(cfg, query, opts)?,
+                };
+                if cli.json {
+                    println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?);
+                } else {
+                    println!("{}", ans.answer);
+                    let print_citations = *show_citations && !*hide_citations;
+                    if print_citations && !ans.citations.is_empty() {
+                        use std::io::IsTerminal;
+                        let color = std::io::stdout().is_terminal();
+                        let mut out = std::io::stdout().lock();
+                        render_ask_plain_citations(&mut out, &ans, color)?;
+                    }
+                }
+                if !ans.grounded {
+                    return Err(RefusalSignal.into());
+                }
+                Ok(())
+            }
+        }
+```
+
+`writeln!` on the locked stderr handle (`StderrLock`) requires `std::io::Write` in scope. Verify the existing imports include it (most CLI files do).
+
+- [ ] **Step 5.5: Build the CLI**
+
+```bash
+cargo build -p kebab-cli
+```
+
+Expected: clean build. The spawn closure's explicit `-> anyhow::Result<kebab_core::Answer>` annotation needs `kebab_core` to be a dependency of `kebab-cli`. If it isn't, swap the annotation to whatever path resolves (`kebab_app::Answer` if it re-exports), or elide the type with `-> anyhow::Result<_>`.
+
+```bash
+grep -n "^kebab-core\|^kebab_core" crates/kebab-cli/Cargo.toml
+```
+
+If `kebab-core` is missing, use `kebab_app::Answer`:
+
+```bash
+grep -n "pub use.*Answer" crates/kebab-app/src/lib.rs
+```
+
+If not re-exported, add `pub use kebab_core::Answer;` to `crates/kebab-app/src/lib.rs` near the existing `pub use kebab_rag::{AskOpts, StreamEvent};`.
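The drain loop from Step 5.4 treats `BrokenPipe` as a cancel signal and every other I/O error as fatal. A small sketch of that rule, written against a generic writer so it can run without a real pipe, might look like the following. `drain_lines` and `FailAfter` are hypothetical names introduced for illustration; the real code writes serialized `StreamEvent` values to stderr rather than plain strings.

```rust
use std::io::{self, Write};
use std::sync::mpsc::Receiver;

/// Write each received line to `out`.
/// Returns `Ok(true)` if the reader went away (BrokenPipe) and
/// `Ok(false)` if the channel closed normally. Any other I/O error
/// is propagated as fatal.
fn drain_lines<W: Write>(rx: &Receiver<String>, out: &mut W) -> io::Result<bool> {
    // Iterating `&Receiver` blocks until every sender has been dropped.
    for line in rx {
        if let Err(e) = writeln!(out, "{line}") {
            if e.kind() == io::ErrorKind::BrokenPipe {
                return Ok(true);
            }
            return Err(e);
        }
    }
    Ok(false)
}

/// Hypothetical test writer: accepts `budget` calls to `write`,
/// then fails every later call with an error of kind `kind`.
struct FailAfter {
    budget: usize,
    kind: io::ErrorKind,
}

impl Write for FailAfter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(io::Error::new(self.kind, "simulated failure"));
        }
        self.budget -= 1;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

In the plan's version the caller drops `rx` after a `true` result, which is what makes the worker's next `send` fail and triggers the pipeline's cancel branch.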
+ +- [ ] **Step 5.6: Smoke-test the CLI flag (skipped on no-Ollama)** + +```bash +kebab --help 2>&1 | grep -A2 "ask" +kebab ask --help 2>&1 | grep -A1 stream +``` + +Expected: `--stream` appears in `ask` subcommand help. + +- [ ] **Step 5.7: Commit** + +```bash +git add crates/kebab-cli/src/wire.rs crates/kebab-cli/src/main.rs crates/kebab-cli/Cargo.toml crates/kebab-app/src/lib.rs Cargo.lock +git commit -m "$(cat <<'EOF' +feat(cli): kebab ask --stream emits ndjson on stderr (fb-33) + +Background-thread driver runs ask_with_config; main thread +drains the receiver, serializes each StreamEvent to ndjson on +stderr. BrokenPipe → drop receiver → pipeline SendError → +cancel + LlmStreamAborted refusal. Final stdout line is the +existing answer.v1 (ingest_progress.v1 backwards-compat +pattern). + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +## Task 6: CLI integration tests + +**Files:** +- Create: `crates/kebab-cli/tests/wire_ask_stream.rs` +- Modify: `crates/kebab-cli/tests/common/mod.rs` + +- [ ] **Step 6.1: Inspect existing common helpers** + +```bash +sed -n '1,40p' crates/kebab-cli/tests/common/mod.rs +``` + +The existing `common::run_ask_lexical(env, query, json: bool)` (or equivalent) is the pattern. We need a `--stream` variant. + +- [ ] **Step 6.2: Add `run_ask_stream` helper** + +Append to `crates/kebab-cli/tests/common/mod.rs`: + +```rust +/// p9-fb-33: invoke `kebab ask --stream`, capturing stdout + stderr. +/// Returns (stdout, stderr). 
+pub fn run_ask_stream(env: &TestEnv, query: &str) -> (String, String) {
+    let exe = env!("CARGO_BIN_EXE_kebab");
+    let out = std::process::Command::new(exe)
+        .args(["--config", env.config_path(), "ask", "--stream", "--mode", "lexical", query])
+        .output()
+        .expect("kebab ask --stream");
+    (
+        String::from_utf8_lossy(&out.stdout).to_string(),
+        String::from_utf8_lossy(&out.stderr).to_string(),
+    )
+}
+```
+
+Adapt to whatever helper signature `run_ask_lexical` uses and match the same idiom (e.g., if existing helpers take `&CliEnv` and return a struct with stdout/stderr, mirror that).
+
+- [ ] **Step 6.3: Write the integration tests**
+
+Create `crates/kebab-cli/tests/wire_ask_stream.rs`:
+
+```rust
+//! p9-fb-33: CLI streaming surface: stderr ndjson + stdout final answer.v1.
+
+mod common;
+
+use serde_json::Value;
+
+#[test]
+#[ignore = "requires real Ollama (matches sibling ask integration tests)"]
+fn stream_emits_ndjson_events_on_stderr() {
+    let env = common::CliEnv::new_with_llm_model("gemma4:e4b");
+    common::ingest(&env, "a.md", "# T\n\nrust ownership is a memory model.\n");
+    let (stdout, stderr) = common::run_ask_stream(&env, "what is rust ownership");
+
+    // stderr: every line should parse as JSON with schema_version
+    // == "answer_event.v1" and a recognized kind.
+    let mut kinds: Vec<String> = vec![];
+    for line in stderr.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+        let v: Value = serde_json::from_str(line)
+            .unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}"));
+        assert_eq!(v["schema_version"], "answer_event.v1");
+        let kind = v["kind"].as_str().expect("kind").to_string();
+        assert!(
+            matches!(kind.as_str(), "retrieval_done" | "token" | "final"),
+            "unexpected kind: {kind}"
+        );
+        assert!(v["ts"].is_string(), "ts must be RFC3339 string");
+        kinds.push(kind);
+    }
+
+    // First event must be retrieval_done. Last must be final.
+ assert_eq!(kinds.first().map(String::as_str), Some("retrieval_done")); + assert_eq!(kinds.last().map(String::as_str), Some("final")); + + // stdout: last line is answer.v1. + let final_line = stdout.lines().last().expect("stdout has at least one line"); + let answer: Value = serde_json::from_str(final_line).expect("stdout final = answer.v1"); + assert_eq!(answer["schema_version"], "answer.v1"); +} + +#[test] +#[ignore = "requires real Ollama"] +fn non_stream_path_unchanged() { + let env = common::CliEnv::new_with_llm_model("gemma4:e4b"); + common::ingest(&env, "a.md", "# T\n\nrust ownership is a memory model.\n"); + let stdout = common::run_ask_json(&env, "what is rust ownership"); // existing helper + let v: Value = serde_json::from_str(&stdout).expect("answer.v1"); + assert_eq!(v["schema_version"], "answer.v1"); +} +``` + +`common::run_ask_json` already exists from fb-32's wire test scaffold. If the parameter / return shape differs from what's shown, adjust. + +- [ ] **Step 6.4: Run new tests (with Ollama available)** + +```bash +cargo test -p kebab-cli --test wire_ask_stream -- --ignored +``` + +Expected: 2 PASS (when Ollama is running locally and `gemma4:e4b` is pulled). Without Ollama, the tests stay ignored — sibling fb-32 integration tests follow the same gate. + +- [ ] **Step 6.5: Verify the non-ignored CLI suite still passes** + +```bash +cargo test -p kebab-cli +``` + +Expected: all PASS, ignored count includes the two new tests. + +- [ ] **Step 6.6: Commit** + +```bash +git add crates/kebab-cli/tests/ +git commit -m "$(cat <<'EOF' +test(cli): wire_ask_stream — stderr ndjson + stdout final answer.v1 (fb-33) + +Two Ollama-gated integration tests verifying: +- stderr lines parse as answer_event.v1, first=retrieval_done, + last=final, all carry RFC3339 ts. +- stdout final line is answer.v1 (backwards compat). +- non-stream path (--json without --stream) unchanged. 
+ +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +## Task 7: BrokenPipe cancel integration test + +**Files:** +- Modify: `crates/kebab-cli/tests/wire_ask_stream.rs` + +The shell-level `head -c 1` simulation is brittle in cargo test. Use a more direct test: pipe stderr through a writer that fails after N bytes. + +- [ ] **Step 7.1: Add the cancel test (Ollama-gated)** + +Append to `crates/kebab-cli/tests/wire_ask_stream.rs`: + +```rust +#[test] +#[ignore = "requires real Ollama + writes to a closed pipe"] +fn stream_cancels_when_stderr_closes() { + use std::io::{BufRead, BufReader}; + use std::process::{Command, Stdio}; + + let env = common::CliEnv::new_with_llm_model("gemma4:e4b"); + common::ingest(&env, "a.md", "# T\n\nrust ownership is a memory model. it tracks lifetimes.\n"); + + // Spawn `kebab ask --stream`. Read stderr line-by-line, then + // immediately drop the stderr reader after the first line (which + // is retrieval_done). Pipeline should detect SendError and break. + let exe = env!("CARGO_BIN_EXE_kebab"); + let mut child = Command::new(exe) + .args([ + "--config", env.config_path(), + "ask", "--stream", "--mode", "lexical", + "tell me about rust ownership", + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("spawn kebab"); + + { + let stderr = child.stderr.take().expect("stderr piped"); + let mut reader = BufReader::new(stderr); + let mut first = String::new(); + reader.read_line(&mut first).expect("read first stderr line"); + assert!( + first.contains("\"kind\":\"retrieval_done\""), + "first event must be retrieval_done, got {first:?}" + ); + // Drop the reader → child's stderr write end will see SIGPIPE + // on the next write → main thread gets BrokenPipe → drops rx → + // worker's pipeline.send returns SendError → cancel. + } + + // Process should still terminate cleanly within reasonable time. 
+    let status = child
+        .wait()
+        .expect("child completes after cancel");
+    // Refusal exits with code 1 (RefusalSignal). Don't assert exact
+    // code: different OSes report SIGPIPE differently. Assert just
+    // that the process didn't hang.
+    let _ = status;
+}
+```
+
+This is the closest portable approximation of the BrokenPipe scenario without running a shell pipeline through `head`. The test verifies that the process terminates instead of hanging, which is the key invariant.
+
+- [ ] **Step 7.2: Run the cancel test (with Ollama)**
+
+```bash
+cargo test -p kebab-cli --test wire_ask_stream stream_cancels_when_stderr_closes -- --ignored
+```
+
+Expected: PASS; `child.wait()` returns instead of blocking.
+
+- [ ] **Step 7.3: Commit**
+
+```bash
+git add crates/kebab-cli/tests/wire_ask_stream.rs
+git commit -m "$(cat <<'EOF'
+test(cli): BrokenPipe stderr → ask --stream terminates cleanly (fb-33)
+
+Spawn the binary, read first stderr line (retrieval_done), drop
+the reader. Pipeline's next Token send returns SendError, cancel
+branch fires, child.wait() returns instead of blocking forever.
+Ollama-gated.
+
+Co-Authored-By: Claude Opus 4.7 (1M context)
+EOF
+)"
+```
+
+---
+
+## Task 8: Workspace test + clippy gate
+
+- [ ] **Step 8.1: Run full workspace test**
+
+```bash
+cargo test --workspace --no-fail-fast -j 1 2>&1 | tail -50
+```
+
+Expected: all PASS. Snapshot tests in other crates should be unaffected: `StreamEvent` is internal API and the wire emit happens only on `--stream`.
+
+- [ ] **Step 8.2: Clippy gate**
+
+```bash
+cargo clippy --workspace --all-targets -- -D warnings
+```
+
+Expected: clean. Common new warnings to watch for:
+- `clippy::large_enum_variant` on `StreamEvent` (Final::answer is large). If reported, wrap in `Box<Answer>`: `Final { answer: Box<Answer> }`. Update emit + match sites.
+- `clippy::needless_borrow` on `&rx` iteration: adapt as flagged.
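The `large_enum_variant` note in Step 8.2 comes down to enum layout: an enum is at least as large as its biggest variant, so one bulky payload inflates every value, including the frequent token events. The sketch below illustrates the effect of boxing with a hypothetical `BigAnswer` payload whose size is chosen arbitrarily; it is not the real `Answer` type, and the real sizes will differ.

```rust
use std::mem::size_of;

/// Hypothetical large payload standing in for `Answer` (size is arbitrary).
#[allow(dead_code)]
struct BigAnswer {
    bytes: [u8; 320],
}

/// Layout with the payload stored inline in the variant.
#[allow(dead_code)]
enum Unboxed {
    Token { delta: String },
    Final { answer: BigAnswer },
}

/// Layout with the payload behind a heap pointer.
#[allow(dead_code)]
enum Boxed {
    Token { delta: String },
    Final { answer: Box<BigAnswer> },
}

/// Returns (unboxed size, boxed size) in bytes for the two layouts.
fn enum_sizes() -> (usize, usize) {
    (size_of::<Unboxed>(), size_of::<Boxed>())
}
```

Boxing trades the smaller enum for one heap allocation per `Final` event and a dereference at every match site, so whether it is worth doing depends on how many consumers touch the payload.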
+ +- [ ] **Step 8.3: Commit if clippy required fixes** + +```bash +git add -A +git commit -m "chore: clippy fixes for fb-33" +``` + +(Skip this commit if no fixes were needed.) + +--- + +## Task 9: Documentation updates + +**Files:** +- Modify: `README.md` +- Modify: `docs/SMOKE.md` +- Modify: `tasks/p9/p9-fb-33-streaming-ask.md` +- Modify: `tasks/INDEX.md` +- Modify: `integrations/claude-code/kebab/SKILL.md` + +- [ ] **Step 9.1: README — Quick start mention** + +Find the existing `## 명령` table or quick-start block: + +```bash +grep -n "kebab ask\|## 명령\|Quick start" README.md | head -10 +``` + +Add a row or paragraph noting `--stream`: + +```markdown +| `kebab ask "..." --stream` | RAG 답변을 ndjson event 로 stderr 에 stream — agent token 즉시 소비 (fb-33) | +``` + +Or, if README format prefers prose, append one short line under the existing `kebab ask` description. + +- [ ] **Step 9.2: SMOKE.md — walkthrough** + +After the existing ask section, append: + +```markdown +### Streaming ask (fb-33) + +```bash +kebab ask "what is rust ownership" --stream 2> events.ndjson > final.json +``` + +stderr 의 events.ndjson 은 한 줄 = 한 event 의 ndjson — `retrieval_done` 한 번, `token` 여러 번, `final` 한 번. final.json 은 기존 `answer.v1` 그대로. + +agent 가 `head -c 1` 로 stderr 를 닫으면 pipeline 이 LLM stream 을 즉시 중단하고 `RefusalReason::LlmStreamAborted` 로 partial answer 를 `answers` 테이블에 기록한다. +``` + +- [ ] **Step 9.3: Task spec status flip** + +Edit `tasks/p9/p9-fb-33-streaming-ask.md`: + +```diff + --- +-status: open ++status: completed + target_version: 0.5.0 +``` + +Replace the `> ⏳ **백로그 only — 미구현.**` block (around line 14): + +```markdown +상세 설계: `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md`. +구현 계획: `docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md`. 
+``` + +- [ ] **Step 9.4: tasks/INDEX.md — fb-33 row** + +```diff +- - [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ⏳ 미구현, brainstorm 필요 ++ - [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ✅ 머지 + v0.5.0 cut 후보 (2026-05-09) +``` + +- [ ] **Step 9.5: Skill — CLI fallback note** + +Edit `integrations/claude-code/kebab/SKILL.md`. Find the "CLI fallback" or equivalent section. Append: + +```markdown +- `kebab ask --stream`: ndjson `answer_event.v1` events on stderr (`retrieval_done` → `token`* → `final`), plus the existing `answer.v1` as the final stdout line. Use when you need progressive token consumption; otherwise the default non-streaming path is simpler. +``` + +- [ ] **Step 9.6: Commit docs** + +```bash +git add README.md docs/SMOKE.md tasks/p9/p9-fb-33-streaming-ask.md tasks/INDEX.md integrations/claude-code/kebab/SKILL.md +git commit -m "$(cat <<'EOF' +docs(fb-33): README + SMOKE + INDEX + skill notes + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +## Task 10: Smoke + push + PR + +- [ ] **Step 10.1: Manual smoke** + +```bash +cd /tmp/kebab-smoke # the existing SMOKE.md scratch dir +~/Workspace/projects/kebab/target/release/kebab --config /tmp/kebab-smoke/config.toml ingest +~/Workspace/projects/kebab/target/release/kebab --config /tmp/kebab-smoke/config.toml ask "test" --stream 2>events.ndjson >final.json +head -1 events.ndjson +tail -1 events.ndjson +cat final.json | jq .schema_version +``` + +Expected: +- first event line includes `"kind":"retrieval_done"` +- last event line includes `"kind":"final"` +- `final.json` contains `"schema_version":"answer.v1"` + +- [ ] **Step 10.2: Final workspace test** + +```bash +cd ~/Workspace/projects/kebab +cargo test --workspace --no-fail-fast -j 1 +``` + +Expected: all green. 
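The manual smoke in Step 10.1 checks an ordering rule by eye: `retrieval_done` first, `final` last, and only `token` events in between. If that check were automated, one way to express it is sketched here. `check_event_order` is a hypothetical helper, and it assumes the `kind` strings have already been extracted from each ndjson line. It covers the success path only, since a cancelled run ends without `final`.

```rust
/// Check the success-path ordering of event kinds:
/// `retrieval_done` first, `final` last, only `token` in between.
fn check_event_order(kinds: &[&str]) -> Result<(), String> {
    let (first, rest) = kinds.split_first().ok_or("no events")?;
    if *first != "retrieval_done" {
        return Err(format!("first event is {first:?}, expected \"retrieval_done\""));
    }
    let (last, middle) = rest.split_last().ok_or("stream ended before final")?;
    if *last != "final" {
        return Err(format!("last event is {last:?}, expected \"final\""));
    }
    // Everything between the two bookends must be a token event.
    match middle.iter().find(|k| **k != "token") {
        Some(k) => Err(format!("unexpected event {k:?} before final")),
        None => Ok(()),
    }
}
```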
+ +- [ ] **Step 10.3: Push branch** + +```bash +git push -u origin feat/fb-33-streaming-ask +``` + +- [ ] **Step 10.4: Open PR via gitea-pr** + +Build the PR body file at `/tmp/fb33-pr-body.md`: + +```markdown +## Summary + +- adds `kebab ask --stream` emitting `answer_event.v1` ndjson events on stderr (`retrieval_done` → `token`* → `final`), final stdout line stays `answer.v1` for backwards compat +- internal API: `AskOpts.stream_sink` now carries discriminated `StreamEvent` instead of bare `String`; TUI worker adapted +- cancel: stdout/stderr close → BrokenPipe → drop receiver → pipeline `SendError` → LLM loop break + `RefusalReason::LlmStreamAborted` +- MCP `kebab__ask` streaming deferred to v0.5+ (rmcp progress notifications need verification first) + +## Test plan + +- [x] `cargo test --workspace --no-fail-fast -j 1` — green +- [x] `cargo clippy --workspace --all-targets -- -D warnings` — clean +- [x] new tests: pipeline order invariant + cancel propagation (kebab-rag), `wire_ask_stream` ndjson shape + stdout final + BrokenPipe cancel (kebab-cli, Ollama-gated) +- [x] manual smoke per `docs/SMOKE.md` "Streaming ask" walkthrough + +## Architectural notes + +- `RetrievalDone` includes the retrieval-stale-stamp result so consumers see the same `stale` values the App-level wire path emits. +- `Final` event mirrors the canonical Answer; TUI worker ignores it (worker join already delivers Answer). +- `StreamEvent` lives in `kebab-rag` to keep the type adjacent to the pipeline that emits it; `kebab-app` re-exports for downstream consumers. 
+ +## Files of interest + +- spec: `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md` +- plan: `docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md` +- pipeline: `crates/kebab-rag/src/pipeline.rs` (StreamEvent + emit + cancel) +- CLI: `crates/kebab-cli/src/main.rs` (Cmd::Ask --stream branch), `crates/kebab-cli/src/wire.rs` (wire_answer_event) +- wire: `docs/wire-schema/v1/answer_event.schema.json` +- TUI: `crates/kebab-tui/src/ask.rs` (drain_stream match) +``` + +Then open: + +```bash +/Users/user/.claude/skills/gitea-ops/bin/gitea-pr \ + --title "feat(fb-33): streaming ask (ndjson delta)" \ + --body "$(cat /tmp/fb33-pr-body.md)" \ + --head feat/fb-33-streaming-ask \ + --base main +``` + +Capture the returned PR URL. + +--- + +## Self-review checklist + +- **Spec coverage:** + - §Behavior contract / event taxonomy → Tasks 1, 2 (StreamEvent + emit positions) + - §CLI flag → Task 5 (`--stream`) + - §Output stream (stderr ndjson + stdout final) → Task 5 + Task 6 tests + - §Cancel semantics → Task 2 (SendError branch) + Task 7 (BrokenPipe integration test) + - §Wire schema → Task 4 (`answer_event.schema.json`) + - §Domain API change → Tasks 1, 3 (AskOpts + TUI adapt) + - §Components (kebab-rag/app/cli/tui) → Tasks 1-5 + - §Test plan → Tasks 2, 6, 7 cover unit (serde + ordering + cancel) + integration (CLI ndjson, BrokenPipe) + - §Documentation → Task 9 + - §Risks (BrokenPipe vs IoError, ndjson line-unit, partial markdown) → addressed in Task 5 (only `BrokenPipe` triggers cancel; other IoError fatal) + +- **Placeholder scan:** + - "adapt to existing scaffold" appears in Tasks 2, 6 — these instruct mirroring of existing test infrastructure (RagEnv, CliEnv) rather than inventing new helpers. + - "if absent, add it" in Task 5 (Cargo.toml `time` dep, kebab-core re-export) — concrete fallback paths spelled out, not deferred. + - No TODO / "fill in" / "later" remaining. 
+ +- **Type consistency:** + - `StreamEvent` enum variants identical across Tasks 1, 2, 3, 5 (RetrievalDone {hits}, Token {delta, turn_index}, Final {answer}). + - `AskOpts.stream_sink: Option>` consistent. + - `wire_answer_event(&StreamEvent, OffsetDateTime) -> Value` signature stable. + - `FinishReason::Cancelled` used consistently (Task 2 step 2.4 + 2.5). + - `RefusalReason::LlmStreamAborted` matches existing variant (already in `kebab-core`). + +--- + +## Execution Handoff + +Plan complete and saved to `docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md`. Two execution options: + +**1. Subagent-Driven (recommended)** — fresh subagent per task, review between tasks, fast iteration. + +**2. Inline Execution** — execute tasks in this session using executing-plans, batch execution with checkpoints. + +Which approach? From 31475f0312f4ca037faffdc890ba9f82ee6f05fd Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 14:38:54 +0900 Subject: [PATCH 03/11] feat(rag): StreamEvent enum + switch AskOpts.stream_sink (fb-33) 3-variant discriminated enum (RetrievalDone / Token / Final). AskOpts.stream_sink now carries StreamEvent. Other crates fail to compile until subsequent tasks adapt their call sites. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-rag/src/pipeline.rs | 126 +++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 5 deletions(-) diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs index ab829d5..3f1024b 100644 --- a/crates/kebab-rag/src/pipeline.rs +++ b/crates/kebab-rag/src/pipeline.rs @@ -67,6 +67,35 @@ struct PackedCitation { /// prompt section the LLM will see (system + query + packed context). type PackedContext = (String, Vec, usize); +/// p9-fb-33: streaming events the pipeline forwards into +/// [`AskOpts::stream_sink`] when present. Discriminated on `kind` +/// to match the wire `answer_event.v1` schema. 
Three variants: +/// +/// - `RetrievalDone` — emitted once after retrieval + stale-stamp. +/// - `Token` — emitted per `TokenChunk::Token` from the LM. +/// - `Final` — emitted once after the full Answer is built (before +/// persistence). Always the terminal event on the success path. +/// +/// On caller-side cancel (receiver dropped), the pipeline observes +/// the `SendError` from the next `Token` send and breaks the LM +/// loop — see `RagPipeline::ask` cancel branch. In that case +/// `Final` is NOT emitted (the answer still gets persisted with +/// `RefusalReason::LlmStreamAborted`). +#[derive(Clone, Debug, serde::Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum StreamEvent { + RetrievalDone { + hits: Vec, + }, + Token { + delta: String, + turn_index: Option, + }, + Final { + answer: Answer, + }, +} + // ── AskOpts ───────────────────────────────────────────────────────────────── /// Caller-supplied knobs for one [`RagPipeline::ask`] invocation. @@ -92,11 +121,10 @@ pub struct AskOpts { pub temperature: Option, /// Override `config.models.llm.seed` for this call. pub seed: Option, - /// Optional sink: every `TokenChunk::Token` produced by the LM is - /// forwarded synchronously. A dropped receiver does NOT abort the - /// pipeline — `SendError` is silently swallowed and generation - /// continues so the `Answer` row still gets persisted. - pub stream_sink: Option>, + /// Optional sink: every staged event (`RetrievalDone`, `Token`, + /// `Final`) is forwarded synchronously. A dropped receiver + /// triggers cancel — see `RagPipeline::ask` for the break path. + pub stream_sink: Option>, /// p9-fb-15: prior turns of the same conversation. Empty for /// single-shot ask. 
The pipeline prepends a serialized `[이전 /// 대화]` block to the user prompt and uses the most-recent @@ -997,3 +1025,91 @@ mod compute_stale_mirror_tests { assert!(!compute_stale(future, now(), 30)); } } + +#[cfg(test)] +mod stream_event_serde_tests { + use super::*; + use kebab_core::{ + AnswerRetrievalSummary, ChunkId, ChunkerVersion, Citation, + DocumentId, IndexVersion, ModelRef, RetrievalDetail, SearchHit, SearchMode, + TokenUsage, TraceId, + }; + use kebab_core::asset::WorkspacePath; + use kebab_core::versions::PromptTemplateVersion; + use time::macros::datetime; + + fn mk_hit() -> SearchHit { + SearchHit { + rank: 1, + chunk_id: ChunkId("c1".into()), + doc_id: DocumentId("d1".into()), + doc_path: WorkspacePath::new("a.md".into()).unwrap(), + heading_path: vec!["H".into()], + section_label: None, + snippet: "s".into(), + citation: Citation::Line { + path: WorkspacePath::new("a.md".into()).unwrap(), + start: 1, + end: 1, + section: None, + }, + retrieval: RetrievalDetail { + method: SearchMode::Lexical, + fusion_score: 0.5, + lexical_score: Some(0.5), + vector_score: None, + lexical_rank: Some(1), + vector_rank: None, + }, + index_version: IndexVersion("v1".into()), + embedding_model: None, + chunker_version: ChunkerVersion("c@1".into()), + indexed_at: datetime!(2026-05-09 12:00:00 UTC), + stale: false, + } + } + + #[test] + fn stream_event_token_serializes_with_kind_discriminator() { + let ev = StreamEvent::Token { delta: "안녕".into(), turn_index: Some(0) }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!(v["kind"], "token"); + assert_eq!(v["delta"], "안녕"); + assert_eq!(v["turn_index"], 0); + } + + #[test] + fn stream_event_retrieval_done_serializes_hits() { + let ev = StreamEvent::RetrievalDone { hits: vec![mk_hit()] }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!(v["kind"], "retrieval_done"); + assert_eq!(v["hits"].as_array().unwrap().len(), 1); + } + + #[test] + fn stream_event_final_serializes_answer() { + let answer = Answer { + 
answer: "x".into(), + citations: vec![], + grounded: true, + refusal_reason: None, + model: ModelRef { id: "m".into(), provider: "p".into(), dimensions: None }, + embedding: None, + prompt_template_version: PromptTemplateVersion("rag-v1".into()), + retrieval: AnswerRetrievalSummary { + trace_id: TraceId("t".into()), + mode: SearchMode::Hybrid, + k: 10, score_gate: 0.3, top_score: 0.5, + chunks_returned: 1, chunks_used: 1, + }, + usage: TokenUsage { prompt_tokens: 0, completion_tokens: 0, latency_ms: 0 }, + created_at: datetime!(2026-05-09 12:00:00 UTC), + conversation_id: None, + turn_index: None, + }; + let ev = StreamEvent::Final { answer }; + let v = serde_json::to_value(&ev).unwrap(); + assert_eq!(v["kind"], "final"); + assert!(v["answer"].is_object()); + } +} From 307fd8d5276f800db1c895a019cb52a31b60071d Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 14:49:55 +0900 Subject: [PATCH 04/11] feat(rag): pipeline emits StreamEvent + cancel on SendError (fb-33) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RetrievalDone after retrieve+stale-stamp, Token per LM chunk (SendError → break, FinishReason::Cancelled, RefusalReason:: LlmStreamAborted), Final on success. answers row still persists on cancel for audit. Adds FinishReason::Cancelled, re-exports StreamEvent from kebab_rag, migrates two pre-fb-33 sink tests in tests/pipeline.rs to the new StreamEvent type (the "dropped receiver does not abort" test inverts to record cancel). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-core/src/traits.rs | 5 + crates/kebab-rag/src/lib.rs | 2 +- crates/kebab-rag/src/pipeline.rs | 63 ++++++++-- crates/kebab-rag/tests/pipeline.rs | 36 ++++-- crates/kebab-rag/tests/streaming_events.rs | 131 +++++++++++++++++++++ 5 files changed, 219 insertions(+), 18 deletions(-) create mode 100644 crates/kebab-rag/tests/streaming_events.rs diff --git a/crates/kebab-core/src/traits.rs b/crates/kebab-core/src/traits.rs index 2c48411..bb4d0c3 100644 --- a/crates/kebab-core/src/traits.rs +++ b/crates/kebab-core/src/traits.rs @@ -98,6 +98,11 @@ pub enum FinishReason { Stop, Length, Aborted, + /// p9-fb-33: caller-side cancel. The pipeline breaks the LM loop + /// when a `Token` send into `AskOpts.stream_sink` returns + /// `SendError` (receiver dropped). The persisted answer is + /// flagged with `RefusalReason::LlmStreamAborted`. + Cancelled, Error(String), } diff --git a/crates/kebab-rag/src/lib.rs b/crates/kebab-rag/src/lib.rs index a883dae..e527dc0 100644 --- a/crates/kebab-rag/src/lib.rs +++ b/crates/kebab-rag/src/lib.rs @@ -22,4 +22,4 @@ pub use kebab_core::{Answer, AnswerCitation, AnswerRetrievalSummary, RefusalReas mod pipeline; -pub use pipeline::{AskOpts, RagPipeline}; +pub use pipeline::{AskOpts, RagPipeline, StreamEvent}; diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs index 3f1024b..21e7844 100644 --- a/crates/kebab-rag/src/pipeline.rs +++ b/crates/kebab-rag/src/pipeline.rs @@ -83,6 +83,12 @@ type PackedContext = (String, Vec, usize); /// `RefusalReason::LlmStreamAborted`). #[derive(Clone, Debug, serde::Serialize)] #[serde(tag = "kind", rename_all = "snake_case")] +// `Final.answer` carries a full `Answer` (~320B) and is the largest +// variant; `Token` is the hot path. Size mismatch is unavoidable +// without boxing the wire-shape, which would force every consumer +// (TUI / CLI / future MCP) to deref. 
The sink is short-lived (one +// per ask) so the per-event overhead is not material. +#[allow(clippy::large_enum_variant)] pub enum StreamEvent { RetrievalDone { hits: Vec, @@ -231,6 +237,16 @@ impl RagPipeline { for h in &mut hits { h.stale = compute_stale(h.indexed_at, now, stale_threshold_days); } + // p9-fb-33: emit retrieval_done as soon as the hit list is + // ready (post stale-stamp so consumers see the same `stale` + // values the App-level wire path emits). Cancel is best-effort + // here — if the caller already dropped the receiver we just + // skip and let the LLM-loop SendError handle it consistently. + if let Some(sink) = &opts.stream_sink { + let _ = sink.send(StreamEvent::RetrievalDone { + hits: hits.clone(), + }); + } let chunks_returned = u32::try_from(hits.len()).unwrap_or(u32::MAX); let top_score = hits.first().map(|h| h.retrieval.fusion_score).unwrap_or(0.0); @@ -329,16 +345,28 @@ impl RagPipeline { .llm .generate_stream(req) .context("kb-rag: llm.generate_stream")?; + let mut cancelled = false; for item in stream { let chunk = item.context("kb-rag: stream item")?; match chunk { TokenChunk::Token(t) => { acc.push_str(&t); if let Some(sink) = &opts.stream_sink { - // SendError silently dropped — caller cancelled but the - // pipeline still drives generation to completion so the - // `answers` row gets a faithful record. - let _ = sink.send(t); + // p9-fb-33: SendError → caller dropped the + // receiver (probably a closed stdout downstream). + // Stop generation, mark the answer cancelled so + // the persistence path records refusal_reason = + // LlmStreamAborted. + if sink + .send(StreamEvent::Token { + delta: t, + turn_index: opts.turn_index, + }) + .is_err() + { + cancelled = true; + break; + } } } TokenChunk::Done { @@ -351,6 +379,9 @@ impl RagPipeline { } } } + if cancelled { + finish_reason = FinishReason::Cancelled; + } // ── 6. 
Citation extract ──────────────────────────────────────────── let extracted: Vec = extract_markers(&acc); @@ -375,15 +406,20 @@ impl RagPipeline { }); let trimmed_answer = acc.trim(); let matched_refusal_phrase = refusal_phrase.is_match(&acc); - let grounded = !trimmed_answer.is_empty() + let grounded_unaware = !trimmed_answer.is_empty() && unknown_markers.is_empty() && !extracted.is_empty(); - let refusal_reason = if grounded { - None + // p9-fb-33: cancel takes priority over LlmSelfJudge — the + // caller bailed mid-stream, so the recorded reason should + // reflect that, not "model didn't cite". + let (grounded, refusal_reason) = if matches!(finish_reason, FinishReason::Cancelled) { + (false, Some(RefusalReason::LlmStreamAborted)) + } else if grounded_unaware { + (true, None) } else { // Spec §7: empty answer, unknown markers, silent ungrounded, // and explicit "근거가 부족" all collapse to LlmSelfJudge. - Some(RefusalReason::LlmSelfJudge) + (false, Some(RefusalReason::LlmSelfJudge)) }; // ── 8. Build Answer ──────────────────────────────────────────────── @@ -461,6 +497,17 @@ impl RagPipeline { "kb-rag: ask done" ); + // p9-fb-33: emit final on the success path. On cancel we + // skip Final — the receiver is gone and persistence still + // records the partial answer below. + if !cancelled + && let Some(sink) = &opts.stream_sink + { + let _ = sink.send(StreamEvent::Final { + answer: answer.clone(), + }); + } + // ── 9. 
Persist ───────────────────────────────────────────────────── let packed_chunks_json = if opts.explain { // Snapshot the packed entries as a portable list of objects so diff --git a/crates/kebab-rag/tests/pipeline.rs b/crates/kebab-rag/tests/pipeline.rs index 9dc9bd4..875e9d6 100644 --- a/crates/kebab-rag/tests/pipeline.rs +++ b/crates/kebab-rag/tests/pipeline.rs @@ -14,7 +14,7 @@ use kebab_core::{ FinishReason, LanguageModel, Retriever, SearchMode, TokenChunk, TokenUsage, }; use kebab_llm::MockLanguageModel; -use kebab_rag::{AskOpts, RagPipeline, RefusalReason}; +use kebab_rag::{AskOpts, RagPipeline, RefusalReason, StreamEvent}; /// LM ID used everywhere — kept short so snapshots stay stable. const TEST_LM_ID: &str = "mock-lm"; @@ -270,18 +270,32 @@ fn streaming_forwards_tokens_to_sink() { let lm: Arc<dyn LanguageModel> = Arc::new(CountingLm::new(canned)); let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone()); - let (tx, rx) = std::sync::mpsc::channel::<String>(); + let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>(); let mut opts = default_opts(); opts.stream_sink = Some(tx); let _ = pipeline.ask("q", opts).unwrap(); - let collected: String = rx.into_iter().collect::<Vec<_>>().join(""); + // p9-fb-33: extract Token deltas from the staged event stream. + let collected: String = rx + .into_iter() + .filter_map(|ev| match ev { + StreamEvent::Token { delta, .. } => Some(delta), + _ => None, + }) + .collect::<Vec<_>>() + .join(""); assert_eq!(collected, canned); } -// ── 10. dropped receiver does NOT abort generation ──────────────────────── +// ── 10. dropped receiver aborts generation, records LlmStreamAborted ────── +// +// p9-fb-33: cancel semantics changed. Pre-fb-33 the pipeline drove +// the LM loop to completion and silently dropped sends. Now a +// SendError breaks the loop and stamps `RefusalReason::LlmStreamAborted` +// onto the persisted row — the partial answer (whatever was buffered +// before the cancel) still gets written for audit. 
#[test] -fn dropped_receiver_does_not_abort_generation() { +fn dropped_receiver_aborts_with_llm_stream_aborted() { let env = RagEnv::new(); let cid = id32("c1"); let did = id32("d1"); @@ -292,13 +306,17 @@ fn dropped_receiver_does_not_abort_generation() { let lm: Arc<dyn LanguageModel> = Arc::new(CountingLm::new(canned)); let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone()); - let (tx, rx) = std::sync::mpsc::channel::<String>(); - drop(rx); // receiver gone — every send fails silently + let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>(); + drop(rx); // receiver gone — first Token send fails, loop breaks let mut opts = default_opts(); opts.stream_sink = Some(tx); let answer = pipeline.ask("q", opts).unwrap(); - assert_eq!(answer.answer, canned, "generation completes despite dead sink"); - assert!(answer.grounded); + assert!(!answer.grounded, "cancel takes priority over grounded"); + assert_eq!( + answer.refusal_reason, + Some(RefusalReason::LlmStreamAborted), + "cancel records LlmStreamAborted", + ); assert_eq!(env.count_answers(), 1, "answers row still persisted"); } diff --git a/crates/kebab-rag/tests/streaming_events.rs b/crates/kebab-rag/tests/streaming_events.rs new file mode 100644 index 0000000..05be1b0 --- /dev/null +++ b/crates/kebab-rag/tests/streaming_events.rs @@ -0,0 +1,131 @@ +//! p9-fb-33: pipeline-level streaming behavior — order invariants, +//! cancel propagation, refusal flagging. + +mod common; + +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::sync::mpsc; + +use common::{MockRetriever, RagEnv, id32, mk_hit}; +use kebab_core::{ + FinishReason, LanguageModel, RefusalReason, Retriever, SearchMode, TokenChunk, TokenUsage, +}; +use kebab_llm::MockLanguageModel; +use kebab_rag::{AskOpts, RagPipeline, StreamEvent}; + +const TEST_LM_ID: &str = "mock-lm"; + +/// Minimal LM mirroring `tests/pipeline.rs::CountingLm` so the +/// streaming-events suite stays self-contained. 
+struct CountingLm { + inner: MockLanguageModel, + calls: std::sync::atomic::AtomicUsize, +} + +impl CountingLm { + fn new(canned: &str) -> Self { + Self { + inner: MockLanguageModel { + model_id: TEST_LM_ID.to_string(), + provider: "mock".to_string(), + context_tokens: 32_768, + canned_response: canned.to_string(), + canned_finish: FinishReason::Stop, + canned_usage: TokenUsage { + prompt_tokens: 10, + completion_tokens: 5, + latency_ms: 7, + }, + }, + calls: std::sync::atomic::AtomicUsize::new(0), + } + } +} + +impl LanguageModel for CountingLm { + fn model_ref(&self) -> kebab_core::ModelRef { + self.inner.model_ref() + } + fn context_tokens(&self) -> usize { + self.inner.context_tokens() + } + fn generate_stream( + &self, + req: kebab_core::GenerateRequest, + ) -> anyhow::Result> + Send>> { + self.calls.fetch_add(1, Ordering::SeqCst); + self.inner.generate_stream(req) + } +} + +fn opts_with_sink(tx: mpsc::Sender) -> AskOpts { + AskOpts { + k: 3, + explain: false, + mode: SearchMode::Lexical, + temperature: Some(0.0), + seed: Some(0), + stream_sink: Some(tx), + history: Vec::new(), + conversation_id: None, + turn_index: None, + } +} + +/// Build a pipeline with one seeded chunk + canned LM response so +/// retrieval lands a single hit and the LM emits at least one token. +fn env_with_one_hit(canned: &str) -> (RagEnv, RagPipeline) { + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]); + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever: Arc = Arc::new(MockRetriever::new(hits)); + let lm: Arc = Arc::new(CountingLm::new(canned)); + let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone()); + (env, pipeline) +} + +#[test] +fn ask_emits_retrieval_then_tokens_then_final() { + let (_env, pipeline) = env_with_one_hit("apples are red. 
[#1]"); + let (tx, rx) = mpsc::channel::<StreamEvent>(); + let _ans = pipeline.ask("apples", opts_with_sink(tx)).unwrap(); + let events: Vec<StreamEvent> = rx.iter().collect(); + + // First event must be RetrievalDone. + assert!( + matches!(events.first(), Some(StreamEvent::RetrievalDone { .. })), + "first event must be RetrievalDone, got {:?}", + events.first() + ); + + // Last event must be Final. + assert!( + matches!(events.last(), Some(StreamEvent::Final { .. })), + "last event must be Final, got {:?}", + events.last() + ); + + // Everything in between is Token. + for ev in &events[1..events.len() - 1] { + assert!( + matches!(ev, StreamEvent::Token { .. }), + "middle events must be Token, got {ev:?}" + ); + } +} + +#[test] +fn ask_records_llm_stream_aborted_when_receiver_drops() { + let (env, pipeline) = env_with_one_hit("apples are red. [#1]"); + let (tx, rx) = mpsc::channel::<StreamEvent>(); + // Drop the receiver immediately so the first Token send fails. + drop(rx); + let ans = pipeline.ask("apples", opts_with_sink(tx)).unwrap(); + assert!(!ans.grounded); + assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted)); + // Persistence still happens on cancel — the row is the audit trail. + assert_eq!(env.count_answers(), 1, "answers row written on cancel"); +} From e5c99f5b80760815a97006ded329a655ec15e317 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 14:57:46 +0900 Subject: [PATCH 05/11] feat(tui): adapt ask worker to StreamEvent sink (fb-33) Worker channel now carries kebab_app::StreamEvent. drain_stream matches on Token { delta }; RetrievalDone and Final are ignored (citations render from last_answer, Final is redundant with worker join). app::AskState.rx type widened to match. 
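A minimal sketch of the drain described above, assuming only `std::sync::mpsc`. `Event` and `drain_tokens` are hypothetical stand-ins for `kebab_app::StreamEvent` and `drain_stream`, with the payloads reduced to what the example needs. `try_iter` yields whatever is already queued and returns without blocking, so a call like this can run once per render frame:

```rust
use std::sync::mpsc::Receiver;

// Hypothetical stand-in for the three-variant event enum (payloads simplified).
#[derive(Debug)]
enum Event {
    RetrievalDone,
    Token { delta: String },
    Final,
}

// Appends every queued token delta to `partial` and ignores the other
// variants. Never blocks: `try_iter` stops as soon as the queue is empty.
fn drain_tokens(rx: &Receiver<Event>, partial: &mut String) {
    for ev in rx.try_iter() {
        match ev {
            Event::Token { delta } => partial.push_str(&delta),
            Event::RetrievalDone | Event::Final => {}
        }
    }
}
```

Calling `drain_tokens` again on an empty queue leaves `partial` unchanged, which is what makes it safe to call on every frame.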
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-app/src/lib.rs | 2 +- crates/kebab-tui/src/app.rs | 11 +++++++---- crates/kebab-tui/src/ask.rs | 16 +++++++++++++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/crates/kebab-app/src/lib.rs b/crates/kebab-app/src/lib.rs index 602fdaf..960442b 100644 --- a/crates/kebab-app/src/lib.rs +++ b/crates/kebab-app/src/lib.rs @@ -85,7 +85,7 @@ pub const NO_EXT_SENTINEL: &str = ""; /// `use kebab_app::AskOpts` keeps working without churn. The struct gained /// a `stream_sink` field in P4-3; non-streaming callers (kb-cli today) /// pass `stream_sink: None`. -pub use kebab_rag::AskOpts; +pub use kebab_rag::{AskOpts, StreamEvent}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct DoctorReport { diff --git a/crates/kebab-tui/src/app.rs b/crates/kebab-tui/src/app.rs index 44ed73c..1d53d0c 100644 --- a/crates/kebab-tui/src/app.rs +++ b/crates/kebab-tui/src/app.rs @@ -186,9 +186,12 @@ impl Default for SearchState { /// Ask pane state — owned by p9-3, extended by p9-fb-16 for /// multi-turn conversation transcript. /// -/// The worker thread (`thread`) owns the `mpsc::Sender` that -/// `kebab-app::ask` writes tokens into. The pane keeps the matching -/// `rx` and drains it once per render frame (no blocking). +/// The worker thread (`thread`) owns the `mpsc::Sender` +/// that `kebab-app::ask` writes events into. The pane keeps the matching +/// `rx` and drains it once per render frame (no blocking). Only the +/// `Token { delta }` variant is consumed for the streaming transcript; +/// `RetrievalDone` and `Final` are ignored (citations render from +/// `last_answer` after the worker join). /// /// p9-fb-16: completed `Turn`s accumulate in `turns`; the worker /// passes a snapshot of `turns` as `history` to @@ -214,7 +217,7 @@ pub struct AskState { pub thread: Option>>, /// Token receiver paired with the worker's `Sender`. Drained /// every render frame. 
- pub rx: Option<mpsc::Receiver<String>>, + pub rx: Option<mpsc::Receiver<kebab_app::StreamEvent>>, /// Vertical scroll offset for the transcript area when content /// exceeds the viewport. Only consulted when `follow_tail` is /// false; otherwise the renderer overrides this with the diff --git a/crates/kebab-tui/src/ask.rs b/crates/kebab-tui/src/ask.rs index dd20917..854a325 100644 --- a/crates/kebab-tui/src/ask.rs +++ b/crates/kebab-tui/src/ask.rs @@ -483,7 +483,7 @@ pub fn handle_key_ask(state: &mut App, key: KeyEvent) -> KeyOutcome { } fn spawn_ask_worker(state: &mut App) { - let (tx, rx) = mpsc::channel::<String>(); + let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>(); let cfg = state.config.clone(); let s = state.ask.as_mut().unwrap(); // p9-fb-10: take() consumes the input in one step (no clone + @@ -542,8 +542,18 @@ fn make_conversation_id() -> String { pub(crate) fn drain_stream(state: &mut App) { let Some(s) = state.ask.as_mut() else { return }; if let Some(rx) = &s.rx { - for tok in rx.try_iter() { - s.partial.push_str(&tok); + for ev in rx.try_iter() { + match ev { + kebab_app::StreamEvent::Token { delta, .. } => { + s.partial.push_str(&delta); + } + // p9-fb-33: TUI ignores RetrievalDone (citation + // panel renders after completion via `last_answer`) + // and Final (the worker thread's join already + // delivers the canonical Answer in poll_worker). + kebab_app::StreamEvent::RetrievalDone { .. } + | kebab_app::StreamEvent::Final { .. } => {} + } } } } From e8caf2a57e6b698ea66516cdc407b7a87a42301e Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 14:58:49 +0900 Subject: [PATCH 06/11] feat(wire): answer_event.v1 schema (fb-33) Discriminated ndjson event for `kebab ask --stream`. Mirrors the ingest_progress.v1 pattern (stderr stream + stdout final answer.v1 for backwards compat). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/wire-schema/v1/answer_event.schema.json | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 docs/wire-schema/v1/answer_event.schema.json diff --git a/docs/wire-schema/v1/answer_event.schema.json b/docs/wire-schema/v1/answer_event.schema.json new file mode 100644 index 0000000..80a46f9 --- /dev/null +++ b/docs/wire-schema/v1/answer_event.schema.json @@ -0,0 +1,17 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://kb.local/wire/v1/answer_event.schema.json", + "title": "AnswerEvent v1", + "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", + "type": "object", + "required": ["schema_version", "kind", "ts"], + "properties": { + "schema_version": { "const": "answer_event.v1" }, + "kind": { "enum": ["retrieval_done", "token", "final"] }, + "ts": { "type": "string", "format": "date-time" }, + "hits": { "type": "array", "description": "retrieval_done: search_hit.v1[]" }, + "delta": { "type": "string", "description": "token: incremental string chunk" }, + "turn_index": { "type": ["integer", "null"], "minimum": 0, "description": "token: matches Answer.turn_index" }, + "answer": { "type": "object", "description": "final: complete answer.v1 payload" } + } +} From 29629e6786d7009f47538be0a45f914a6d6863c7 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:03:41 +0900 Subject: [PATCH 07/11] feat(cli): kebab ask --stream emits ndjson on stderr (fb-33) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background-thread driver runs ask_with_config; main thread drains the receiver, serializes each StreamEvent to ndjson on stderr. BrokenPipe → drop receiver → pipeline SendError → cancel + LlmStreamAborted refusal. 
Final stdout line is the existing answer.v1 (ingest_progress.v1 backwards-compat pattern). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-cli/src/main.rs | 179 ++++++++++++++++++++++++++--------- crates/kebab-cli/src/wire.rs | 19 ++++ 2 files changed, 153 insertions(+), 45 deletions(-) diff --git a/crates/kebab-cli/src/main.rs b/crates/kebab-cli/src/main.rs index 4614bee..f3df832 100644 --- a/crates/kebab-cli/src/main.rs +++ b/crates/kebab-cli/src/main.rs @@ -153,6 +153,12 @@ enum Cmd { /// (e.g. `kebab-rust-async-2026-05`). #[arg(long, value_name = "ID")] session: Option, + + /// p9-fb-33: emit ndjson `answer_event.v1` events on stderr + /// while streaming. Final stdout line is the existing + /// `answer.v1`. Off by default to preserve final-only behavior. + #[arg(long)] + stream: bool, }, /// Wipe XDG data dirs (and optionally the Lance vector store) so the @@ -578,55 +584,138 @@ fn run(cli: &Cli) -> anyhow::Result<()> { show_citations, hide_citations, session, + stream, } => { let cfg = kebab_config::Config::load(cli.config.as_deref())?; - let opts = kebab_app::AskOpts { - k: *k, - explain: *explain, - mode: (*mode).into(), - temperature: *temperature, - seed: *seed, - // CLI ask is non-streaming today (the answer prints all at - // once on completion). The TUI ask pane (P9-3) is what - // wires up a real `mpsc::Sender` here. - stream_sink: None, - // p9-fb-18: when `--session` is set, the facade - // (`ask_with_session_with_config`) loads prior turns - // from SQLite and stuffs them into AskOpts.history - // before calling `ask_with_history`. Single-shot path - // (no `--session`) keeps the empty defaults. 
- history: Vec::new(), - conversation_id: None, - turn_index: None, - }; - let ans = match session.as_deref() { - Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?, - None => kebab_app::ask_with_config(cfg, query, opts)?, - }; - if cli.json { - println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?); - } else { - println!("{}", ans.answer); - // p9-fb-20: print the citation block after the - // answer body when --hide-citations is not set - // (--show-citations is the default). Skipped on - // refusal-with-zero-citations to avoid an empty - // `근거:` header. - let print_citations = *show_citations && !*hide_citations; - if print_citations && !ans.citations.is_empty() { - // p9-fb-32: yellow `[stale]` prefix on TTY (mirrors - // the search renderer's pattern in `Cmd::Search`). - use std::io::IsTerminal; - let color = std::io::stdout().is_terminal(); - let mut out = std::io::stdout().lock(); - render_ask_plain_citations(&mut out, &ans, color)?; + if *stream { + // p9-fb-33: streaming branch. Background thread runs + // ask_with_config (which calls into the rag pipeline); + // main thread drains the receiver and writes + // `answer_event.v1` ndjson to stderr. On BrokenPipe + // (downstream consumer closed), drop the receiver so + // the worker's next `send` returns SendError → + // pipeline cancels with LlmStreamAborted. Final stdout + // line is the existing `answer.v1` (mirrors + // ingest_progress.v1 + ingest_report.v1 split). 
+ use std::io::Write; + use std::sync::mpsc; + + let (tx, rx) = mpsc::channel::(); + let opts = kebab_app::AskOpts { + k: *k, + explain: *explain, + mode: (*mode).into(), + temperature: *temperature, + seed: *seed, + stream_sink: Some(tx), + history: Vec::new(), + conversation_id: None, + turn_index: None, + }; + let cfg2 = cfg.clone(); + let q = query.clone(); + let session2 = session.clone(); + let handle = std::thread::spawn( + move || -> anyhow::Result { + match session2.as_deref() { + Some(sid) => kebab_app::ask_with_session_with_config( + cfg2, sid, &q, opts, + ), + None => kebab_app::ask_with_config(cfg2, &q, opts), + } + }, + ); + + // Drain receiver, write ndjson to stderr until + // completion or BrokenPipe. + let mut cancelled_pipe = false; + { + let mut stderr = std::io::stderr().lock(); + for ev in &rx { + let now = time::OffsetDateTime::now_utc(); + let v = wire::wire_answer_event(&ev, now); + let line = serde_json::to_string(&v)?; + if let Err(e) = writeln!(stderr, "{line}") { + if e.kind() == std::io::ErrorKind::BrokenPipe { + cancelled_pipe = true; + break; + } + return Err(e.into()); + } + } } + if cancelled_pipe { + // Dropping the receiver signals to the worker — + // the next `send` returns SendError, which the + // pipeline interprets as a cancel. + drop(rx); + } + + let result = handle + .join() + .map_err(|_| anyhow::anyhow!("ask worker panicked"))?; + let ans = result?; + + // Final stdout line — answer.v1 for backwards + // compat. BrokenPipe on stdout is silent (caller + // already gone). + let final_json = serde_json::to_string(&wire::wire_answer(&ans))?; + let _ = writeln!(std::io::stdout().lock(), "{final_json}"); + + if !ans.grounded { + return Err(RefusalSignal.into()); + } + Ok(()) + } else { + let opts = kebab_app::AskOpts { + k: *k, + explain: *explain, + mode: (*mode).into(), + temperature: *temperature, + seed: *seed, + // CLI ask is non-streaming by default (the answer + // prints all at once on completion). 
`--stream` + // takes the branch above; the TUI ask pane (P9-3) + // wires up its own `mpsc::Sender`. + stream_sink: None, + // p9-fb-18: when `--session` is set, the facade + // (`ask_with_session_with_config`) loads prior turns + // from SQLite and stuffs them into AskOpts.history + // before calling `ask_with_history`. Single-shot path + // (no `--session`) keeps the empty defaults. + history: Vec::new(), + conversation_id: None, + turn_index: None, + }; + let ans = match session.as_deref() { + Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?, + None => kebab_app::ask_with_config(cfg, query, opts)?, + }; + if cli.json { + println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?); + } else { + println!("{}", ans.answer); + // p9-fb-20: print the citation block after the + // answer body when --hide-citations is not set + // (--show-citations is the default). Skipped on + // refusal-with-zero-citations to avoid an empty + // `근거:` header. + let print_citations = *show_citations && !*hide_citations; + if print_citations && !ans.citations.is_empty() { + // p9-fb-32: yellow `[stale]` prefix on TTY (mirrors + // the search renderer's pattern in `Cmd::Search`). + use std::io::IsTerminal; + let color = std::io::stdout().is_terminal(); + let mut out = std::io::stdout().lock(); + render_ask_plain_citations(&mut out, &ans, color)?; + } + } + // Refusal → exit 1. + if !ans.grounded { + return Err(RefusalSignal.into()); + } + Ok(()) } - // Refusal → exit 1. - if !ans.grounded { - return Err(RefusalSignal.into()); - } - Ok(()) } Cmd::Reset { diff --git a/crates/kebab-cli/src/wire.rs b/crates/kebab-cli/src/wire.rs index 8fd58e7..e1e35d3 100644 --- a/crates/kebab-cli/src/wire.rs +++ b/crates/kebab-cli/src/wire.rs @@ -87,6 +87,25 @@ pub fn wire_answer(a: &Answer) -> Value { tag_object(v, "answer.v1") } +/// p9-fb-33: tag a [`StreamEvent`] as `answer_event.v1` ndjson. 
+/// +/// The timestamp is added at emit time (caller fills `ts`), since the +/// pipeline doesn't carry one in the in-process enum — mirrors the +/// `wire_ingest_progress` pattern (§2 ingest_progress.v1). +pub fn wire_answer_event( + ev: &kebab_app::StreamEvent, + ts: time::OffsetDateTime, +) -> Value { + let mut v = serde_json::to_value(ev).expect("StreamEvent serializes"); + let ts_str = ts + .format(&time::format_description::well_known::Rfc3339) + .expect("OffsetDateTime formats as RFC3339"); + if let Value::Object(ref mut map) = v { + map.insert("ts".to_string(), Value::String(ts_str)); + } + tag_object(v, "answer_event.v1") +} + /// Idempotent pass-through for [`DoctorReport`] — the type already carries /// `schema_version: "doctor.v1"` (struct-field convention, the one /// exception called out in the module doc above). This helper exists so From 39bf0de94906889f3f096c4659caa369dbd40dad Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:14:00 +0900 Subject: [PATCH 08/11] =?UTF-8?q?test(cli):=20wire=5Fask=5Fstream=20?= =?UTF-8?q?=E2=80=94=20stderr=20ndjson=20+=20stdout=20final=20+=20BrokenPi?= =?UTF-8?q?pe=20cancel=20(fb-33)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Ollama-gated integration tests covering: - stderr lines parse as answer_event.v1 (retrieval_done first, final last, all carry RFC3339 ts). - stdout final line is answer.v1 (backwards compat). - non-stream path (--json without --stream) unchanged. - BrokenPipe stderr → child terminates cleanly via cancel propagation through pipeline SendError. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-cli/tests/common/mod.rs | 47 ++++++ crates/kebab-cli/tests/wire_ask_stream.rs | 186 ++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 crates/kebab-cli/tests/wire_ask_stream.rs diff --git a/crates/kebab-cli/tests/common/mod.rs b/crates/kebab-cli/tests/common/mod.rs index 7d66076..8926bd2 100644 --- a/crates/kebab-cli/tests/common/mod.rs +++ b/crates/kebab-cli/tests/common/mod.rs @@ -126,6 +126,53 @@ pub fn ingest(cfg: &Path, workspace: &Path) { ); } +/// p9-fb-33: invoke `kebab ask --stream --mode lexical ` and +/// capture stdout + stderr. Lexical mode skips embeddings (matches +/// `wire_ask_stale.rs::run_ask_lexical`). Caller asserts on the +/// resulting (stdout, stderr) pair. +pub fn run_ask_stream(cfg: &Path, query: &str) -> (String, String) { + let bin = env!("CARGO_BIN_EXE_kebab"); + let out = Command::new(bin) + .args([ + "--config", + cfg.to_str().unwrap(), + "ask", + "--stream", + "--mode", + "lexical", + query, + ]) + .output() + .expect("kebab ask --stream"); + ( + String::from_utf8_lossy(&out.stdout).to_string(), + String::from_utf8_lossy(&out.stderr).to_string(), + ) +} + +/// p9-fb-33: invoke `kebab --json ask --mode lexical ` (no +/// `--stream`) — used by `wire_ask_stream::non_stream_path_unchanged` +/// to confirm the non-streaming JSON path still emits a single +/// `answer.v1` line on stdout. Returns stdout only (mirrors +/// `wire_ask_stale.rs::run_ask_lexical(json=true)` minus the +/// `Output` indirection). +pub fn run_ask_json(cfg: &Path, query: &str) -> String { + let bin = env!("CARGO_BIN_EXE_kebab"); + let out = Command::new(bin) + .args([ + "--config", + cfg.to_str().unwrap(), + "--json", + "ask", + "--mode", + "lexical", + query, + ]) + .output() + .expect("kebab ask --json"); + String::from_utf8_lossy(&out.stdout).to_string() +} + /// Rewrite `documents.updated_at` for one workspace path to /// `now - days_ago` (RFC3339 UTC). 
Mirrors /// `kebab-app/tests/common/mod.rs::backdate_document_updated_at`. diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs new file mode 100644 index 0000000..4ba341b --- /dev/null +++ b/crates/kebab-cli/tests/wire_ask_stream.rs @@ -0,0 +1,186 @@ +//! p9-fb-33: CLI streaming surface — stderr ndjson `answer_event.v1` +//! events while the answer streams; final stdout line is the existing +//! `answer.v1` (backwards compat with the non-`--stream` path). +//! +//! These end-to-end checks exercise `kebab ask --stream`, which +//! requires a real Ollama on `127.0.0.1:11434` (same constraint as +//! `wire_ask_stale.rs` + `kebab-app/tests/ask_smoke.rs`). All three +//! tests are therefore `#[ignore]` by default — run with +//! `cargo test -p kebab-cli --test wire_ask_stream -- --ignored` +//! against a live Ollama with `gemma4:e4b` pulled. +//! +//! The `BrokenPipe → cancel` test (Task 7 of the fb-33 plan) verifies +//! that closing the stderr reader propagates SendError through the +//! pipeline so the child terminates instead of hanging. That's the +//! main thing the integration test layer can prove that unit tests +//! can't — pipeline cancel is a cross-process concern. +//! +//! Shared TempDir / ingest helpers live in `tests/common/mod.rs`. + +mod common; + +use std::fs; +use std::path::Path; + +use serde_json::Value; + +/// Drop `[rag].score_gate` to ~0 in the test config so the +/// score-gate refusal path doesn't short-circuit the LLM call. +/// Lexical retrieval against a one-doc corpus produces tiny fusion +/// scores (well below the default 0.30 gate); the pipeline would +/// take the `refuse_score_gate` early-return — which does not emit +/// a `Final` event — making the streaming-event ordering assertion +/// vacuous. Lower the gate so the LLM actually runs. 
+fn relax_score_gate(cfg: &Path) { + let body = fs::read_to_string(cfg).expect("read config.toml"); + let body = body.replace("score_gate = 0.30", "score_gate = 0.0"); + fs::write(cfg, body).expect("write relaxed config.toml"); +} + +#[test] +#[ignore = "requires real Ollama on 127.0.0.1:11434"] +fn stream_emits_ndjson_events_on_stderr() { + let dir = tempfile::tempdir().unwrap(); + let (cfg, workspace, _data) = + common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b"); + relax_score_gate(&cfg); + fs::write( + workspace.join("a.md"), + "# T\n\nrust ownership is a memory model.\n", + ) + .unwrap(); + common::ingest(&cfg, &workspace); + + let (stdout, stderr) = common::run_ask_stream(&cfg, "ownership"); + + // stderr: every non-empty line should parse as JSON with + // schema_version == "answer_event.v1" and a recognized kind. + let mut kinds: Vec = vec![]; + for line in stderr.lines() { + if line.trim().is_empty() { + continue; + } + let v: Value = serde_json::from_str(line) + .unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}")); + assert_eq!(v["schema_version"], "answer_event.v1"); + let kind = v["kind"].as_str().expect("kind").to_string(); + assert!( + matches!(kind.as_str(), "retrieval_done" | "token" | "final"), + "unexpected kind: {kind}" + ); + assert!(v["ts"].is_string(), "ts must be RFC3339 string"); + kinds.push(kind); + } + + // First event must be retrieval_done. Last must be final. + // (If the LLM refused mid-stream we still expect the worker to + // emit a Final event — the pipeline always closes the stream.) 
+ assert_eq!( + kinds.first().map(String::as_str), + Some("retrieval_done"), + "first event must be retrieval_done, all kinds: {kinds:?}" + ); + assert_eq!( + kinds.last().map(String::as_str), + Some("final"), + "last event must be final, all kinds: {kinds:?}" + ); + + // stdout: last line is answer.v1 (backwards compat with the + // non-streaming path — same wire shape, just emitted after the + // ndjson event stream rather than instead of it). + let final_line = stdout + .lines() + .last() + .expect("stdout has at least one line"); + let answer: Value = + serde_json::from_str(final_line).expect("stdout final line = answer.v1"); + assert_eq!(answer["schema_version"], "answer.v1"); +} + +#[test] +#[ignore = "requires real Ollama on 127.0.0.1:11434"] +fn non_stream_path_unchanged() { + // Verify that the non-streaming JSON path (no `--stream`) still + // emits a single `answer.v1` line on stdout — fb-33 must not + // perturb the existing wire surface. + let dir = tempfile::tempdir().unwrap(); + let (cfg, workspace, _data) = + common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b"); + relax_score_gate(&cfg); + fs::write( + workspace.join("a.md"), + "# T\n\nrust ownership is a memory model.\n", + ) + .unwrap(); + common::ingest(&cfg, &workspace); + + let stdout = common::run_ask_json(&cfg, "ownership"); + let v: Value = serde_json::from_str(stdout.trim()) + .unwrap_or_else(|e| panic!("expected answer.v1, got {stdout:?}: {e}")); + assert_eq!(v["schema_version"], "answer.v1"); +} + +// p9-fb-33 (Task 7): BrokenPipe → cancel propagation. Spawn the +// binary, read the first stderr line (retrieval_done), drop the +// reader. The pipeline's next `Token` send returns SendError, the +// cancel branch fires, child.wait() returns instead of blocking +// forever. The key invariant is *liveness* — that `wait()` returns +// in bounded time. 
Don't assert exit code: refusal is exit 1, but +// the child may also exit 0 if the LLM happened to finish before +// cancel propagated. +#[test] +#[ignore = "requires real Ollama on 127.0.0.1:11434 + writes to a closed pipe"] +fn stream_cancels_when_stderr_closes() { + use std::io::{BufRead, BufReader}; + use std::process::{Command, Stdio}; + + let dir = tempfile::tempdir().unwrap(); + let (cfg, workspace, _data) = + common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b"); + relax_score_gate(&cfg); + fs::write( + workspace.join("a.md"), + "# T\n\nrust ownership is a memory model. it tracks lifetimes.\n", + ) + .unwrap(); + common::ingest(&cfg, &workspace); + + let bin = env!("CARGO_BIN_EXE_kebab"); + let mut child = Command::new(bin) + .args([ + "--config", + cfg.to_str().unwrap(), + "ask", + "--stream", + "--mode", + "lexical", + "ownership", + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("spawn kebab"); + + { + let stderr = child.stderr.take().expect("stderr piped"); + let mut reader = BufReader::new(stderr); + let mut first = String::new(); + reader + .read_line(&mut first) + .expect("read first stderr line"); + assert!( + first.contains("\"kind\":\"retrieval_done\""), + "first event must be retrieval_done, got {first:?}" + ); + // Drop the reader → child's stderr write end will see + // BrokenPipe on the next write → main thread drops rx → + // worker's pipeline.send returns SendError → cancel. + } + + let status = child.wait().expect("child completes after cancel"); + // Don't assert specific exit code — refusal is exit 1, but child + // may also exit 0 if the LLM finished before cancel propagated. + // The load-bearing assertion is that wait() returned at all. 
+ let _ = status; +} From e1c6b7055af1b98a131913b046f848b04ea3fe40 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:22:09 +0900 Subject: [PATCH 09/11] docs(fb-33): README + SMOKE + INDEX + skill notes Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 2 +- docs/SMOKE.md | 10 ++++++++++ integrations/claude-code/kebab/SKILL.md | 3 +++ tasks/INDEX.md | 2 +- tasks/p9/p9-fb-33-streaming-ask.md | 9 ++++++--- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e2a26b5..b4bc63f 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ kebab doctor | `kebab search --mode {lexical,vector,hybrid} "" [--no-cache]` | Search. hybrid uses RRF fusion and includes citations. Repeating the same query (normalized with NFKC + trim + lowercase) within one process hits the in-process LRU cache (capacity = `[search] cache_capacity`, default 256). `--no-cache` forces a bypass, for debugging. When an ingest commit occurs, the `kv['corpus_revision']` bump automatically marks every entry stale | | `kebab list docs` | List indexed documents | | `kebab inspect doc ` / `kebab inspect chunk ` | View the raw record | -| `kebab ask "" [--show-citations / --hide-citations] [--session ]` | RAG answer + evidence citations. After the answer, a `근거:` block lists full path / line range / score, one per line (default ON; turn it off with `--hide-citations`, useful when piping). Refuses when evidence is insufficient. Requires Ollama. `--session ` enables multi-turn: the first call auto-creates the session in SQLite `chat_sessions`, and later calls receive the prior turns as history for follow-ups. The session id is user-chosen (e.g. `kb-rust-async-2026-05`); `kebab reset --data-only` wipes all sessions | +| `kebab ask "" [--show-citations / --hide-citations] [--session ] [--stream]` | RAG answer + evidence citations. After the answer, a `근거:` block lists full path / line range / score, one per line (default ON; turn it off with `--hide-citations`, useful when piping). Refuses when evidence is insufficient. Requires Ollama. `--session ` enables multi-turn: the first call auto-creates the session in SQLite `chat_sessions`, and later calls receive the prior turns as history for follow-ups. The session id is user-chosen (e.g. `kb-rust-async-2026-05`); `kebab reset --data-only` wipes all sessions. 
**`--stream` (p9-fb-33)** streams ndjson `answer_event.v1` events (retrieval_done → token* → final) to stderr and keeps the existing `answer.v1` as the last stdout line, so an agent can consume tokens as they arrive | | `kebab doctor` | Config / model / DB health check | | `kebab tui` | Ratatui shell (Library + Search + Ask + Inspect panels, desktop in progress). In Library, the `r` key starts a background ingest: the status bar at the bottom shows progress, and on completion/abort the final line stays briefly and then auto-hides. While an ingest is running, `Esc` / `Ctrl-C` is the cancel signal (otherwise quit). vim-style modes (`-- NORMAL --` / `-- INSERT --` at the right of the header): Library/Inspect start in NORMAL automatically, Search/Ask in INSERT. `i` switches Normal→Insert (all panes, p9-fb-21) and `Esc` switches Insert→Normal anywhere. Dispatch is mode-authoritative: `j/k/o/g` in Search and `e/j/k` in Ask act as commands only in NORMAL mode and are typed as input characters in INSERT. (The chunk-inspect key in Search was rebound `i`→`o` because `i` is the universal Insert toggle.) **`F1` opens the cheatsheet popup** (key map for the current pane + a table of global toggles); close it with `Esc` / `F1`. The Search panel runs the search on a background worker after a 200ms debounce, so key input never freezes the UI, and stale results are discarded automatically (generation counter) if the user keeps typing. The Ask panel is multi-turn: within one conversation the Q1/A1, Q2/A2 transcript accumulates, and the next question receives the previous turns as history. The answer body is rendered as markdown (bold/italic/inline code/heading/list/code fence/table/blockquote; raw `**bold**` is shown actually bold). `Ctrl-L` starts a new conversation. The `g` key in Search opens the hit's citation location in `$EDITOR` (default `vi`); after exit the TUI screen redraws cleanly on its own. The CLI `kebab ask` prints raw markdown as-is (for terminal compatibility). The Library doc list truncates Korean / Japanese / Chinese (CJK) titles at the wide-char-accurate column width, so a Korean title never overflows one line (1 CJK char = 2 cols). The cursor in the Search/Ask/Filter inputs aligns by column over wide chars, so the caret sits exactly next to the character when typing Korean. `← / →` move the cursor within the input string (one keypress per Korean character even though it spans 2 columns), `Home / End` jump to either end, and `Delete` removes the char at the cursor; every input pane (Ask / Search / Library filter overlay) behaves the same (p9-fb-22). The Ask transcript automatically follows the tail as new answers accumulate below the viewport (auto-scroll); scrolling up with `j` / `k` freezes it, and `Shift-G` returns to the bottom and resumes auto-tail. 
The hint line at the bottom of the screen uses Korean verb phrases (`"위로"` / `"아래로"` / `"필터"` / `"타이핑 검색어"` / `"Esc 로 NORMAL 모드"` / `"i 입력모드"`, etc.) and switches automatically to match the current (pane, mode) combination; **the first fragment is always `F1 도움말`** (guarantees cheatsheet discoverability). A status bar stays visible in every mode: `kebab v docs │ ` (state: streaming/searching/indexing/idle; during an ingest, progress is absorbed into the same slot). On entering Ask, the 8-character prefix of the conversation id is shown as well. `PgUp / PgDn` page-scroll by 10 lines in both the Ask transcript and Inspect. Above the Library doc list, a `TITLE / TAGS / UPDATED / CHUNKS` column header row is shown (display-width aligned, Hangul / CJK safe). | | `kebab reset [--all / --data-only / --vector-only / --config-only] [--yes]` | Wipes XDG data. **Irreversible.** Shows a confirm prompt on a TTY; otherwise `--yes` is required. `--vector-only` also truncates SQLite `embedding_records` (prevents orphans) | diff --git a/docs/SMOKE.md b/docs/SMOKE.md index bdae18b..3ec0d2a 100644 --- a/docs/SMOKE.md +++ b/docs/SMOKE.md @@ -142,6 +142,16 @@ A 30-day default flags docs that haven't been touched in a month — the intent is to nudge a reingest before relying on the snapshot. Set to `0` to disable. +### Streaming ask (fb-33) + +```bash +kebab ask "what is rust ownership" --stream 2> events.ndjson > final.json +``` + +events.ndjson (captured from stderr) is ndjson with one event per line: `retrieval_done` once, `token` many times, `final` once (refusal paths omit `final`). final.json is the existing `answer.v1`, unchanged (backwards-compat). + +When the agent closes stderr (e.g. `head -c 1`), the pipeline stops the LLM stream immediately and records the partial answer in the `answers` table with `RefusalReason::LlmStreamAborted`. 
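
The event ordering described for `events.ndjson` can be checked mechanically once the `kind` values have been extracted from each line. The following is only a minimal sketch under stated assumptions: `StreamOutcome` and `classify_stream` are hypothetical names that are not part of kebab, JSON parsing is left out, and an empty stream is assumed to be a cancel that fired before any event was written.

```rust
/// Outcome inferred from the ordered `kind` values of one `--stream` run.
/// Hypothetical helper type for illustration; not part of kebab.
#[derive(Debug, PartialEq)]
enum StreamOutcome {
    /// `retrieval_done`, zero or more `token`, then `final`.
    Completed { tokens: usize },
    /// The stream stopped without `final` (refusal or cancel path).
    EndedWithoutFinal { tokens: usize },
    /// An ordering the documented contract does not allow.
    Invalid(String),
}

fn classify_stream(kinds: &[&str]) -> StreamOutcome {
    // Assumption: an empty stream is a cancel that fired before any event.
    let Some((first, rest)) = kinds.split_first() else {
        return StreamOutcome::EndedWithoutFinal { tokens: 0 };
    };
    if *first != "retrieval_done" {
        return StreamOutcome::Invalid(format!("first event was `{first}`"));
    }

    let mut tokens = 0;
    let mut saw_final = false;
    for kind in rest {
        if saw_final {
            // `final` is terminal, so nothing may follow it.
            return StreamOutcome::Invalid(format!("`{kind}` arrived after `final`"));
        }
        match *kind {
            "token" => tokens += 1,
            "final" => saw_final = true,
            other => return StreamOutcome::Invalid(format!("unexpected kind `{other}`")),
        }
    }

    if saw_final {
        StreamOutcome::Completed { tokens }
    } else {
        StreamOutcome::EndedWithoutFinal { tokens }
    }
}
```

A consumer that sees `EndedWithoutFinal` would still read the stdout `answer.v1` line for the canonical result, since stderr alone does not distinguish a refusal from a cancel.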
+ ## P6-4 image ingestion options Adding the following section to `config.toml` makes `kebab ingest` also index image assets such as `**/*.png` / `**/*.jpg` (omit it to index text only): diff --git a/integrations/claude-code/kebab/SKILL.md b/integrations/claude-code/kebab/SKILL.md index 51ddd97..a065e71 100644 --- a/integrations/claude-code/kebab/SKILL.md +++ b/integrations/claude-code/kebab/SKILL.md @@ -75,10 +75,13 @@ If MCP tools aren't in scope (host without MCP support, or `mcp.json` not config kebab search "" --mode hybrid --json 2>/dev/null kebab ask "" --json 2>/dev/null kebab ask "" --session --json 2>/dev/null +kebab ask "" --stream # ndjson answer_event.v1 on stderr, final answer.v1 on stdout ``` Same wire shapes as MCP. CLI pays cold start (~1-2s) per call — prefer MCP when available. +`--stream` (p9-fb-33) emits `retrieval_done` → `token`* → `final` events on stderr while the answer streams; the final stdout line is the standard `answer.v1` for backwards compat. Use when you need progressive token consumption; otherwise the default non-streaming path is simpler. Refusal paths (score-gate / no-chunks) emit `retrieval_done` then no `token`/`final` — read stdout `answer.v1` for the canonical refusal signal. + ## MCP host config Register `kebab mcp` once in your host's MCP config. For Claude Code, edit `~/.claude/mcp.json`: diff --git a/tasks/INDEX.md b/tasks/INDEX.md index 088ca53..1f69f76 100644 --- a/tasks/INDEX.md +++ b/tasks/INDEX.md @@ -121,7 +121,7 @@ P0~P5 are serial. P6~P9 can run in parallel after P5. 
### 🎯 0.4.0 — agent surface refinement (additive only) - [p9-fb-32 stale doc indicator](p9/p9-fb-32-stale-doc-indicator.md) — ✅ merged + v0.4.0 cut candidate (2026-05-09) - - [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ⏳ not implemented, needs brainstorm + - [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ✅ merged + v0.5.0 cut candidate (2026-05-09) - [p9-fb-34 output budget controls](p9/p9-fb-34-output-budget-controls.md) — ⏳ not implemented, needs brainstorm - [p9-fb-35 verbatim fetch](p9/p9-fb-35-verbatim-fetch.md) — ⏳ not implemented, needs brainstorm - [p9-fb-36 search filter args](p9/p9-fb-36-search-filters.md) — ⏳ not implemented, needs brainstorm diff --git a/tasks/p9/p9-fb-33-streaming-ask.md b/tasks/p9/p9-fb-33-streaming-ask.md index 24c38e1..305f321 100644 --- a/tasks/p9/p9-fb-33-streaming-ask.md +++ b/tasks/p9/p9-fb-33-streaming-ask.md @@ -3,8 +3,8 @@ phase: P9 component: kebab-cli + kebab-app + wire-schema task_id: p9-fb-33 title: "Streaming ask (ndjson delta) — immediate token consumption by agents" -status: open -target_version: 0.4.0 +status: completed +target_version: 0.5.0 depends_on: [] unblocks: [] contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md @@ -14,7 +14,10 @@ source_feedback: user dogfooding 2026-05-06: agent, as tokens arrive # p9-fb-33 — Streaming ask (ndjson delta) -> ⏳ **Backlog only, not implemented.** This spec is a skeleton from dogfooding feedback. Before implementation starts, a design phase via [superpowers:brainstorming](../../docs/superpowers/) is required. The delta event format / final-only fallback / TUI vs CLI differences are to be settled after brainstorming. +> ✅ **Implemented.** This spec is frozen as of implementation time. For post-merge deviations see [HOTFIXES.md](../HOTFIXES.md), the live source of truth. + +Detailed design: `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md`. +Implementation plan: `docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md`. 
## Symptoms / motivation From a082b78f8e180aef3b00bd26c79b71675e7fd201 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:46:04 +0900 Subject: [PATCH 10/11] fix(fb-33): address PR #124 round 1 review - pipeline: refresh module docstring step 5 to reflect new cancel semantics (RetrievalDone/Token/Final + LlmStreamAborted) - wire schema: spell out refusal-path behavior in answer_event.v1 description (only retrieval_done emitted; no final) - test: factual comment on relax_score_gate-using test corrected - test: new Ollama-gated stream_score_gate_refusal_emits_only_retrieval_done - test: new ask_emits_no_final_when_cancelled_mid_stream pinning the no-Final invariant on cancel - pipeline: large_enum_variant comment broadened to acknowledge RetrievalDone.hits as the dominant per-emit cost - HOTFIXES: log AskOpts.stream_sink internal API break per spec contract policy Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-cli/tests/wire_ask_stream.rs | 59 +++++++++++++- crates/kebab-rag/src/pipeline.rs | 20 +++-- crates/kebab-rag/tests/streaming_events.rs | 86 ++++++++++++++++++++ docs/wire-schema/v1/answer_event.schema.json | 2 +- tasks/HOTFIXES.md | 13 +++ 5 files changed, 169 insertions(+), 11 deletions(-) diff --git a/crates/kebab-cli/tests/wire_ask_stream.rs b/crates/kebab-cli/tests/wire_ask_stream.rs index 4ba341b..98c995a 100644 --- a/crates/kebab-cli/tests/wire_ask_stream.rs +++ b/crates/kebab-cli/tests/wire_ask_stream.rs @@ -73,8 +73,12 @@ fn stream_emits_ndjson_events_on_stderr() { } // First event must be retrieval_done. Last must be final. - // (If the LLM refused mid-stream we still expect the worker to - // emit a Final event — the pipeline always closes the stream.) + // Note: this test only exercises the LLM-running path which always + // closes with `final`. score-gate / no-chunks refusal paths emit + // only `retrieval_done` and skip `final` — that's why the test uses + // `relax_score_gate()` above to force the LLM path. 
See + // `stream_score_gate_refusal_emits_only_retrieval_done` for the + // refusal-path coverage. assert_eq!( kinds.first().map(String::as_str), Some("retrieval_done"), @@ -184,3 +188,54 @@ fn stream_cancels_when_stderr_closes() { // The load-bearing assertion is that wait() returned at all. let _ = status; } + +// p9-fb-33 (PR #124 round 1, item 4): score-gate refusal path — +// thin doc + unrelated query trips the default 0.30 score gate +// before the LLM runs. The pipeline emits only `retrieval_done` +// on stderr (no `token`, no `final`); stdout still carries the +// canonical `answer.v1` with `grounded=false`. +#[test] +#[ignore = "requires real Ollama on 127.0.0.1:11434"] +fn stream_score_gate_refusal_emits_only_retrieval_done() { + let dir = tempfile::tempdir().unwrap(); + let (cfg, workspace, _data) = + common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b"); + // Intentionally NO relax_score_gate — keep the default 0.30 + // so the thin-doc + unrelated-query combo trips refusal. + fs::write( + workspace.join("a.md"), + "# Title\n\nrust is a language.\n", + ) + .unwrap(); + common::ingest(&cfg, &workspace); + + let (stdout, stderr) = + common::run_ask_stream(&cfg, "completely unrelated topic about cooking pasta"); + + let kinds: Vec<String> = stderr + .lines() + .filter(|l| !l.trim().is_empty()) + .filter_map(|l| serde_json::from_str::<Value>(l).ok()) + .filter_map(|v| v["kind"].as_str().map(String::from)) + .collect(); + + // Refusal path: only retrieval_done, no token, no final. + assert!( + kinds.iter().all(|k| k == "retrieval_done"), + "refusal path must emit only retrieval_done, got {kinds:?}" + ); + assert!( + !kinds.is_empty(), + "expected at least one retrieval_done event, got empty stderr" + ); + + // Stdout still has answer.v1 with grounded=false. 
+ let final_line = stdout + .lines() + .last() + .expect("stdout has at least one line"); + let answer: Value = + serde_json::from_str(final_line).expect("answer.v1"); + assert_eq!(answer["schema_version"], "answer.v1"); + assert_eq!(answer["grounded"], false); +} diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs index 21e7844..bf70900 100644 --- a/crates/kebab-rag/src/pipeline.rs +++ b/crates/kebab-rag/src/pipeline.rs @@ -12,9 +12,12 @@ //! ~4 chars / token, matching the kb-chunk convention). //! 4. Render the `rag-v1` prompt (system + user) verbatim per design. //! 5. Generate via `LanguageModel::generate_stream`. The token loop runs -//! on the calling thread; `opts.stream_sink` (if any) gets each -//! token forwarded synchronously and a dropped receiver does not -//! abort generation. +//! on the calling thread; `opts.stream_sink` (if any) emits +//! `StreamEvent::RetrievalDone` once after retrieve+stale-stamp, +//! `StreamEvent::Token` per LM chunk, and `StreamEvent::Final` on +//! success. A dropped receiver triggers cancel: SendError on Token +//! breaks the LM loop + records `RefusalReason::LlmStreamAborted` +//! in the persisted Answer (p9-fb-33). //! 6. Citation extract — STRICT regex `\[#(\d{1,3})\]`, no false //! positives from prose `[1]` / `vec![1]` / Markdown link refs. //! 7. Citation validate — every extracted marker must map to a packed @@ -83,11 +86,12 @@ type PackedContext = (String, Vec, usize); /// `RefusalReason::LlmStreamAborted`). #[derive(Clone, Debug, serde::Serialize)] #[serde(tag = "kind", rename_all = "snake_case")] -// `Final.answer` carries a full `Answer` (~320B) and is the largest -// variant; `Token` is the hot path. Size mismatch is unavoidable -// without boxing the wire-shape, which would force every consumer -// (TUI / CLI / future MCP) to deref. The sink is short-lived (one -// per ask) so the per-event overhead is not material. 
+// p9-fb-33: clippy flags Final.answer (~320B) as the heavy variant. +// In practice RetrievalDone.hits (Vec, k≤10×~1KB each) +// dominates per-emit cost, but it fires once. Boxing either would +// force every consumer (TUI, CLI ndjson driver, future MCP) to +// deref through a Box for marginal win on a short-lived per-ask +// channel. Keep both unboxed. #[allow(clippy::large_enum_variant)] pub enum StreamEvent { RetrievalDone { diff --git a/crates/kebab-rag/tests/streaming_events.rs b/crates/kebab-rag/tests/streaming_events.rs index 05be1b0..daa908d 100644 --- a/crates/kebab-rag/tests/streaming_events.rs +++ b/crates/kebab-rag/tests/streaming_events.rs @@ -129,3 +129,89 @@ fn ask_records_llm_stream_aborted_when_receiver_drops() { // Persistence still happens on cancel — the row is the audit trail. assert_eq!(env.count_answers(), 1, "answers row written on cancel"); } + +/// p9-fb-33 (PR #124 round 1, item 5): pin the "no Final on cancel" +/// invariant. Uses a barrier-gated LM so the test can observe the +/// `RetrievalDone` event before any `Token`/`Final` lands in the +/// channel — then drops `rx` to force SendError on the next `Token`. +/// The pipeline's cancel branch must avoid emitting `Final` and +/// record `RefusalReason::LlmStreamAborted`. +struct BlockingLm { + inner: MockLanguageModel, + /// Pipeline thread waits on this before yielding any token. + /// Test thread releases it after observing `RetrievalDone`. + gate: Arc, +} + +impl LanguageModel for BlockingLm { + fn model_ref(&self) -> kebab_core::ModelRef { + self.inner.model_ref() + } + fn context_tokens(&self) -> usize { + self.inner.context_tokens() + } + fn generate_stream( + &self, + req: kebab_core::GenerateRequest, + ) -> anyhow::Result> + Send>> { + // Block until the test signals — guarantees `RetrievalDone` + // arrives at the receiver before any `Token` is queued. 
+ self.gate.wait(); + self.inner.generate_stream(req) + } +} + +#[test] +fn ask_emits_no_final_when_cancelled_mid_stream() { + use std::sync::Barrier; + + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]); + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever: Arc = Arc::new(MockRetriever::new(hits)); + + let gate = Arc::new(Barrier::new(2)); + let lm: Arc = Arc::new(BlockingLm { + inner: MockLanguageModel { + model_id: TEST_LM_ID.to_string(), + provider: "mock".to_string(), + context_tokens: 32_768, + canned_response: "apples are red. [#1]".to_string(), + canned_finish: FinishReason::Stop, + canned_usage: TokenUsage { + prompt_tokens: 10, + completion_tokens: 5, + latency_ms: 7, + }, + }, + gate: Arc::clone(&gate), + }); + let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone()); + + let (tx, rx) = mpsc::channel::<StreamEvent>(); + let opts = opts_with_sink(tx); + let handle = std::thread::spawn(move || pipeline.ask("apples", opts)); + + // Receive RetrievalDone first — pipeline emits this before + // calling generate_stream (where the LM blocks on the gate). + let first = rx.recv().expect("RetrievalDone must arrive"); + assert!( + matches!(first, StreamEvent::RetrievalDone { .. }), + "first event must be RetrievalDone, got {first:?}", + ); + + // Drop rx now, BEFORE releasing the gate. Once the LM unblocks + // and the pipeline tries to send the first Token, it'll get + // SendError → cancel branch. + drop(rx); + gate.wait(); + + let ans = handle.join().expect("ask thread").unwrap(); + + // Cancel was observed: no Final emitted, refusal recorded. 
+ assert!(!ans.grounded); + assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted)); + assert_eq!(env.count_answers(), 1, "answers row written on cancel"); +} diff --git a/docs/wire-schema/v1/answer_event.schema.json b/docs/wire-schema/v1/answer_event.schema.json index 80a46f9..8581d08 100644 --- a/docs/wire-schema/v1/answer_event.schema.json +++ b/docs/wire-schema/v1/answer_event.schema.json @@ -2,7 +2,7 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://kb.local/wire/v1/answer_event.schema.json", "title": "AnswerEvent v1", - "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", + "description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. The success path closes with `final`; refusal paths (score_gate / no_chunks) emit only `retrieval_done` and rely on the stdout `answer.v1` line for the canonical signal. Cancel paths (BrokenPipe) may emit any prefix and stop. Final stdout line is always `answer.v1` for backwards compat (see ingest_progress.v1 precedent).", "type": "object", "required": ["schema_version", "kind", "ts"], "properties": { diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md index 24fcb67..b8a785c 100644 --- a/tasks/HOTFIXES.md +++ b/tasks/HOTFIXES.md @@ -14,6 +14,19 @@ historical contract that was implemented; this file accumulates the deltas so phase 5+ readers can find the live behavior without diffing git history. +## 2026-05-09 — p9-fb-33: AskOpts.stream_sink type widened to StreamEvent + +**What changed**: the type of `kebab_rag::AskOpts.stream_sink` changed from `Option>` to `Option>`. `kebab_app::StreamEvent` is a new re-export. + +**Relation to the spec contract**: this is the internal API break stated in spec §Domain API change. consumer = the TUI worker only (updated in this same PR). No external consumers. 
+ +**Deliberate decisions**: +- A required type change so that a single sink can carry all three stages: retrieval / token / final. +- The previous `Sender` had no way to express the retrieval / final stages. +- This is an internal API and therefore distinct from the wire schema; `answer_event.v1` is a new schema (additive minor at wire layer). + +**Affected consumers**: `kebab-tui::ask::spawn_ask_worker` (updated together in PR #124). No external integrations. + ## 2026-05-09 — p9-fb-32: search_hit.v1 / citation.v1 required-field expansion **What changed**: two fields, `indexed_at` (RFC3339) + `stale` (bool), were added to the `required` array of `search_hit.v1` and `citation.v1`. `schema_version` is unchanged (`search_hit.v1` / `citation.v1`). From 225831ffcdd514cfabcd716eb87231136fb2db89 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sat, 9 May 2026 15:52:51 +0900 Subject: [PATCH 11/11] fix(fb-33): correct HOTFIXES cross-reference per PR #124 round 2 Pointed at the actual fb-33 design spec path + clarified that the AskOpts type widening is a byproduct of the new wire schema forcing single-sink 3-stage transport, not a stand-alone breaking change. Co-Authored-By: Claude Opus 4.7 (1M context) --- tasks/HOTFIXES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/HOTFIXES.md b/tasks/HOTFIXES.md index b8a785c..2efbcbc 100644 --- a/tasks/HOTFIXES.md +++ b/tasks/HOTFIXES.md @@ -18,7 +18,7 @@ git history. **What changed**: the type of `kebab_rag::AskOpts.stream_sink` changed from `Option>` to `Option>`. `kebab_app::StreamEvent` is a new re-export. -**Relation to the spec contract**: this is the internal API break stated in spec §Domain API change. consumer = the TUI worker only (updated in this same PR). No external consumers. +**Relation to the spec contract**: a byproduct of `answer_event.v1` (the new wire schema) forcing a single sink to carry 3 stages (retrieval_done / token / final), which naturally widened the in-process sink type. Stated in advance in the "Domain API change" section of spec `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md`. consumer = the TUI worker only (updated in this same PR). No external consumers. **Deliberate decisions**: - A required type change so that a single sink can carry all three stages: retrieval / token / final.
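
The single-sink design recorded in the HOTFIXES entry above, and the cancel rule that goes with it, can be illustrated with a standard-library channel. This is a rough sketch under stated assumptions, not the kebab implementation: the `StreamEvent` below is a simplified stand-in with string payloads, and `run_ask`, `Outcome`, and `hit_count` are hypothetical names. It also assumes the partial answer includes the token whose send failed, which the patch does not specify.

```rust
use std::sync::mpsc::Sender;

/// Simplified stand-in for the real `StreamEvent`; payloads are
/// reduced to plain values for illustration only.
#[derive(Debug, PartialEq)]
enum StreamEvent {
    RetrievalDone { hit_count: usize },
    Token { delta: String },
    Final { answer: String },
}

/// Hypothetical result type: a completed answer, or the partial
/// answer collected before the receiver went away.
#[derive(Debug, PartialEq)]
enum Outcome {
    Completed(String),
    Aborted(String),
}

/// One sink carries all three stages. A failed `Token` send means the
/// receiver was dropped, so the loop stops and no `Final` is emitted.
fn run_ask(tokens: &[&str], sink: Option<Sender<StreamEvent>>) -> Outcome {
    if let Some(tx) = &sink {
        // A failure here is ignored; the token loop detects the drop.
        let _ = tx.send(StreamEvent::RetrievalDone { hit_count: 1 });
    }

    let mut answer = String::new();
    for token in tokens {
        answer.push_str(token);
        if let Some(tx) = &sink {
            let event = StreamEvent::Token { delta: (*token).to_string() };
            if tx.send(event).is_err() {
                return Outcome::Aborted(answer);
            }
        }
    }

    if let Some(tx) = &sink {
        let _ = tx.send(StreamEvent::Final { answer: answer.clone() });
    }
    Outcome::Completed(answer)
}
```

Calling `run_ask(&["a", "b"], None)` exercises the non-streaming path, which is the reason the sink is kept optional in this sketch.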