Merge pull request 'feat(fb-33): streaming ask (ndjson delta)' (#124) from feat/fb-33-streaming-ask into main

Reviewed-on: #124
This commit was merged in pull request #124.
This commit is contained in:
2026-05-09 07:33:29 +00:00
21 changed files with 2559 additions and 84 deletions

View File

@@ -74,7 +74,7 @@ kebab doctor
| `kebab search --mode {lexical,vector,hybrid} "<query>" [--no-cache]` | 검색. hybrid는 RRF fusion, citation 포함. 같은 process 안에서 동일 query (NFKC + trim + lowercase 정규화) 반복 시 in-process LRU 캐시 hit (capacity = `[search] cache_capacity`, default 256). `--no-cache` 로 강제 bypass — 디버깅용. ingest commit 발생 시 `kv['corpus_revision']` bump 으로 모든 entry 자동 stale |
| `kebab list docs` | 색인된 문서 목록 |
| `kebab inspect doc <id>` / `kebab inspect chunk <id>` | raw record 보기 |
| `kebab ask "<query>" [--show-citations / --hide-citations] [--session <id>]` | RAG 답변 + 근거 인용. 답변 후 `근거:` block 으로 full path / line range / score 한 줄씩 (default ON — `--hide-citations` 로 끄기, pipe 시 유용). 근거 부족 시 거절. Ollama 필요. `--session <id>` 로 multi-turn — 첫 호출에서 SQLite `chat_sessions` 에 자동 생성, 이후 호출은 prior turns 를 history 로 받아 follow-up. session id 는 사용자 지정 (e.g. `kb-rust-async-2026-05`) — `kebab reset --data-only` 로 모든 session wipe |
| `kebab ask "<query>" [--show-citations / --hide-citations] [--session <id>] [--stream]` | RAG 답변 + 근거 인용. 답변 후 `근거:` block 으로 full path / line range / score 한 줄씩 (default ON — `--hide-citations` 로 끄기, pipe 시 유용). 근거 부족 시 거절. Ollama 필요. `--session <id>` 로 multi-turn — 첫 호출에서 SQLite `chat_sessions` 에 자동 생성, 이후 호출은 prior turns 를 history 로 받아 follow-up. session id 는 사용자 지정 (e.g. `kb-rust-async-2026-05`) — `kebab reset --data-only` 로 모든 session wipe. **`--stream` (p9-fb-33)** 로 ndjson `answer_event.v1` event (retrieval_done → token* → final) 를 stderr 에 흘리고 stdout 마지막 줄에 기존 `answer.v1` — agent 가 token 즉시 소비 가능 |
| `kebab doctor` | 설정/모델/DB 헬스 체크 |
| `kebab tui` | Ratatui 셸 (Library + Search + Ask + Inspect 패널, desktop 진행 중). Library 에서 `r` 키로 background ingest 시작 — 화면 하단 status bar 가 진행 표시, 완료/abort 시 final 라인 잠시 유지 후 자동 hide. ingest 진행 중 `Esc` / `Ctrl-C` 가 cancel signal (그 외에는 quit). vim-style mode (header 우측 `-- NORMAL --` / `-- INSERT --`) — Library/Inspect 는 자동 NORMAL, Search/Ask 는 자동 INSERT. `i` 로 Normal→Insert (모든 pane — p9-fb-21), `Esc` 로 Insert→Normal 어디서나. mode-authoritative dispatch — Search 의 `j/k/o/g`, Ask 의 `e/j/k` 는 NORMAL 모드에서만 명령으로 동작, INSERT 에서는 입력 문자로 typing. (Search 의 chunk inspect 키는 `i``o` 로 rebind — `i` 가 universal Insert toggle.) **`F1` 로 cheatsheet popup** (현재 pane 의 키 매핑 + global 토글 표) — `Esc` / `F1` 로 닫기. Search 패널은 200ms debounce 후 background worker 가 검색 — 키 입력으로 UI freeze 안 됨, 사용자가 계속 타이핑하면 stale 결과 자동 폐기 (generation counter). Ask 패널은 multi-turn — 같은 conversation 안에서 Q1/A1, Q2/A2 transcript 누적, 다음 질문이 이전 턴을 history 로 받아 답변. 답변 본문은 markdown 렌더 (bold/italic/inline code/heading/list/code fence/table/blockquote, raw `**bold**` 가 실제 굵게 표시). `Ctrl-L` 로 새 conversation 시작. Search 의 `g` 키가 `$EDITOR` (기본 `vi`) 로 hit 의 citation 위치 열기 — 종료 후 TUI 화면이 자동으로 깨끗이 redraw. CLI `kebab ask` 는 raw markdown 그대로 (terminal 호환성 위해). Library 의 doc-list 가 한글 / 일본어 / 중국어 (CJK) 제목을 wide-char 정확한 column width 로 truncate — 한글 제목이 한 줄을 넘기지 않음 (CJK 1 자 = 2 col). Search/Ask/Filter 입력의 cursor 가 wide char 위에서 column 단위로 정렬 — 한글 입력 시 caret 이 글자 옆에 정확히 놓임. `← / →` 로 입력 문자열 중간 cursor 이동 (한글 한 글자 = 2 column 이라도 한 번에 이동), `Home / End` 로 양 끝 점프, `Delete` 로 cursor 위치 char 삭제 — 모든 input pane (Ask / Search / Library filter overlay) 동일 (p9-fb-22). Ask 트랜스크립트는 새 답변이 viewport 아래로 누적될 때 자동으로 tail 을 따라감 (auto-scroll); `j` / `k` 로 위로 스크롤하면 freeze, `Shift-G` 로 다시 bottom + auto-tail 재개. 화면 하단 hint line 은 한국어 동사구로 (`"위로"` / `"아래로"` / `"필터"` / `"타이핑 검색어"` / `"Esc 로 NORMAL 모드"` / `"i 입력모드"` 등) + 현재 (pane, mode) 조합에 맞춰 자동 분기, **첫 fragment 가 항상 `F1 도움말`** (cheatsheet 발견성 보장). 모든 모드에서 항상 떠 있는 상태바 — `kebab v<version> │ <pane> │ <docs> docs │ <state>` (state: streaming/searching/indexing/idle, ingest 진행 중에는 progress 가 같은 자리에 흡수됨). Ask 진입 시 conversation id 8 자 prefix 도 함께 표시. Ask 트랜스크립트와 Inspect 양쪽에서 `PgUp / PgDn` 으로 10 줄씩 페이지 스크롤. Library 의 doc list 위에는 `TITLE / TAGS / UPDATED / CHUNKS` 컬럼 헤더 행 표시 (display-width 정렬, Hangul / CJK 안전). |
| `kebab reset [--all / --data-only / --vector-only / --config-only] [--yes]` | XDG 데이터 wipe. **Irreversible.** TTY 면 confirm prompt, 아니면 `--yes` 필수. `--vector-only` 는 SQLite `embedding_records` 도 함께 truncate (orphan 방지) |

View File

@@ -85,7 +85,7 @@ pub const NO_EXT_SENTINEL: &str = "<no-ext>";
/// `use kebab_app::AskOpts` keeps working without churn. The struct gained
/// a `stream_sink` field in P4-3; non-streaming callers (kb-cli today)
/// pass `stream_sink: None`.
pub use kebab_rag::AskOpts;
pub use kebab_rag::{AskOpts, StreamEvent};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DoctorReport {

View File

@@ -153,6 +153,12 @@ enum Cmd {
/// (e.g. `kebab-rust-async-2026-05`).
#[arg(long, value_name = "ID")]
session: Option<String>,
/// p9-fb-33: emit ndjson `answer_event.v1` events on stderr
/// while streaming. Final stdout line is the existing
/// `answer.v1`. Off by default to preserve final-only behavior.
#[arg(long)]
stream: bool,
},
/// Wipe XDG data dirs (and optionally the Lance vector store) so the
@@ -578,55 +584,138 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
show_citations,
hide_citations,
session,
stream,
} => {
let cfg = kebab_config::Config::load(cli.config.as_deref())?;
let opts = kebab_app::AskOpts {
k: *k,
explain: *explain,
mode: (*mode).into(),
temperature: *temperature,
seed: *seed,
// CLI ask is non-streaming today (the answer prints all at
// once on completion). The TUI ask pane (P9-3) is what
// wires up a real `mpsc::Sender` here.
stream_sink: None,
// p9-fb-18: when `--session` is set, the facade
// (`ask_with_session_with_config`) loads prior turns
// from SQLite and stuffs them into AskOpts.history
// before calling `ask_with_history`. Single-shot path
// (no `--session`) keeps the empty defaults.
history: Vec::new(),
conversation_id: None,
turn_index: None,
};
let ans = match session.as_deref() {
Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?,
None => kebab_app::ask_with_config(cfg, query, opts)?,
};
if cli.json {
println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?);
} else {
println!("{}", ans.answer);
// p9-fb-20: print the citation block after the
// answer body when --hide-citations is not set
// (--show-citations is the default). Skipped on
// refusal-with-zero-citations to avoid an empty
// `근거:` header.
let print_citations = *show_citations && !*hide_citations;
if print_citations && !ans.citations.is_empty() {
// p9-fb-32: yellow `[stale]` prefix on TTY (mirrors
// the search renderer's pattern in `Cmd::Search`).
use std::io::IsTerminal;
let color = std::io::stdout().is_terminal();
let mut out = std::io::stdout().lock();
render_ask_plain_citations(&mut out, &ans, color)?;
if *stream {
// p9-fb-33: streaming branch. Background thread runs
// ask_with_config (which calls into the rag pipeline);
// main thread drains the receiver and writes
// `answer_event.v1` ndjson to stderr. On BrokenPipe
// (downstream consumer closed), drop the receiver so
// the worker's next `send` returns SendError →
// pipeline cancels with LlmStreamAborted. Final stdout
// line is the existing `answer.v1` (mirrors
// ingest_progress.v1 + ingest_report.v1 split).
use std::io::Write;
use std::sync::mpsc;
let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>();
let opts = kebab_app::AskOpts {
k: *k,
explain: *explain,
mode: (*mode).into(),
temperature: *temperature,
seed: *seed,
stream_sink: Some(tx),
history: Vec::new(),
conversation_id: None,
turn_index: None,
};
let cfg2 = cfg.clone();
let q = query.clone();
let session2 = session.clone();
let handle = std::thread::spawn(
move || -> anyhow::Result<kebab_core::Answer> {
match session2.as_deref() {
Some(sid) => kebab_app::ask_with_session_with_config(
cfg2, sid, &q, opts,
),
None => kebab_app::ask_with_config(cfg2, &q, opts),
}
},
);
// Drain receiver, write ndjson to stderr until
// completion or BrokenPipe.
let mut cancelled_pipe = false;
{
let mut stderr = std::io::stderr().lock();
for ev in &rx {
let now = time::OffsetDateTime::now_utc();
let v = wire::wire_answer_event(&ev, now);
let line = serde_json::to_string(&v)?;
if let Err(e) = writeln!(stderr, "{line}") {
if e.kind() == std::io::ErrorKind::BrokenPipe {
cancelled_pipe = true;
break;
}
return Err(e.into());
}
}
}
if cancelled_pipe {
// Dropping the receiver signals to the worker —
// the next `send` returns SendError, which the
// pipeline interprets as a cancel.
drop(rx);
}
let result = handle
.join()
.map_err(|_| anyhow::anyhow!("ask worker panicked"))?;
let ans = result?;
// Final stdout line — answer.v1 for backwards
// compat. BrokenPipe on stdout is silent (caller
// already gone).
let final_json = serde_json::to_string(&wire::wire_answer(&ans))?;
let _ = writeln!(std::io::stdout().lock(), "{final_json}");
if !ans.grounded {
return Err(RefusalSignal.into());
}
Ok(())
} else {
let opts = kebab_app::AskOpts {
k: *k,
explain: *explain,
mode: (*mode).into(),
temperature: *temperature,
seed: *seed,
// CLI ask is non-streaming by default (the answer
// prints all at once on completion). `--stream`
// takes the branch above; the TUI ask pane (P9-3)
// wires up its own `mpsc::Sender`.
stream_sink: None,
// p9-fb-18: when `--session` is set, the facade
// (`ask_with_session_with_config`) loads prior turns
// from SQLite and stuffs them into AskOpts.history
// before calling `ask_with_history`. Single-shot path
// (no `--session`) keeps the empty defaults.
history: Vec::new(),
conversation_id: None,
turn_index: None,
};
let ans = match session.as_deref() {
Some(sid) => kebab_app::ask_with_session_with_config(cfg, sid, query, opts)?,
None => kebab_app::ask_with_config(cfg, query, opts)?,
};
if cli.json {
println!("{}", serde_json::to_string(&wire::wire_answer(&ans))?);
} else {
println!("{}", ans.answer);
// p9-fb-20: print the citation block after the
// answer body when --hide-citations is not set
// (--show-citations is the default). Skipped on
// refusal-with-zero-citations to avoid an empty
// `근거:` header.
let print_citations = *show_citations && !*hide_citations;
if print_citations && !ans.citations.is_empty() {
// p9-fb-32: yellow `[stale]` prefix on TTY (mirrors
// the search renderer's pattern in `Cmd::Search`).
use std::io::IsTerminal;
let color = std::io::stdout().is_terminal();
let mut out = std::io::stdout().lock();
render_ask_plain_citations(&mut out, &ans, color)?;
}
}
// Refusal → exit 1.
if !ans.grounded {
return Err(RefusalSignal.into());
}
Ok(())
}
// Refusal → exit 1.
if !ans.grounded {
return Err(RefusalSignal.into());
}
Ok(())
}
Cmd::Reset {

View File

@@ -87,6 +87,25 @@ pub fn wire_answer(a: &Answer) -> Value {
tag_object(v, "answer.v1")
}
/// p9-fb-33: tag a [`StreamEvent`] as `answer_event.v1` ndjson.
///
/// The timestamp is added at emit time (caller fills `ts`), since the
/// pipeline doesn't carry one in the in-process enum — mirrors the
/// `wire_ingest_progress` pattern (§2 ingest_progress.v1).
pub fn wire_answer_event(
ev: &kebab_app::StreamEvent,
ts: time::OffsetDateTime,
) -> Value {
let mut v = serde_json::to_value(ev).expect("StreamEvent serializes");
let ts_str = ts
.format(&time::format_description::well_known::Rfc3339)
.expect("OffsetDateTime formats as RFC3339");
if let Value::Object(ref mut map) = v {
map.insert("ts".to_string(), Value::String(ts_str));
}
tag_object(v, "answer_event.v1")
}
/// Idempotent pass-through for [`DoctorReport`] — the type already carries
/// `schema_version: "doctor.v1"` (struct-field convention, the one
/// exception called out in the module doc above). This helper exists so

View File

@@ -126,6 +126,53 @@ pub fn ingest(cfg: &Path, workspace: &Path) {
);
}
/// p9-fb-33: invoke `kebab ask --stream --mode lexical <query>` and
/// capture stdout + stderr. Lexical mode skips embeddings (matches
/// `wire_ask_stale.rs::run_ask_lexical`). Caller asserts on the
/// resulting (stdout, stderr) pair.
pub fn run_ask_stream(cfg: &Path, query: &str) -> (String, String) {
let bin = env!("CARGO_BIN_EXE_kebab");
let out = Command::new(bin)
.args([
"--config",
cfg.to_str().unwrap(),
"ask",
"--stream",
"--mode",
"lexical",
query,
])
.output()
.expect("kebab ask --stream");
(
String::from_utf8_lossy(&out.stdout).to_string(),
String::from_utf8_lossy(&out.stderr).to_string(),
)
}
/// p9-fb-33: invoke `kebab --json ask --mode lexical <query>` (no
/// `--stream`) — used by `wire_ask_stream::non_stream_path_unchanged`
/// to confirm the non-streaming JSON path still emits a single
/// `answer.v1` line on stdout. Returns stdout only (mirrors
/// `wire_ask_stale.rs::run_ask_lexical(json=true)` minus the
/// `Output` indirection).
pub fn run_ask_json(cfg: &Path, query: &str) -> String {
let bin = env!("CARGO_BIN_EXE_kebab");
let out = Command::new(bin)
.args([
"--config",
cfg.to_str().unwrap(),
"--json",
"ask",
"--mode",
"lexical",
query,
])
.output()
.expect("kebab ask --json");
String::from_utf8_lossy(&out.stdout).to_string()
}
/// Rewrite `documents.updated_at` for one workspace path to
/// `now - days_ago` (RFC3339 UTC). Mirrors
/// `kebab-app/tests/common/mod.rs::backdate_document_updated_at`.

View File

@@ -0,0 +1,241 @@
//! p9-fb-33: CLI streaming surface — stderr ndjson `answer_event.v1`
//! events while the answer streams; final stdout line is the existing
//! `answer.v1` (backwards compat with the non-`--stream` path).
//!
//! These end-to-end checks exercise `kebab ask --stream`, which
//! requires a real Ollama on `127.0.0.1:11434` (same constraint as
//! `wire_ask_stale.rs` + `kebab-app/tests/ask_smoke.rs`). All three
//! tests are therefore `#[ignore]` by default — run with
//! `cargo test -p kebab-cli --test wire_ask_stream -- --ignored`
//! against a live Ollama with `gemma4:e4b` pulled.
//!
//! The `BrokenPipe → cancel` test (Task 7 of the fb-33 plan) verifies
//! that closing the stderr reader propagates SendError through the
//! pipeline so the child terminates instead of hanging. That's the
//! main thing the integration test layer can prove that unit tests
//! can't — pipeline cancel is a cross-process concern.
//!
//! Shared TempDir / ingest helpers live in `tests/common/mod.rs`.
mod common;
use std::fs;
use std::path::Path;
use serde_json::Value;
/// Drop `[rag].score_gate` to ~0 in the test config so the
/// score-gate refusal path doesn't short-circuit the LLM call.
/// Lexical retrieval against a one-doc corpus produces tiny fusion
/// scores (well below the default 0.30 gate); the pipeline would
/// take the `refuse_score_gate` early-return — which does not emit
/// a `Final` event — making the streaming-event ordering assertion
/// vacuous. Lower the gate so the LLM actually runs.
fn relax_score_gate(cfg: &Path) {
let body = fs::read_to_string(cfg).expect("read config.toml");
let body = body.replace("score_gate = 0.30", "score_gate = 0.0");
fs::write(cfg, body).expect("write relaxed config.toml");
}
#[test]
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
fn stream_emits_ndjson_events_on_stderr() {
let dir = tempfile::tempdir().unwrap();
let (cfg, workspace, _data) =
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
relax_score_gate(&cfg);
fs::write(
workspace.join("a.md"),
"# T\n\nrust ownership is a memory model.\n",
)
.unwrap();
common::ingest(&cfg, &workspace);
let (stdout, stderr) = common::run_ask_stream(&cfg, "ownership");
// stderr: every non-empty line should parse as JSON with
// schema_version == "answer_event.v1" and a recognized kind.
let mut kinds: Vec<String> = vec![];
for line in stderr.lines() {
if line.trim().is_empty() {
continue;
}
let v: Value = serde_json::from_str(line)
.unwrap_or_else(|e| panic!("non-JSON stderr line: {line:?}: {e}"));
assert_eq!(v["schema_version"], "answer_event.v1");
let kind = v["kind"].as_str().expect("kind").to_string();
assert!(
matches!(kind.as_str(), "retrieval_done" | "token" | "final"),
"unexpected kind: {kind}"
);
assert!(v["ts"].is_string(), "ts must be RFC3339 string");
kinds.push(kind);
}
// First event must be retrieval_done. Last must be final.
// Note: this test only exercises the LLM-running path which always
// closes with `final`. score-gate / no-chunks refusal paths emit
// only `retrieval_done` and skip `final` — that's why the test uses
// `relax_score_gate()` above to force the LLM path. See
// `stream_score_gate_refusal_emits_only_retrieval_done` for the
// refusal-path coverage.
assert_eq!(
kinds.first().map(String::as_str),
Some("retrieval_done"),
"first event must be retrieval_done, all kinds: {kinds:?}"
);
assert_eq!(
kinds.last().map(String::as_str),
Some("final"),
"last event must be final, all kinds: {kinds:?}"
);
// stdout: last line is answer.v1 (backwards compat with the
// non-streaming path — same wire shape, just emitted after the
// ndjson event stream rather than instead of it).
let final_line = stdout
.lines()
.last()
.expect("stdout has at least one line");
let answer: Value =
serde_json::from_str(final_line).expect("stdout final line = answer.v1");
assert_eq!(answer["schema_version"], "answer.v1");
}
#[test]
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
fn non_stream_path_unchanged() {
// Verify that the non-streaming JSON path (no `--stream`) still
// emits a single `answer.v1` line on stdout — fb-33 must not
// perturb the existing wire surface.
let dir = tempfile::tempdir().unwrap();
let (cfg, workspace, _data) =
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
relax_score_gate(&cfg);
fs::write(
workspace.join("a.md"),
"# T\n\nrust ownership is a memory model.\n",
)
.unwrap();
common::ingest(&cfg, &workspace);
let stdout = common::run_ask_json(&cfg, "ownership");
let v: Value = serde_json::from_str(stdout.trim())
.unwrap_or_else(|e| panic!("expected answer.v1, got {stdout:?}: {e}"));
assert_eq!(v["schema_version"], "answer.v1");
}
// p9-fb-33 (Task 7): BrokenPipe → cancel propagation. Spawn the
// binary, read the first stderr line (retrieval_done), drop the
// reader. The pipeline's next `Token` send returns SendError, the
// cancel branch fires, child.wait() returns instead of blocking
// forever. The key invariant is *liveness* — that `wait()` returns
// in bounded time. Don't assert exit code: refusal is exit 1, but
// the child may also exit 0 if the LLM happened to finish before
// cancel propagated.
#[test]
#[ignore = "requires real Ollama on 127.0.0.1:11434 + writes to a closed pipe"]
fn stream_cancels_when_stderr_closes() {
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};
let dir = tempfile::tempdir().unwrap();
let (cfg, workspace, _data) =
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
relax_score_gate(&cfg);
fs::write(
workspace.join("a.md"),
"# T\n\nrust ownership is a memory model. it tracks lifetimes.\n",
)
.unwrap();
common::ingest(&cfg, &workspace);
let bin = env!("CARGO_BIN_EXE_kebab");
let mut child = Command::new(bin)
.args([
"--config",
cfg.to_str().unwrap(),
"ask",
"--stream",
"--mode",
"lexical",
"ownership",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn kebab");
{
let stderr = child.stderr.take().expect("stderr piped");
let mut reader = BufReader::new(stderr);
let mut first = String::new();
reader
.read_line(&mut first)
.expect("read first stderr line");
assert!(
first.contains("\"kind\":\"retrieval_done\""),
"first event must be retrieval_done, got {first:?}"
);
// Drop the reader → child's stderr write end will see
// BrokenPipe on the next write → main thread drops rx →
// worker's pipeline.send returns SendError → cancel.
}
let status = child.wait().expect("child completes after cancel");
// Don't assert specific exit code — refusal is exit 1, but child
// may also exit 0 if the LLM finished before cancel propagated.
// The load-bearing assertion is that wait() returned at all.
let _ = status;
}
// p9-fb-33 (PR #124 round 1, item 4): score-gate refusal path —
// thin doc + unrelated query trips the default 0.30 score gate
// before the LLM runs. The pipeline emits only `retrieval_done`
// on stderr (no `token`, no `final`); stdout still carries the
// canonical `answer.v1` with `grounded=false`.
#[test]
#[ignore = "requires real Ollama on 127.0.0.1:11434"]
fn stream_score_gate_refusal_emits_only_retrieval_done() {
let dir = tempfile::tempdir().unwrap();
let (cfg, workspace, _data) =
common::write_config_with_llm_model(dir.path(), 30, "gemma4:e4b");
// Intentionally NO relax_score_gate — keep the default 0.30
// so the thin-doc + unrelated-query combo trips refusal.
fs::write(
workspace.join("a.md"),
"# Title\n\nrust is a language.\n",
)
.unwrap();
common::ingest(&cfg, &workspace);
let (stdout, stderr) =
common::run_ask_stream(&cfg, "completely unrelated topic about cooking pasta");
let kinds: Vec<String> = stderr
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str::<Value>(l).ok())
.filter_map(|v| v["kind"].as_str().map(String::from))
.collect();
// Refusal path: only retrieval_done, no token, no final.
assert!(
kinds.iter().all(|k| k == "retrieval_done"),
"refusal path must emit only retrieval_done, got {kinds:?}"
);
assert!(
!kinds.is_empty(),
"expected at least one retrieval_done event, got empty stderr"
);
// Stdout still has answer.v1 with grounded=false.
let final_line = stdout
.lines()
.last()
.expect("stdout has at least one line");
let answer: Value =
serde_json::from_str(final_line).expect("answer.v1");
assert_eq!(answer["schema_version"], "answer.v1");
assert_eq!(answer["grounded"], false);
}

View File

@@ -98,6 +98,11 @@ pub enum FinishReason {
Stop,
Length,
Aborted,
/// p9-fb-33: caller-side cancel. The pipeline breaks the LM loop
/// when a `Token` send into `AskOpts.stream_sink` returns
/// `SendError` (receiver dropped). The persisted answer is
/// flagged with `RefusalReason::LlmStreamAborted`.
Cancelled,
Error(String),
}

View File

@@ -22,4 +22,4 @@ pub use kebab_core::{Answer, AnswerCitation, AnswerRetrievalSummary, RefusalReas
mod pipeline;
pub use pipeline::{AskOpts, RagPipeline};
pub use pipeline::{AskOpts, RagPipeline, StreamEvent};

View File

@@ -12,9 +12,12 @@
//! ~4 chars / token, matching the kb-chunk convention).
//! 4. Render the `rag-v1` prompt (system + user) verbatim per design.
//! 5. Generate via `LanguageModel::generate_stream`. The token loop runs
//! on the calling thread; `opts.stream_sink` (if any) gets each
//! token forwarded synchronously and a dropped receiver does not
//! abort generation.
//! on the calling thread; `opts.stream_sink` (if any) emits
//! `StreamEvent::RetrievalDone` once after retrieve+stale-stamp,
//! `StreamEvent::Token` per LM chunk, and `StreamEvent::Final` on
//! success. A dropped receiver triggers cancel: SendError on Token
//! breaks the LM loop + records `RefusalReason::LlmStreamAborted`
//! in the persisted Answer (p9-fb-33).
//! 6. Citation extract — STRICT regex `\[#(\d{1,3})\]`, no false
//! positives from prose `[1]` / `vec![1]` / Markdown link refs.
//! 7. Citation validate — every extracted marker must map to a packed
@@ -67,6 +70,42 @@ struct PackedCitation {
/// prompt section the LLM will see (system + query + packed context).
type PackedContext = (String, Vec<PackedCitation>, usize);
/// p9-fb-33: streaming events the pipeline forwards into
/// [`AskOpts::stream_sink`] when present. Discriminated on `kind`
/// to match the wire `answer_event.v1` schema. Three variants:
///
/// - `RetrievalDone` — emitted once after retrieval + stale-stamp.
/// - `Token` — emitted per `TokenChunk::Token` from the LM.
/// - `Final` — emitted once after the full Answer is built (before
/// persistence). Always the terminal event on the success path.
///
/// On caller-side cancel (receiver dropped), the pipeline observes
/// the `SendError` from the next `Token` send and breaks the LM
/// loop — see `RagPipeline::ask` cancel branch. In that case
/// `Final` is NOT emitted (the answer still gets persisted with
/// `RefusalReason::LlmStreamAborted`).
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
// p9-fb-33: clippy flags Final.answer (~320B) as the heavy variant.
// In practice RetrievalDone.hits (Vec<SearchHit>, k≤10×~1KB each)
// dominates per-emit cost, but it fires once. Boxing either would
// force every consumer (TUI, CLI ndjson driver, future MCP) to
// deref through a Box for marginal win on a short-lived per-ask
// channel. Keep both unboxed.
#[allow(clippy::large_enum_variant)]
pub enum StreamEvent {
RetrievalDone {
hits: Vec<SearchHit>,
},
Token {
delta: String,
turn_index: Option<u32>,
},
Final {
answer: Answer,
},
}
// ── AskOpts ─────────────────────────────────────────────────────────────────
/// Caller-supplied knobs for one [`RagPipeline::ask`] invocation.
@@ -92,11 +131,10 @@ pub struct AskOpts {
pub temperature: Option<f32>,
/// Override `config.models.llm.seed` for this call.
pub seed: Option<u64>,
/// Optional sink: every `TokenChunk::Token` produced by the LM is
/// forwarded synchronously. A dropped receiver does NOT abort the
/// pipeline — `SendError` is silently swallowed and generation
/// continues so the `Answer` row still gets persisted.
pub stream_sink: Option<std::sync::mpsc::Sender<String>>,
/// Optional sink: every staged event (`RetrievalDone`, `Token`,
/// `Final`) is forwarded synchronously. A dropped receiver
/// triggers cancel — see `RagPipeline::ask` for the break path.
pub stream_sink: Option<std::sync::mpsc::Sender<StreamEvent>>,
/// p9-fb-15: prior turns of the same conversation. Empty for
/// single-shot ask. The pipeline prepends a serialized `[이전
/// 대화]` block to the user prompt and uses the most-recent
@@ -203,6 +241,16 @@ impl RagPipeline {
for h in &mut hits {
h.stale = compute_stale(h.indexed_at, now, stale_threshold_days);
}
// p9-fb-33: emit retrieval_done as soon as the hit list is
// ready (post stale-stamp so consumers see the same `stale`
// values the App-level wire path emits). Cancel is best-effort
// here — if the caller already dropped the receiver we just
// skip and let the LLM-loop SendError handle it consistently.
if let Some(sink) = &opts.stream_sink {
let _ = sink.send(StreamEvent::RetrievalDone {
hits: hits.clone(),
});
}
let chunks_returned = u32::try_from(hits.len()).unwrap_or(u32::MAX);
let top_score = hits.first().map(|h| h.retrieval.fusion_score).unwrap_or(0.0);
@@ -301,16 +349,28 @@ impl RagPipeline {
.llm
.generate_stream(req)
.context("kb-rag: llm.generate_stream")?;
let mut cancelled = false;
for item in stream {
let chunk = item.context("kb-rag: stream item")?;
match chunk {
TokenChunk::Token(t) => {
acc.push_str(&t);
if let Some(sink) = &opts.stream_sink {
// SendError silently dropped — caller cancelled but the
// pipeline still drives generation to completion so the
// `answers` row gets a faithful record.
let _ = sink.send(t);
// p9-fb-33: SendError → caller dropped the
// receiver (probably a closed stdout downstream).
// Stop generation, mark the answer cancelled so
// the persistence path records refusal_reason =
// LlmStreamAborted.
if sink
.send(StreamEvent::Token {
delta: t,
turn_index: opts.turn_index,
})
.is_err()
{
cancelled = true;
break;
}
}
}
TokenChunk::Done {
@@ -323,6 +383,9 @@ impl RagPipeline {
}
}
}
if cancelled {
finish_reason = FinishReason::Cancelled;
}
// ── 6. Citation extract ────────────────────────────────────────────
let extracted: Vec<u32> = extract_markers(&acc);
@@ -347,15 +410,20 @@ impl RagPipeline {
});
let trimmed_answer = acc.trim();
let matched_refusal_phrase = refusal_phrase.is_match(&acc);
let grounded = !trimmed_answer.is_empty()
let grounded_unaware = !trimmed_answer.is_empty()
&& unknown_markers.is_empty()
&& !extracted.is_empty();
let refusal_reason = if grounded {
None
// p9-fb-33: cancel takes priority over LlmSelfJudge — the
// caller bailed mid-stream, so the recorded reason should
// reflect that, not "model didn't cite".
let (grounded, refusal_reason) = if matches!(finish_reason, FinishReason::Cancelled) {
(false, Some(RefusalReason::LlmStreamAborted))
} else if grounded_unaware {
(true, None)
} else {
// Spec §7: empty answer, unknown markers, silent ungrounded,
// and explicit "근거가 부족" all collapse to LlmSelfJudge.
Some(RefusalReason::LlmSelfJudge)
(false, Some(RefusalReason::LlmSelfJudge))
};
// ── 8. Build Answer ────────────────────────────────────────────────
@@ -433,6 +501,17 @@ impl RagPipeline {
"kb-rag: ask done"
);
// p9-fb-33: emit final on the success path. On cancel we
// skip Final — the receiver is gone and persistence still
// records the partial answer below.
if !cancelled
&& let Some(sink) = &opts.stream_sink
{
let _ = sink.send(StreamEvent::Final {
answer: answer.clone(),
});
}
// ── 9. Persist ─────────────────────────────────────────────────────
let packed_chunks_json = if opts.explain {
// Snapshot the packed entries as a portable list of objects so
@@ -997,3 +1076,91 @@ mod compute_stale_mirror_tests {
assert!(!compute_stale(future, now(), 30));
}
}
#[cfg(test)]
mod stream_event_serde_tests {
use super::*;
use kebab_core::{
AnswerRetrievalSummary, ChunkId, ChunkerVersion, Citation,
DocumentId, IndexVersion, ModelRef, RetrievalDetail, SearchHit, SearchMode,
TokenUsage, TraceId,
};
use kebab_core::asset::WorkspacePath;
use kebab_core::versions::PromptTemplateVersion;
use time::macros::datetime;
fn mk_hit() -> SearchHit {
SearchHit {
rank: 1,
chunk_id: ChunkId("c1".into()),
doc_id: DocumentId("d1".into()),
doc_path: WorkspacePath::new("a.md".into()).unwrap(),
heading_path: vec!["H".into()],
section_label: None,
snippet: "s".into(),
citation: Citation::Line {
path: WorkspacePath::new("a.md".into()).unwrap(),
start: 1,
end: 1,
section: None,
},
retrieval: RetrievalDetail {
method: SearchMode::Lexical,
fusion_score: 0.5,
lexical_score: Some(0.5),
vector_score: None,
lexical_rank: Some(1),
vector_rank: None,
},
index_version: IndexVersion("v1".into()),
embedding_model: None,
chunker_version: ChunkerVersion("c@1".into()),
indexed_at: datetime!(2026-05-09 12:00:00 UTC),
stale: false,
}
}
#[test]
fn stream_event_token_serializes_with_kind_discriminator() {
let ev = StreamEvent::Token { delta: "안녕".into(), turn_index: Some(0) };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "token");
assert_eq!(v["delta"], "안녕");
assert_eq!(v["turn_index"], 0);
}
#[test]
fn stream_event_retrieval_done_serializes_hits() {
let ev = StreamEvent::RetrievalDone { hits: vec![mk_hit()] };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "retrieval_done");
assert_eq!(v["hits"].as_array().unwrap().len(), 1);
}
#[test]
fn stream_event_final_serializes_answer() {
let answer = Answer {
answer: "x".into(),
citations: vec![],
grounded: true,
refusal_reason: None,
model: ModelRef { id: "m".into(), provider: "p".into(), dimensions: None },
embedding: None,
prompt_template_version: PromptTemplateVersion("rag-v1".into()),
retrieval: AnswerRetrievalSummary {
trace_id: TraceId("t".into()),
mode: SearchMode::Hybrid,
k: 10, score_gate: 0.3, top_score: 0.5,
chunks_returned: 1, chunks_used: 1,
},
usage: TokenUsage { prompt_tokens: 0, completion_tokens: 0, latency_ms: 0 },
created_at: datetime!(2026-05-09 12:00:00 UTC),
conversation_id: None,
turn_index: None,
};
let ev = StreamEvent::Final { answer };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "final");
assert!(v["answer"].is_object());
}
}

View File

@@ -14,7 +14,7 @@ use kebab_core::{
FinishReason, LanguageModel, Retriever, SearchMode, TokenChunk, TokenUsage,
};
use kebab_llm::MockLanguageModel;
use kebab_rag::{AskOpts, RagPipeline, RefusalReason};
use kebab_rag::{AskOpts, RagPipeline, RefusalReason, StreamEvent};
/// LM ID used everywhere — kept short so snapshots stay stable.
const TEST_LM_ID: &str = "mock-lm";
@@ -270,18 +270,32 @@ fn streaming_forwards_tokens_to_sink() {
let lm: Arc<dyn LanguageModel> = Arc::new(CountingLm::new(canned));
let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
let (tx, rx) = std::sync::mpsc::channel::<String>();
let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>();
let mut opts = default_opts();
opts.stream_sink = Some(tx);
let _ = pipeline.ask("q", opts).unwrap();
let collected: String = rx.into_iter().collect::<Vec<_>>().join("");
// p9-fb-33: extract Token deltas from the staged event stream.
let collected: String = rx
.into_iter()
.filter_map(|ev| match ev {
StreamEvent::Token { delta, .. } => Some(delta),
_ => None,
})
.collect::<Vec<_>>()
.join("");
assert_eq!(collected, canned);
}
// ── 10. dropped receiver does NOT abort generation ────────────────────────
// ── 10. dropped receiver aborts generation, records LlmStreamAborted ──────
//
// p9-fb-33: cancel semantics changed. Pre-fb-33 the pipeline drove
// the LM loop to completion and silently dropped sends. Now a
// SendError breaks the loop and stamps `RefusalReason::LlmStreamAborted`
// onto the persisted row — the partial answer (whatever was buffered
// before the cancel) still gets written for audit.
#[test]
fn dropped_receiver_does_not_abort_generation() {
fn dropped_receiver_aborts_with_llm_stream_aborted() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
@@ -292,13 +306,17 @@ fn dropped_receiver_does_not_abort_generation() {
let lm: Arc<dyn LanguageModel> = Arc::new(CountingLm::new(canned));
let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
let (tx, rx) = std::sync::mpsc::channel::<String>();
drop(rx); // receiver gone — every send fails silently
let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>();
drop(rx); // receiver gone — first Token send fails, loop breaks
let mut opts = default_opts();
opts.stream_sink = Some(tx);
let answer = pipeline.ask("q", opts).unwrap();
assert_eq!(answer.answer, canned, "generation completes despite dead sink");
assert!(answer.grounded);
assert!(!answer.grounded, "cancel takes priority over grounded");
assert_eq!(
answer.refusal_reason,
Some(RefusalReason::LlmStreamAborted),
"cancel records LlmStreamAborted",
);
assert_eq!(env.count_answers(), 1, "answers row still persisted");
}

View File

@@ -0,0 +1,217 @@
//! p9-fb-33: pipeline-level streaming behavior — order invariants,
//! cancel propagation, refusal flagging.
mod common;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use common::{MockRetriever, RagEnv, id32, mk_hit};
use kebab_core::{
FinishReason, LanguageModel, RefusalReason, Retriever, SearchMode, TokenChunk, TokenUsage,
};
use kebab_llm::MockLanguageModel;
use kebab_rag::{AskOpts, RagPipeline, StreamEvent};
const TEST_LM_ID: &str = "mock-lm";
/// Minimal LM mirroring `tests/pipeline.rs::CountingLm` so the
/// streaming-events suite stays self-contained.
struct CountingLm {
inner: MockLanguageModel,
calls: std::sync::atomic::AtomicUsize,
}
impl CountingLm {
fn new(canned: &str) -> Self {
Self {
inner: MockLanguageModel {
model_id: TEST_LM_ID.to_string(),
provider: "mock".to_string(),
context_tokens: 32_768,
canned_response: canned.to_string(),
canned_finish: FinishReason::Stop,
canned_usage: TokenUsage {
prompt_tokens: 10,
completion_tokens: 5,
latency_ms: 7,
},
},
calls: std::sync::atomic::AtomicUsize::new(0),
}
}
}
impl LanguageModel for CountingLm {
fn model_ref(&self) -> kebab_core::ModelRef {
self.inner.model_ref()
}
fn context_tokens(&self) -> usize {
self.inner.context_tokens()
}
fn generate_stream(
&self,
req: kebab_core::GenerateRequest,
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<TokenChunk>> + Send>> {
self.calls.fetch_add(1, Ordering::SeqCst);
self.inner.generate_stream(req)
}
}
fn opts_with_sink(tx: mpsc::Sender<StreamEvent>) -> AskOpts {
AskOpts {
k: 3,
explain: false,
mode: SearchMode::Lexical,
temperature: Some(0.0),
seed: Some(0),
stream_sink: Some(tx),
history: Vec::new(),
conversation_id: None,
turn_index: None,
}
}
/// Build a pipeline with one seeded chunk + canned LM response so
/// retrieval lands a single hit and the LM emits at least one token.
fn env_with_one_hit(canned: &str) -> (RagEnv, RagPipeline) {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever: Arc<dyn Retriever> = Arc::new(MockRetriever::new(hits));
let lm: Arc<dyn LanguageModel> = Arc::new(CountingLm::new(canned));
let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
(env, pipeline)
}
#[test]
fn ask_emits_retrieval_then_tokens_then_final() {
let (_env, pipeline) = env_with_one_hit("apples are red. [#1]");
let (tx, rx) = mpsc::channel::<StreamEvent>();
let _ans = pipeline.ask("apples", opts_with_sink(tx)).unwrap();
let events: Vec<StreamEvent> = rx.iter().collect();
// First event must be RetrievalDone.
assert!(
matches!(events.first(), Some(StreamEvent::RetrievalDone { .. })),
"first event must be RetrievalDone, got {:?}",
events.first()
);
// Last event must be Final.
assert!(
matches!(events.last(), Some(StreamEvent::Final { .. })),
"last event must be Final, got {:?}",
events.last()
);
// Everything in between is Token.
for ev in &events[1..events.len() - 1] {
assert!(
matches!(ev, StreamEvent::Token { .. }),
"middle events must be Token, got {ev:?}"
);
}
}
#[test]
fn ask_records_llm_stream_aborted_when_receiver_drops() {
let (env, pipeline) = env_with_one_hit("apples are red. [#1]");
let (tx, rx) = mpsc::channel::<StreamEvent>();
// Drop the receiver immediately so the first Token send fails.
drop(rx);
let ans = pipeline.ask("apples", opts_with_sink(tx)).unwrap();
assert!(!ans.grounded);
assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted));
// Persistence still happens on cancel — the row is the audit trail.
assert_eq!(env.count_answers(), 1, "answers row written on cancel");
}
/// p9-fb-33 (PR #124 round 1, item 5): pin the "no Final on cancel"
/// invariant. Uses a barrier-gated LM so the test can observe the
/// `RetrievalDone` event before any `Token`/`Final` lands in the
/// channel — then drops `rx` to force SendError on the next `Token`.
/// The pipeline's cancel branch must avoid emitting `Final` and
/// record `RefusalReason::LlmStreamAborted`.
struct BlockingLm {
inner: MockLanguageModel,
/// Pipeline thread waits on this before yielding any token.
/// Test thread releases it after observing `RetrievalDone`.
gate: Arc<std::sync::Barrier>,
}
impl LanguageModel for BlockingLm {
fn model_ref(&self) -> kebab_core::ModelRef {
self.inner.model_ref()
}
fn context_tokens(&self) -> usize {
self.inner.context_tokens()
}
fn generate_stream(
&self,
req: kebab_core::GenerateRequest,
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<TokenChunk>> + Send>> {
// Block until the test signals — guarantees `RetrievalDone`
// arrives at the receiver before any `Token` is queued.
self.gate.wait();
self.inner.generate_stream(req)
}
}
#[test]
fn ask_emits_no_final_when_cancelled_mid_stream() {
use std::sync::Barrier;
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "apples are red.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever: Arc<dyn Retriever> = Arc::new(MockRetriever::new(hits));
let gate = Arc::new(Barrier::new(2));
let lm: Arc<dyn LanguageModel> = Arc::new(BlockingLm {
inner: MockLanguageModel {
model_id: TEST_LM_ID.to_string(),
provider: "mock".to_string(),
context_tokens: 32_768,
canned_response: "apples are red. [#1]".to_string(),
canned_finish: FinishReason::Stop,
canned_usage: TokenUsage {
prompt_tokens: 10,
completion_tokens: 5,
latency_ms: 7,
},
},
gate: Arc::clone(&gate),
});
let pipeline = RagPipeline::new(env.config.clone(), retriever, lm, env.sqlite.clone());
let (tx, rx) = mpsc::channel::<StreamEvent>();
let opts = opts_with_sink(tx);
let handle = std::thread::spawn(move || pipeline.ask("apples", opts));
// Receive RetrievalDone first — pipeline emits this before
// calling generate_stream (where the LM blocks on the gate).
let first = rx.recv().expect("RetrievalDone must arrive");
assert!(
matches!(first, StreamEvent::RetrievalDone { .. }),
"first event must be RetrievalDone, got {first:?}",
);
// Drop rx now, BEFORE releasing the gate. Once the LM unblocks
// and the pipeline tries to send the first Token, it'll get
// SendError → cancel branch.
drop(rx);
gate.wait();
let ans = handle.join().expect("ask thread").unwrap();
// Cancel was observed: no Final emitted, refusal recorded.
assert!(!ans.grounded);
assert_eq!(ans.refusal_reason, Some(RefusalReason::LlmStreamAborted));
assert_eq!(env.count_answers(), 1, "answers row written on cancel");
}

View File

@@ -186,9 +186,12 @@ impl Default for SearchState {
/// Ask pane state — owned by p9-3, extended by p9-fb-16 for
/// multi-turn conversation transcript.
///
/// The worker thread (`thread`) owns the `mpsc::Sender<String>` that
/// `kebab-app::ask` writes tokens into. The pane keeps the matching
/// `rx` and drains it once per render frame (no blocking).
/// The worker thread (`thread`) owns the `mpsc::Sender<kebab_app::StreamEvent>`
/// that `kebab-app::ask` writes events into. The pane keeps the matching
/// `rx` and drains it once per render frame (no blocking). Only the
/// `Token { delta }` variant is consumed for the streaming transcript;
/// `RetrievalDone` and `Final` are ignored (citations render from
/// `last_answer` after the worker join).
///
/// p9-fb-16: completed `Turn`s accumulate in `turns`; the worker
/// passes a snapshot of `turns` as `history` to
@@ -214,7 +217,7 @@ pub struct AskState {
pub thread: Option<std::thread::JoinHandle<anyhow::Result<kebab_core::Answer>>>,
/// Token receiver paired with the worker's `Sender`. Drained
/// every render frame.
pub rx: Option<std::sync::mpsc::Receiver<String>>,
pub rx: Option<std::sync::mpsc::Receiver<kebab_app::StreamEvent>>,
/// Vertical scroll offset for the transcript area when content
/// exceeds the viewport. Only consulted when `follow_tail` is
/// false; otherwise the renderer overrides this with the

View File

@@ -483,7 +483,7 @@ pub fn handle_key_ask(state: &mut App, key: KeyEvent) -> KeyOutcome {
}
fn spawn_ask_worker(state: &mut App) {
let (tx, rx) = mpsc::channel::<String>();
let (tx, rx) = mpsc::channel::<kebab_app::StreamEvent>();
let cfg = state.config.clone();
let s = state.ask.as_mut().unwrap();
// p9-fb-10: take() consumes the input in one step (no clone +
@@ -542,8 +542,18 @@ fn make_conversation_id() -> String {
pub(crate) fn drain_stream(state: &mut App) {
let Some(s) = state.ask.as_mut() else { return };
if let Some(rx) = &s.rx {
for tok in rx.try_iter() {
s.partial.push_str(&tok);
for ev in rx.try_iter() {
match ev {
kebab_app::StreamEvent::Token { delta, .. } => {
s.partial.push_str(&delta);
}
// p9-fb-33: TUI ignores RetrievalDone (citation
// panel renders after completion via `last_answer`)
// and Final (the worker thread's join already
// delivers the canonical Answer in poll_worker).
kebab_app::StreamEvent::RetrievalDone { .. }
| kebab_app::StreamEvent::Final { .. } => {}
}
}
}
}

View File

@@ -142,6 +142,16 @@ A 30-day default flags docs that haven't been touched in a month — the
intent is to nudge a reingest before relying on the snapshot. Set to `0`
to disable.
### Streaming ask (fb-33)
```bash
kebab ask "what is rust ownership" --stream 2> events.ndjson > final.json
```
stderr 의 events.ndjson 은 한 줄 = 한 event 의 ndjson — `retrieval_done` 한 번, `token` 여러 번, `final` 한 번 (refusal 경로는 `final` 생략). final.json 은 기존 `answer.v1` 그대로 (backwards-compat).
agent 가 stderr 를 닫으면 (`head -c 1` 등) pipeline 이 LLM stream 을 즉시 중단하고 `RefusalReason::LlmStreamAborted` 로 partial answer 를 `answers` 테이블에 기록.
## P6-4 이미지 ingestion 옵션
`config.toml` 에 다음 절을 추가하면 `kebab ingest``**/*.png` / `**/*.jpg` 등 이미지 자산도 함께 색인합니다 (텍스트만 색인하려면 생략):

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,253 @@
---
title: "p9-fb-33 — Streaming ask (ndjson delta) design"
phase: P9
component: kebab-rag + kebab-cli + kebab-tui + wire-schema
task_id: p9-fb-33
status: design
target_version: 0.5.0
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
contract_sections: [§7 RAG, §10 UX, wire-schema answer.v1]
date: 2026-05-09
---
# p9-fb-33 — Streaming ask (ndjson delta)
## Goal
`kebab ask --stream` — agent 가 LLM token 을 도착 즉시 소비할 수 있도록 retrieval / token / final 세 단계 ndjson event 를 stderr 에 흘리고, 마지막 stdout 한 줄은 기존 `answer.v1` 그대로 유지. CLI surface 우선, MCP `kebab__ask` streaming 은 v0.5+ 별도 검토 (이 spec 의 scope 아님).
## Behavior contract
### Stream event taxonomy
3 variant 로 confined. `kind` discriminator + `ts` 타임스탬프 + variant 별 페이로드.
1. **`retrieval_done`** — pipeline 의 retrieve + stale-stamp 직후 1회. 페이로드는 `hits: search_hit.v1[]` (fb-32 의 `indexed_at` / `stale` 포함).
2. **`token`** — LLM 의 `TokenChunk::Token` 매 도착 시. 페이로드는 `delta: string` + `turn_index: integer | null` (multi-turn ask 의 `Answer.turn_index` 와 일치).
3. **`final`** — 모든 token 수신 + citation extract / validate 완료 후 1회. 페이로드는 `answer: answer.v1` (스키마 v1 통째).
terminal event = `final`. 모든 ask 는 `final` 또는 (cancel 경로) 0개 event 로 끝남 — 후자는 ndjson 흐름이 중간에 끊긴 형태.
### CLI flag
`kebab ask --stream` (boolean flag, default off). `--json` 와 독립:
| flag 조합 | stderr | stdout |
|----------|--------|--------|
| (없음) | (없음) | plain text answer + 근거 블록 |
| `--json` | (없음) | `answer.v1` 1회 |
| `--stream` | ndjson `answer_event.v1` events | `answer.v1` 1회 (final stdout line) |
| `--stream --json` | 동일 (stream 이 dominant) | 동일 |
backwards-compat: `--stream` 미사용 시 모든 동작 보존.
### Output stream
- ndjson event → **stderr**. 매 줄 한 event, `serde_json::to_string` + `writeln!`.
- final `answer.v1`**stdout**. 기존 final-only consumer 가 stdout 만 파싱해도 호환.
- 선례: `ingest_progress.v1` 가 stderr ndjson + stdout `ingest_report.v1` final 패턴 사용.
### Cancel semantics
`kebab ask --stream` 의 stdout/stderr 가 외부에서 닫힘 (예: agent 가 SIGPIPE / `head -c 1` / connection close):
1. CLI main thread 의 `writeln!(stderr, ...)``io::ErrorKind::BrokenPipe` 반환.
2. CLI 가 receiver 폐기 (rx drop).
3. background thread 의 `pipeline.ask``stream_sink.send(StreamEvent::Token { .. })``SendError` 반환.
4. pipeline 의 token loop — 현재 `let _ = sink.send(t)` 로 swallow 하지만 본 task 에서 cancel 분기 추가: `SendError` 감지 시 LLM stream `break`, `finish_reason = FinishReason::Cancelled`, `RefusalReason::LlmStreamAborted` 로 Answer 채움, `answers` 테이블에 partial answer + cancel 사유 기록.
5. CLI background thread join → cancel 사유 명시한 Answer return → CLI 종료. stdout 은 이미 닫혀 final answer.v1 출력 시도해도 BrokenPipe 무시.
`io::ErrorKind::BrokenPipe` 만 cancel 처리. 그 외 IoError 는 fatal — `error.v1` stderr emit + exit 2.
### Wire schema delta
신규 `docs/wire-schema/v1/answer_event.schema.json`:
```json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://kb.local/wire/v1/answer_event.schema.json",
"title": "AnswerEvent v1",
"description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr. Discriminated by `kind`. Terminal: `final`. Final stdout line is `answer.v1` for backwards compat.",
"type": "object",
"required": ["schema_version", "kind", "ts"],
"properties": {
"schema_version": { "const": "answer_event.v1" },
"kind": { "enum": ["retrieval_done", "token", "final"] },
"ts": { "type": "string", "format": "date-time" },
"hits": { "type": "array", "description": "retrieval_done: search_hit.v1[]" },
"delta": { "type": "string", "description": "token: incremental string chunk" },
"turn_index": { "type": ["integer", "null"], "minimum": 0,
"description": "token: matches Answer.turn_index" },
"answer": { "type": "object", "description": "final: complete answer.v1 payload" }
}
}
```
기존 `answer.v1` / `search_hit.v1` / `citation.v1` 변경 없음.
### Domain API change
`kebab-rag::pipeline`:
```rust
#[derive(Clone, Debug)]
pub enum StreamEvent {
RetrievalDone { hits: Vec<SearchHit> },
Token { delta: String, turn_index: Option<u32> },
Final { answer: Answer },
}
pub struct AskOpts {
// ... 기존 필드
/// p9-fb-33: was `Option<Sender<String>>`. Now carries discriminated
/// events so callers can distinguish retrieval / per-token / final.
pub stream_sink: Option<std::sync::mpsc::Sender<StreamEvent>>,
}
```
- internal API breaking. consumer = TUI worker + (없을 시) MCP. TUI 만 갱신.
- non-streaming consumer (`stream_sink: None`) 는 무영향.
## Allowed / forbidden dependencies
각 crate 기존 deps 유지. `mpsc::Sender` 는 std. 신규 dep 없음.
- `kebab-core``StreamEvent` 정의 안 함 (도메인 type 가 wire 변환과 분리되어 있고, StreamEvent 는 pipeline 의 communication channel — kebab-rag 안 위치 적절).
- `kebab-cli` 는 wire 변환 코드 (`wire::wire_answer_event(&StreamEvent) -> Value`) 추가 — `kebab-cli/src/wire.rs` 의 기존 패턴 따라.
- UI crate (kebab-tui) 가 직접 retriever / store 호출 X — `kebab-app` facade 통과만.
## Components
### kebab-rag::pipeline
- `enum StreamEvent` 신규 정의 (`pub`).
- `AskOpts.stream_sink` 타입 변경.
- `RagPipeline::ask`:
- retrieve + stale-stamp 직후 `if let Some(sink) = &opts.stream_sink { let _ = sink.send(StreamEvent::RetrievalDone { hits: hits.clone() }); }` 발사. cancel 시 즉시 break out (이때는 LLM 도 안 부름).
- token loop: `sink.send(StreamEvent::Token { delta: t, turn_index: opts.turn_index })`. SendError → cancel 분기.
- 끝에서 `Final { answer: built_answer.clone() }` 발사.
- cancel 분기:
```rust
// p9-fb-33: SendError → caller (CLI) closed the receiver,
// probably due to BrokenPipe on stdout. Stop generation, mark
// refusal, persist partial answer.
if matches!(send_result, Err(_)) {
finish_reason = FinishReason::Cancelled;
break;
}
```
finish_reason = Cancelled 일 때 grounded=false + RefusalReason::LlmStreamAborted.
### kebab-app
- `AskOpts` re-export 만 (이미 public). `StreamEvent` 도 `pub use`.
- `App::ask` / `ask_with_session` 변경 없음 (opts 통과).
### kebab-cli
- `Cmd::Ask` 에 `#[arg(long)] stream: bool` 추가.
- `--stream` 분기:
```rust
if cli.json && !stream || !cli.json && !stream {
// 기존 final-only path
} else if stream {
let (tx, rx) = std::sync::mpsc::channel::<StreamEvent>();
let cfg2 = cfg.clone();
let q = query.clone();
let opts2 = AskOpts { stream_sink: Some(tx), ..opts };
let handle = std::thread::spawn(move || {
kebab_app::ask_with_config(cfg2, &q, opts2)
});
let mut stderr = std::io::stderr().lock();
let mut cancelled = false;
for ev in rx {
let v = wire::wire_answer_event(&ev);
let line = serde_json::to_string(&v)?;
if let Err(e) = writeln!(stderr, "{line}") {
if e.kind() == std::io::ErrorKind::BrokenPipe {
cancelled = true;
break;
}
return Err(e.into());
}
}
drop(stderr);
let result = handle.join().expect("ask thread panic");
let ans = result?;
// final stdout line
let mut stdout = std::io::stdout().lock();
let _ = writeln!(stdout, "{}", serde_json::to_string(&wire::wire_answer(&ans))?);
// cancel 또는 refusal 시 exit 1
if !ans.grounded { return Err(RefusalSignal.into()); }
Ok(())
}
```
- `wire::wire_answer_event(&StreamEvent) -> Value` 추가 — discriminated by variant, schema_version 태그.
### kebab-tui
- ask worker 가 받던 `Sender<String>` → `Sender<StreamEvent>`.
- worker thread 의 receive loop:
- `StreamEvent::Token { delta, .. }` → 기존 token 누적 path 그대로.
- `StreamEvent::RetrievalDone { hits }` → minimal 안에선 ignore (citation 은 final 도착 후 표시 — fb-22 에서 살펴봄).
- `StreamEvent::Final { answer }` → 이미 `App::ask` return 으로 받으므로 무시 가능 (또는 sanity check).
- snapshot 영향 없음 (token concat 결과 동일).
### kebab-mcp
변경 없음. `stream_sink: None` 유지. 향후 v0.5+ 에서 rmcp progress notification 채택 검토.
## Test plan
| kind | description |
|------|-------------|
| unit (kebab-rag) | `StreamEvent` serde round-trip — RetrievalDone / Token / Final 각각 한 줄 ndjson |
| unit (kebab-rag) | pipeline.ask + MockLm + sink: 발사 순서 = `RetrievalDone` 1회 → `Token`* → `Final` 1회 |
| unit (kebab-rag) | sink SendError (rx drop) → LLM loop 즉시 break + Answer.refusal_reason = `LlmStreamAborted` + answers row 기록 |
| unit (kebab-rag) | RetrievalDone 의 hits 가 Final.answer.citations 의 부분집합 (LLM 이 마커 안 쓴 hit 도 RetrievalDone 에 포함) |
| 통합 (kebab-cli) | `kebab ask --stream` stderr 가 valid ndjson — schema_version/kind/ts 모두 정상 |
| 통합 (kebab-cli) | `kebab ask --stream --json` stdout 마지막 줄이 `answer.v1` 통째 |
| 통합 (kebab-cli) | `kebab ask --json` (no --stream) 동작 무변경 — stdout final-only |
| 통합 (kebab-cli) | stdout 닫힘 시뮬 (`kebab ask --stream | head -c 1`) → process 정상 종료 + answers row 의 refusal_reason = LlmStreamAborted |
| 통합 (wire-schema) | answer_event.schema.json validate — RetrievalDone/Token/Final 샘플 |
| 통합 (kebab-tui) | 기존 ask snapshot 모두 통과 (token concat 결과 동일) |
LLM 의존: pipeline unit test 는 MockLm 활용 (이미 `crates/kebab-rag/tests/common/mod.rs` 의 `CountingLm` 패턴). CLI 통합 test 는 Ollama 필요 → `#[ignore]` gate.
## Implementation steps (high-level)
1. wire schema 신규 `answer_event.schema.json`.
2. `kebab-rag::pipeline::StreamEvent` enum 정의 + `AskOpts.stream_sink` 타입 변경.
3. `RagPipeline::ask`:
- RetrievalDone 발사 추가.
- token loop sink.send 의 SendError → cancel 분기.
- Final 발사 추가.
4. `kebab-app` re-exports 갱신.
5. `kebab-tui` worker 의 `Sender<String>` → `Sender<StreamEvent>` 변환.
6. `kebab-cli`:
- `--stream` flag.
- `wire::wire_answer_event` 헬퍼.
- background thread + main thread receive loop.
7. 단위 + 통합 테스트.
8. README + SMOKE — `--stream` 사용 예시.
9. tasks/INDEX.md / spec status flip.
10. `integrations/claude-code/kebab/SKILL.md` — agent 가 ndjson stream 을 어떻게 소비하는지 한 단락.
## Risks / notes
- **TUI sink 타입 breaking**: 1 곳만 수정. 기존 token 누적 path 는 `StreamEvent::Token { delta, .. }` 만 매치하면 동일 동작. snapshot 영향 없음.
- **`Final` event 의 Answer clone**: streaming path 만 부담. non-streaming caller 무영향.
- **BrokenPipe vs 일반 IoError**: `io::ErrorKind::BrokenPipe` 만 cancel. 그 외는 `error.v1` stderr emit + exit 2.
- **ndjson 줄 단위**: serde_json::to_string + writeln! 충분. embedded newline 은 serde 가 escape.
- **partial markdown safety**: out of scope. agent 책임.
- **multi-turn token_index**: streaming 과 fb-15 multi-turn 의 상호작용. 새 turn 마다 streaming 재시작이 자연스러움 (`Token.turn_index` 가 각 ask 호출 단위로 일관).
## Documentation updates (implementation PR 동시)
- `README.md` — Quick start 또는 명령 표에 `--stream` 한 줄.
- `docs/SMOKE.md` — `kebab ask --stream` walkthrough (실행 예시 + agent 가 stderr 파싱하는 패턴 한 단락).
- `tasks/p9/p9-fb-33-streaming-ask.md` — `status: open → completed`, design/plan 링크 추가.
- `tasks/INDEX.md` — fb-33 행 ✅ 표시.
- `integrations/claude-code/kebab/SKILL.md` — `--stream` 멘션 (CLI fallback 섹션).
- `tasks/HOTFIXES.md` — internal API breaking (AskOpts.stream_sink 타입 변경) 결정 로그 (선택, 머지 후 의문 발생 시).

View File

@@ -0,0 +1,17 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://kb.local/wire/v1/answer_event.schema.json",
"title": "AnswerEvent v1",
"description": "Streaming event emitted by `kebab ask --stream`. One event per line on stderr (ndjson). Discriminated by `kind`. The success path closes with `final`; refusal paths (score_gate / no_chunks) emit only `retrieval_done` and rely on the stdout `answer.v1` line for the canonical signal. Cancel paths (BrokenPipe) may emit any prefix and stop. Final stdout line is always `answer.v1` for backwards compat (see ingest_progress.v1 precedent).",
"type": "object",
"required": ["schema_version", "kind", "ts"],
"properties": {
"schema_version": { "const": "answer_event.v1" },
"kind": { "enum": ["retrieval_done", "token", "final"] },
"ts": { "type": "string", "format": "date-time" },
"hits": { "type": "array", "description": "retrieval_done: search_hit.v1[]" },
"delta": { "type": "string", "description": "token: incremental string chunk" },
"turn_index": { "type": ["integer", "null"], "minimum": 0, "description": "token: matches Answer.turn_index" },
"answer": { "type": "object", "description": "final: complete answer.v1 payload" }
}
}

View File

@@ -75,10 +75,13 @@ If MCP tools aren't in scope (host without MCP support, or `mcp.json` not config
kebab search "<query>" --mode hybrid --json 2>/dev/null
kebab ask "<question>" --json 2>/dev/null
kebab ask "<question>" --session <stable-id> --json 2>/dev/null
kebab ask "<question>" --stream # ndjson answer_event.v1 on stderr, final answer.v1 on stdout
```
Same wire shapes as MCP. CLI pays cold start (~1-2s) per call — prefer MCP when available.
`--stream` (p9-fb-33) emits `retrieval_done``token`* → `final` events on stderr while the answer streams; the final stdout line is the standard `answer.v1` for backwards compat. Use when you need progressive token consumption; otherwise the default non-streaming path is simpler. Refusal paths (score-gate / no-chunks) emit `retrieval_done` then no `token`/`final` — read stdout `answer.v1` for the canonical refusal signal.
## MCP host config
Register `kebab mcp` once in your host's MCP config. For Claude Code, edit `~/.claude/mcp.json`:

View File

@@ -14,6 +14,19 @@ historical contract that was implemented; this file accumulates the
deltas so phase 5+ readers can find the live behavior without diffing
git history.
## 2026-05-09 — p9-fb-33: AskOpts.stream_sink type widened to StreamEvent
**무엇이 바뀌었나**: `kebab_rag::AskOpts.stream_sink` 의 타입이 `Option<mpsc::Sender<String>>` 에서 `Option<mpsc::Sender<StreamEvent>>` 로 변경됨. `kebab_app::StreamEvent` 가 새 re-export.
**Spec contract 와의 관계**: `answer_event.v1` (신규 wire schema) 가 단일 sink 로 3 stage (retrieval_done / token / final) 를 운반하도록 강제하면서 자연스럽게 in-process sink 의 type 폭이 넓어진 부산물. spec `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md` 의 "Domain API change" 절에서 미리 명시. consumer = TUI worker 한 곳 (이번 PR 에서 같이 갱신). 외부 consumer 없음.
**의식적 결정**:
- single sink 로 retrieval / token / final 세 stage 를 모두 운반하기 위한 필수 타입 변경.
- 기존 `Sender<String>` 으로는 retrieval / final 단계를 표현할 방법이 없음.
- internal API 라 wire schema 와 다름 — `answer_event.v1` 는 신규 schema (additive minor at wire layer).
**영향 받는 consumer**: `kebab-tui::ask::spawn_ask_worker` (PR #124 에서 동시 갱신). 외부 통합 없음.
## 2026-05-09 — p9-fb-32: search_hit.v1 / citation.v1 required-field expansion
**무엇이 바뀌었나**: `search_hit.v1``citation.v1``required` 배열에 `indexed_at` (RFC3339) + `stale` (bool) 두 필드가 추가됨. `schema_version` 은 그대로 (`search_hit.v1` / `citation.v1`).

View File

@@ -121,7 +121,7 @@ P0~P5 는 직렬. P6~P9 는 P5 이후 병렬 가능.
### 🎯 0.4.0 — agent surface refinement (additive only)
- [p9-fb-32 stale doc indicator](p9/p9-fb-32-stale-doc-indicator.md) — ✅ 머지 + v0.4.0 cut 후보 (2026-05-09)
- [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ⏳ 미구현, brainstorm 필요
- [p9-fb-33 streaming ask (ndjson delta)](p9/p9-fb-33-streaming-ask.md) — ✅ 머지 + v0.5.0 cut 후보 (2026-05-09)
- [p9-fb-34 output budget controls](p9/p9-fb-34-output-budget-controls.md) — ⏳ 미구현, brainstorm 필요
- [p9-fb-35 verbatim fetch](p9/p9-fb-35-verbatim-fetch.md) — ⏳ 미구현, brainstorm 필요
- [p9-fb-36 search filter args](p9/p9-fb-36-search-filters.md) — ⏳ 미구현, brainstorm 필요

View File

@@ -3,8 +3,8 @@ phase: P9
component: kebab-cli + kebab-app + wire-schema
task_id: p9-fb-33
title: "Streaming ask (ndjson delta) — agent token 즉시 소비"
status: open
target_version: 0.4.0
status: completed
target_version: 0.5.0
depends_on: []
unblocks: []
contract_source: ../../docs/superpowers/specs/2026-04-27-kebab-final-form-design.md
@@ -14,7 +14,10 @@ source_feedback: 사용자 도그푸딩 2026-05-06 — agent 가 token 도착
# p9-fb-33 — Streaming ask (ndjson delta)
> **백로그 only — 미구현.** 본 spec 은 도그푸딩 피드백 skeleton. 구현 착수 전 [superpowers:brainstorming](../../docs/superpowers/) 으로 설계 단계 선행 필요. delta event 형식 / final-only fallback / TUI vs CLI 차이 brainstorm 후 확정.
> **구현 완료.** 본 spec 은 구현 시점의 frozen 상태. post-merge deviation 은 [HOTFIXES.md](../HOTFIXES.md) 참조 — live source of truth.
상세 설계: `docs/superpowers/specs/2026-05-09-p9-fb-33-streaming-ask-design.md`.
구현 계획: `docs/superpowers/plans/2026-05-09-p9-fb-33-streaming-ask.md`.
## 증상 / 동기