feat(rag): StreamEvent enum + switch AskOpts.stream_sink (fb-33)

3-variant discriminated enum (RetrievalDone / Token / Final).
AskOpts.stream_sink now carries StreamEvent. Other crates fail
to compile until subsequent tasks adapt their call sites.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
th-kim0823
2026-05-09 14:38:54 +09:00
parent 0ca9b1d5c3
commit 31475f0312

View File

@@ -67,6 +67,35 @@ struct PackedCitation {
/// prompt section the LLM will see (system + query + packed context).
type PackedContext = (String, Vec<PackedCitation>, usize);
/// p9-fb-33: streaming events the pipeline forwards into
/// [`AskOpts::stream_sink`] when present. Discriminated on `kind`
/// to match the wire `answer_event.v1` schema. Three variants:
///
/// - `RetrievalDone` — emitted once after retrieval + stale-stamp.
/// - `Token` — emitted per `TokenChunk::Token` from the LM.
/// - `Final` — emitted once after the full Answer is built (before
/// persistence). Always the terminal event on the success path.
///
/// On caller-side cancel (receiver dropped), the pipeline observes
/// the `SendError` from the next `Token` send and breaks the LM
/// loop — see `RagPipeline::ask` cancel branch. In that case
/// `Final` is NOT emitted (the answer still gets persisted with
/// `RefusalReason::LlmStreamAborted`).
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StreamEvent {
RetrievalDone {
hits: Vec<SearchHit>,
},
Token {
delta: String,
turn_index: Option<u32>,
},
Final {
answer: Answer,
},
}
// ── AskOpts ─────────────────────────────────────────────────────────────────
/// Caller-supplied knobs for one [`RagPipeline::ask`] invocation.
@@ -92,11 +121,10 @@ pub struct AskOpts {
pub temperature: Option<f32>,
/// Override `config.models.llm.seed` for this call.
pub seed: Option<u64>,
/// Optional sink: every `TokenChunk::Token` produced by the LM is
/// forwarded synchronously. A dropped receiver does NOT abort the
/// pipeline — `SendError` is silently swallowed and generation
/// continues so the `Answer` row still gets persisted.
pub stream_sink: Option<std::sync::mpsc::Sender<String>>,
/// Optional sink: every staged event (`RetrievalDone`, `Token`,
/// `Final`) is forwarded synchronously. A dropped receiver
/// triggers cancel — see `RagPipeline::ask` for the break path.
pub stream_sink: Option<std::sync::mpsc::Sender<StreamEvent>>,
/// p9-fb-15: prior turns of the same conversation. Empty for
/// single-shot ask. The pipeline prepends a serialized `[이전
/// 대화]` block to the user prompt and uses the most-recent
@@ -997,3 +1025,91 @@ mod compute_stale_mirror_tests {
assert!(!compute_stale(future, now(), 30));
}
}
#[cfg(test)]
mod stream_event_serde_tests {
use super::*;
use kebab_core::{
AnswerRetrievalSummary, ChunkId, ChunkerVersion, Citation,
DocumentId, IndexVersion, ModelRef, RetrievalDetail, SearchHit, SearchMode,
TokenUsage, TraceId,
};
use kebab_core::asset::WorkspacePath;
use kebab_core::versions::PromptTemplateVersion;
use time::macros::datetime;
fn mk_hit() -> SearchHit {
SearchHit {
rank: 1,
chunk_id: ChunkId("c1".into()),
doc_id: DocumentId("d1".into()),
doc_path: WorkspacePath::new("a.md".into()).unwrap(),
heading_path: vec!["H".into()],
section_label: None,
snippet: "s".into(),
citation: Citation::Line {
path: WorkspacePath::new("a.md".into()).unwrap(),
start: 1,
end: 1,
section: None,
},
retrieval: RetrievalDetail {
method: SearchMode::Lexical,
fusion_score: 0.5,
lexical_score: Some(0.5),
vector_score: None,
lexical_rank: Some(1),
vector_rank: None,
},
index_version: IndexVersion("v1".into()),
embedding_model: None,
chunker_version: ChunkerVersion("c@1".into()),
indexed_at: datetime!(2026-05-09 12:00:00 UTC),
stale: false,
}
}
#[test]
fn stream_event_token_serializes_with_kind_discriminator() {
let ev = StreamEvent::Token { delta: "안녕".into(), turn_index: Some(0) };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "token");
assert_eq!(v["delta"], "안녕");
assert_eq!(v["turn_index"], 0);
}
#[test]
fn stream_event_retrieval_done_serializes_hits() {
let ev = StreamEvent::RetrievalDone { hits: vec![mk_hit()] };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "retrieval_done");
assert_eq!(v["hits"].as_array().unwrap().len(), 1);
}
#[test]
fn stream_event_final_serializes_answer() {
let answer = Answer {
answer: "x".into(),
citations: vec![],
grounded: true,
refusal_reason: None,
model: ModelRef { id: "m".into(), provider: "p".into(), dimensions: None },
embedding: None,
prompt_template_version: PromptTemplateVersion("rag-v1".into()),
retrieval: AnswerRetrievalSummary {
trace_id: TraceId("t".into()),
mode: SearchMode::Hybrid,
k: 10, score_gate: 0.3, top_score: 0.5,
chunks_returned: 1, chunks_used: 1,
},
usage: TokenUsage { prompt_tokens: 0, completion_tokens: 0, latency_ms: 0 },
created_at: datetime!(2026-05-09 12:00:00 UTC),
conversation_id: None,
turn_index: None,
};
let ev = StreamEvent::Final { answer };
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["kind"], "final");
assert!(v["answer"].is_object());
}
}