diff --git a/crates/kebab-config/src/lib.rs b/crates/kebab-config/src/lib.rs index 8e3c04e..862f347 100644 --- a/crates/kebab-config/src/lib.rs +++ b/crates/kebab-config/src/lib.rs @@ -189,9 +189,13 @@ pub struct RagCfg { #[serde(default = "default_multi_hop_max_depth")] pub multi_hop_max_depth: u32, /// p9-fb-41: cap on how many sub-queries the LLM may emit in a - /// single decompose / decide call. Mirrors - /// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`] in kebab-rag — the - /// const is the hard floor while this is the runtime knob. + /// single decompose / decide call. This is the *prompt-side + /// soft hint* — the value the pipeline injects into the + /// decompose / decide prompts so the LLM knows what to aim for. + /// kebab-rag enforces a separate compile-time hard ceiling + /// (`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`, currently 10) as a + /// safety net against misbehaving models — if you raise this + /// knob above the hard cap, bump the const in the same PR. /// Default `5`. #[serde(default = "default_multi_hop_max_sub_queries_per_iter")] pub multi_hop_max_sub_queries_per_iter: u32, diff --git a/crates/kebab-core/src/answer.rs b/crates/kebab-core/src/answer.rs index cb2c3ce..78afa91 100644 --- a/crates/kebab-core/src/answer.rs +++ b/crates/kebab-core/src/answer.rs @@ -79,8 +79,18 @@ pub struct HopRecord { /// the final iter is the synthesize call. pub iter: u32, pub kind: HopKind, - /// Sub-queries the LLM emitted at this iter. For the synthesize - /// hop this is empty (no sub-queries — just the final answer). + /// Sub-queries associated with this hop. The meaning depends on + /// `kind`: + /// + /// - [`HopKind::Decompose`]: the initial sub-queries the LLM + /// broke the original user query into. These drive the + /// `iter=1` retrieval round. + /// - [`HopKind::Decide`]: the *new* sub-queries the LLM + /// emitted to drive the next retrieval round. Empty when the + /// LLM signalled stop OR when `forced_stop = true` (cap hit + /// or parse-degraded). + /// - [`HopKind::Synthesize`]: always empty — the final hop + /// produces the user-visible answer, not more sub-queries. #[serde(default)] pub sub_queries: Vec, /// Number of *new* chunks the retrieval round contributed to the @@ -98,6 +108,13 @@ pub struct HopRecord { /// Wall-clock latency of the LLM call for this hop, in /// milliseconds. Useful for cost / latency analysis when a /// `kebab eval` run records `Answer.hops`. + /// + /// `0` is overloaded: it means "no LLM call happened at this + /// hop" when (a) the hop was a Decide skipped due to + /// `forced_stop` (depth-cap or pool-cap fired before the LLM + /// was asked) or (b) the pool was empty before any decide + /// could run. Treat `0` as "absent or instantaneous" rather + /// than as a genuine measurement. pub llm_call_ms: u32, } diff --git a/crates/kebab-rag/src/pipeline.rs b/crates/kebab-rag/src/pipeline.rs index 9aa2528..c689844 100644 --- a/crates/kebab-rag/src/pipeline.rs +++ b/crates/kebab-rag/src/pipeline.rs @@ -309,10 +309,10 @@ impl RagPipeline { // ── 2. Score gate ────────────────────────────────────────────────── if hits.is_empty() { - return self.refuse_no_chunks(query, &opts, k_effective, started); + return self.refuse_no_chunks(query, &opts, k_effective, started, None); } if top_score < self.config.rag.score_gate { - return self.refuse_score_gate(query, &opts, &hits, k_effective, started); + return self.refuse_score_gate(query, &opts, &hits, k_effective, started, None); } // ── 3. Pack context ──────────────────────────────────────────────── @@ -330,7 +330,7 @@ impl RagPipeline { "kb-rag: all retrieved chunks were unfetchable from the store; \ falling back to NoChunks refusal" ); - return self.refuse_no_chunks(query, &opts, k_effective, started); + return self.refuse_no_chunks(query, &opts, k_effective, started, None); } // ── 4. Render prompt ─────────────────────────────────────────────── @@ -697,6 +697,16 @@ impl RagPipeline { if forced_stop || pool.is_empty() { (Vec::new(), 0) } else { + // Snippet-based preview: each pool entry contributes + // its `SearchHit.snippet` (already truncated upstream + // by the retriever). `max_pool_chunks` acts as the + // implicit cap on this string's length — the loop + // breaks before we accumulate more pool entries. + // We intentionally do NOT route this through + // `pack_context` (no full chunk text fetch, no + // marker numbering): decide only needs gist to + // judge sufficiency, and full text is reserved for + // the terminal synthesize call. let preview = pool .iter() .enumerate() @@ -711,13 +721,14 @@ impl RagPipeline { depth_remaining, &opts, )?; - match decide_result { - Some(qs) if !qs.is_empty() => (qs, ms), - // Empty array OR parse failure → stop signal. - // Parse failure is NOT a refusal — graceful - // degrade to early synthesize (per spec §9). - _ => (Vec::new(), ms), - } + // `parse_decompose_response` post-condition: when + // it returns `Some(qs)`, `qs` is guaranteed + // non-empty (and trimmed + hard-capped). `None` + // covers both "parse failure" and "empty array + // after trim" — both mean stop. Parse failure is + // NOT a refusal here (spec §9 — graceful degrade + // to early synthesize on the decide hop only). + (decide_result.unwrap_or_default(), ms) }; hops.push(HopRecord { @@ -756,11 +767,27 @@ impl RagPipeline { let top_score = pool.first().map(|h| h.retrieval.fusion_score).unwrap_or(0.0); // ── 3. Score gate / no chunks ────────────────────────────────────── + // PR-3b-ii: forward the partial hop trace into the refusal so + // a `--multi-hop` user can still see which decompose / decide + // signals fired before the score-gate / no-chunks bailout. if pool.is_empty() { - return self.refuse_no_chunks(query, &opts, k_effective, started); + return self.refuse_no_chunks( + query, + &opts, + k_effective, + started, + Some(hops), + ); } if top_score < self.config.rag.score_gate { - return self.refuse_score_gate(query, &opts, &pool, k_effective, started); + return self.refuse_score_gate( + query, + &opts, + &pool, + k_effective, + started, + Some(hops), + ); } // ── 4. Pack context ──────────────────────────────────────────────── @@ -772,7 +799,13 @@ impl RagPipeline { pool_size = pool.len(), "kb-rag: multi-hop pool chunks all unfetchable; falling back to NoChunks" ); - return self.refuse_no_chunks(query, &opts, k_effective, started); + return self.refuse_no_chunks( + query, + &opts, + k_effective, + started, + Some(hops), + ); } // ── 5. Synthesize prompt ─────────────────────────────────────────── @@ -1262,12 +1295,20 @@ impl RagPipeline { /// Refusal path for empty hits — `RefusalReason::NoChunks`. No LLM /// call. The persisted row records `chunks_returned = 0`. + /// + /// `hops` is `None` on the single-pass path; the multi-hop path + /// (PR-3b-ii) forwards the partial hop trace accumulated up to + /// the refusal point so a `--multi-hop` user can still see which + /// decompose / decide signals fired before retrieval came up + /// empty. The trace is wire-additive (`Answer.hops` already + /// `skip_serializing_if = None`). fn refuse_no_chunks( &self, query: &str, opts: &AskOpts, k_effective: usize, started: std::time::Instant, + hops: Option>, ) -> Result { let trace_id = mint_trace_id(query, 0.0, &self.llm.model_ref().id); let elapsed_ms = u32::try_from(started.elapsed().as_millis()).unwrap_or(u32::MAX); @@ -1298,11 +1339,12 @@ impl RagPipeline { created_at: OffsetDateTime::now_utc(), conversation_id: opts.conversation_id.clone(), turn_index: opts.turn_index, - // p9-fb-41 Step 2 of PR-3: every Answer literal carries - // `hops`. Single-pass + refusal paths leave it `None`; - // only the multi-hop happy path will set `Some(...)` in - // Step 5 once the decide loop populates a hop trace. - hops: None, + // p9-fb-41 PR-3b-ii: single-pass callers pass `None`; + // `ask_multi_hop` forwards the partial hop trace it + // built up to the refusal point. Either way `Answer.hops` + // stays `skip_serializing_if = None`, so single-pass + // wire output is unchanged. + hops, }; if let Err(e) = self.docs.put_answer(&answer, query, None) { tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (NoChunks) failed"); @@ -1313,6 +1355,11 @@ impl RagPipeline { /// Refusal path for top-1 below the gate — `RefusalReason::ScoreGate`. /// No LLM call. Lists up to three near-miss candidates verbatim in /// `answer` so the user gets actionable context. + /// + /// `hops` is `None` on the single-pass path; the multi-hop path + /// (PR-3b-ii) forwards the partial hop trace accumulated before + /// the gate refusal. See [`Self::refuse_no_chunks`] for the + /// shared rationale. fn refuse_score_gate( &self, query: &str, @@ -1320,6 +1367,7 @@ impl RagPipeline { hits: &[SearchHit], k_effective: usize, started: std::time::Instant, + hops: Option>, ) -> Result { let top_score = hits[0].retrieval.fusion_score; let gate = self.config.rag.score_gate; @@ -1385,11 +1433,8 @@ impl RagPipeline { created_at: OffsetDateTime::now_utc(), conversation_id: opts.conversation_id.clone(), turn_index: opts.turn_index, - // p9-fb-41 Step 2 of PR-3: every Answer literal carries - // `hops`. Single-pass + refusal paths leave it `None`; - // only the multi-hop happy path will set `Some(...)` in - // Step 5 once the decide loop populates a hop trace. - hops: None, + // p9-fb-41 PR-3b-ii: see refuse_no_chunks' identical comment. + hops, }; if let Err(e) = self.docs.put_answer(&answer, query, None) { tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (ScoreGate) failed"); @@ -1443,11 +1488,27 @@ fn compute_stale( /// (design §9) can tell the two paths apart in `eval_runs.config_snapshot_json`. pub(crate) const PROMPT_TEMPLATE_VERSION_MULTI_HOP: &str = "rag-multi-hop-v1"; -/// Max sub-questions the decompose call may emit per iteration. PR-2 -/// ships with a fixed depth=2 pipeline (decompose once + synthesize), -/// so this cap acts on the single decompose call. PR-3 (dynamic iter) -/// will reuse the same cap for the per-iter decide call too. -pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_DEFAULT: usize = 5; +/// Hard parse-side cap on how many sub-question strings +/// [`parse_decompose_response`] will accept from a single LLM response, +/// regardless of `RagCfg.multi_hop_max_sub_queries_per_iter`. +/// +/// This is intentionally a *defensive* compile-time cap, not the +/// user-tunable knob: +/// +/// - `RagCfg.multi_hop_max_sub_queries_per_iter` (default 5) is the +/// *prompt-side soft hint* — the value the pipeline injects into the +/// decompose / decide prompts so the LLM knows what to aim for. +/// Users can raise it via config / env. +/// - `MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP` (this const) is the +/// *parse-side hard ceiling* — a misbehaving model that emits 100 +/// sub-questions gets cropped here so the retrieve loop does not +/// spawn an unbounded number of search calls per iter. +/// +/// In practice this is generous (the soft hint is 5 by default, the +/// hard cap is 10) so user-tunable expansion to ~10 stays unaffected. +/// If a future config knob exceeds this number, raise the const in +/// the same PR. +pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP: usize = 10; const MULTI_HOP_DECOMPOSE_SYSTEM_PROMPT: &str = "당신은 사용자의 질문을 다단계 검색에 필요한 sub-question 들로 분해하는 도구다.\n- multi-hop 정보가 필요한 경우 독립적으로 검색 가능한 sub-question 들로 분해한다.\n- 각 sub-question 은 자기 자신만으로 의미가 통해야 한다 (대명사 / \"위 답변\" 같은 reference 금지).\n- 원본이 이미 단순하면 원본 그대로 1 개만 반환한다.\n- 응답은 JSON array of strings 만 출력한다. 다른 prose / markdown fence / 설명 금지."; @@ -1590,13 +1651,21 @@ fn mint_trace_id(query: &str, top_score: f32, model_id: &str) -> TraceId { /// into a vector of sub-question strings. Strips a leading markdown /// code-fence (```json ... ```), then deserializes as a JSON array of /// strings, then trims each entry + drops empties + caps at -/// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`]. +/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`]. /// /// Returns `None` when: /// - parse fails outright (not a JSON array of strings), /// - the array deserializes but is empty after trim/drop, /// -/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed`. +/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed` +/// (for the initial decompose hop) or treats the signal as +/// "synthesize now" (for the decide hop — see +/// [`RagPipeline::multi_hop_decide`]). +/// +/// `Some(non_empty)` is the only success shape: the post-conditions +/// guarantee at least one trimmed non-empty entry, capped at +/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`]. Callers therefore do +/// not need a defensive `if !qs.is_empty()` guard. fn parse_decompose_response(raw: &str) -> Option> { let stripped = strip_markdown_json_fence(raw.trim()); let arr: Vec = serde_json::from_str(stripped).ok()?; @@ -1604,7 +1673,7 @@ fn parse_decompose_response(raw: &str) -> Option> { .into_iter() .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) - .take(MULTI_HOP_MAX_SUB_QUERIES_DEFAULT) + .take(MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP) .collect(); if cleaned.is_empty() { None @@ -1696,12 +1765,15 @@ mod tests { } #[test] - fn parse_decompose_response_caps_at_max_sub_queries() { - // 7 entries; cap is MULTI_HOP_MAX_SUB_QUERIES_DEFAULT (=5). - let raw = r#"["a","b","c","d","e","f","g"]"#; + fn parse_decompose_response_caps_at_hard_cap() { + // 12 entries; parse-side hard cap = 10. Pins the cap and the + // truncation order (first-N keep, tail drop) so a future + // refactor can't accidentally relax the safety ceiling or + // re-order the take/filter chain. + let raw = r#"["a","b","c","d","e","f","g","h","i","j","k","l"]"#; let out = parse_decompose_response(raw).unwrap(); - assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_DEFAULT); - assert_eq!(out, vec!["a", "b", "c", "d", "e"]); + assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP); + assert_eq!(out, vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]); } #[test] diff --git a/crates/kebab-rag/tests/common/mod.rs b/crates/kebab-rag/tests/common/mod.rs index d7d64b1..2efffdb 100644 --- a/crates/kebab-rag/tests/common/mod.rs +++ b/crates/kebab-rag/tests/common/mod.rs @@ -206,6 +206,47 @@ impl Retriever for MockRetriever { } } +/// p9-fb-41 PR-3b-ii: scripted retriever. Returns a different +/// `Vec` per `search` call from a pre-supplied sequence, +/// so a multi-hop test can simulate "iter 1 returns chunk A, iter 2 +/// returns chunks A+B" (pool dedup) or "different sub-queries hit +/// different docs". Exhaustion returns an empty `Vec` (no panic) — +/// the pipeline already handles "no hits this round" gracefully via +/// the dedup loop, and a panic would conflate "test forgot a row" +/// with "pipeline made an unexpected extra retrieval call". +/// +/// Use [`ScriptedRetriever::calls`] to assert the expected number +/// of retrievals occurred. +pub struct ScriptedRetriever { + hits_per_call: Vec>, + next: std::sync::atomic::AtomicUsize, +} + +impl ScriptedRetriever { + pub fn new(hits_per_call: Vec>) -> Self { + Self { + hits_per_call, + next: std::sync::atomic::AtomicUsize::new(0), + } + } + + pub fn calls(&self) -> usize { + self.next.load(std::sync::atomic::Ordering::SeqCst) + } +} + +impl Retriever for ScriptedRetriever { + fn search(&self, _q: &SearchQuery) -> anyhow::Result> { + let idx = self + .next + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(self.hits_per_call.get(idx).cloned().unwrap_or_default()) + } + fn index_version(&self) -> IndexVersion { + IndexVersion("test-iv".to_string()) + } +} + /// Pad a short prefix to the 32-hex shape `kebab_core` newtypes expect. pub fn id32(prefix: &str) -> String { let mut s = prefix.to_string(); @@ -215,3 +256,128 @@ pub fn id32(prefix: &str) -> String { s.truncate(32); s } + +/// p9-fb-41 PR-3b-ii: scripted language model. Returns a different +/// canned response per `generate_stream` call from a pre-supplied +/// `Vec`. Mirrors `MockLanguageModel`'s streaming contract +/// (one `TokenChunk::Token` per Unicode scalar, terminal `Done` with +/// `canned_usage`, stop-string truncation honoured) but lets a test +/// distinguish the decompose / per-iter decide / synthesize LLM calls +/// of `RagPipeline::ask_multi_hop` — each can return a different +/// payload (`["q1","q2"]`, `[]`, `"final answer [#1]"`, etc.). +/// +/// Internally `Arc> + AtomicUsize` so the type is +/// `Send + Sync` and can be wrapped in `Arc`. +/// Tests can read `calls()` for an assertion on the expected LLM +/// call count. +/// +/// Exhaustion (more calls than scripted responses) panics — tests +/// that need an "infinite" final response can supply a longer +/// script; the panic message names the call index so the test +/// failure points at the missing entry. +pub struct ScriptedLm { + model_id: String, + provider: String, + context_tokens: usize, + /// Canned responses in call order. Index `i` is returned on the + /// `i`-th `generate_stream` call (0-based). + responses: Vec, + /// 0-based index of the next response to return on `generate_stream`. + next: std::sync::atomic::AtomicUsize, + canned_finish: kebab_core::FinishReason, + canned_usage: kebab_core::TokenUsage, +} + +impl ScriptedLm { + /// Build a scripted LM with the default model_id/provider used by + /// the rest of the test suite (`mock-lm` / `mock`) and the + /// MockLanguageModel-equivalent canned usage. Call `with_*` + /// builders if a test needs to override the defaults. + pub fn new(responses: Vec<&str>) -> Self { + Self { + model_id: "mock-lm".to_string(), + provider: "mock".to_string(), + context_tokens: 32_768, + responses: responses.into_iter().map(str::to_string).collect(), + next: std::sync::atomic::AtomicUsize::new(0), + canned_finish: kebab_core::FinishReason::Stop, + canned_usage: kebab_core::TokenUsage { + prompt_tokens: 10, + completion_tokens: 5, + latency_ms: 7, + }, + } + } + + /// Total `generate_stream` invocations so far. Tests use this to + /// assert "exactly N LLM calls happened" without scanning the + /// HopRecord trace (the trace is the user-visible signal; this + /// is the lower-level call counter). + pub fn calls(&self) -> usize { + self.next.load(std::sync::atomic::Ordering::SeqCst) + } + + /// Earliest byte position of any non-empty stop string in + /// `canned`. Same precedence rule as `MockLanguageModel`: + /// `Iterator::min` returns the first equal element, so ties + /// break by `stop` declaration order. `str::find` returns a + /// UTF-8 char boundary by contract, so the resulting prefix + /// slice is sound. + fn apply_stop<'a>(canned: &'a str, stop: &[String]) -> (&'a str, bool) { + let earliest = stop + .iter() + .filter(|s| !s.is_empty()) + .filter_map(|s| canned.find(s.as_str())) + .min(); + match earliest { + Some(idx) => (&canned[..idx], true), + None => (canned, false), + } + } +} + +impl kebab_core::LanguageModel for ScriptedLm { + fn model_ref(&self) -> kebab_core::ModelRef { + kebab_core::ModelRef { + id: self.model_id.clone(), + provider: self.provider.clone(), + dimensions: None, + } + } + + fn context_tokens(&self) -> usize { + self.context_tokens + } + + fn generate_stream( + &self, + req: kebab_core::GenerateRequest, + ) -> anyhow::Result< + Box> + Send>, + > { + let idx = self + .next + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let canned = self.responses.get(idx).unwrap_or_else(|| { + panic!( + "ScriptedLm exhausted: call #{idx} requested but only {} responses scripted", + self.responses.len() + ) + }); + let (truncated, stop_hit) = Self::apply_stop(canned, &req.stop); + let mut chunks: Vec = truncated + .chars() + .map(|c| kebab_core::TokenChunk::Token(c.to_string())) + .collect(); + let finish_reason = if stop_hit { + kebab_core::FinishReason::Stop + } else { + self.canned_finish.clone() + }; + chunks.push(kebab_core::TokenChunk::Done { + finish_reason, + usage: self.canned_usage.clone(), + }); + Ok(Box::new(chunks.into_iter().map(Ok))) + } +} diff --git a/crates/kebab-rag/tests/multi_hop.rs b/crates/kebab-rag/tests/multi_hop.rs new file mode 100644 index 0000000..687f3cf --- /dev/null +++ b/crates/kebab-rag/tests/multi_hop.rs @@ -0,0 +1,348 @@ +//! p9-fb-41 PR-3b-ii: integration tests for the dynamic multi-hop +//! decide loop in [`RagPipeline::ask_multi_hop`]. +//! +//! Each test uses [`ScriptedLm`] to drive a different LLM response +//! per call (decompose → 0..N decide → synthesize) and, where the +//! scenario requires, [`ScriptedRetriever`] to drive different hit +//! lists per retrieval round. The test fixture stays mock-only — +//! no Ollama / fastembed / LanceDB. +//! +//! Coverage: +//! +//! 1. `decide_stop_triggers_synthesize` — decide returns `[]`, +//! pipeline transitions straight to synthesize. +//! 2. `decide_continue_adds_more_chunks` — decide returns +//! `["q2"]`, iter 2 retrieves and grows the pool. +//! 3. `max_depth_force_stops` — `multi_hop_max_depth = 1` forces +//! `forced_stop = true` on the depth-1 decide hop and skips the +//! decide LLM call. +//! 4. `pool_chunks_dedup_by_chunk_id` — two sub-queries return the +//! same chunk; pool dedups by `chunk_id`. +//! 5. `decide_parse_failure_falls_through_to_synthesize` — decide +//! LLM emits non-JSON garbage; pipeline graceful-degrades to +//! synthesize (NOT a refusal). + +mod common; + +use std::sync::Arc; + +use common::{RagEnv, ScriptedLm, ScriptedRetriever, id32, mk_hit}; +use kebab_core::{HopKind, LanguageModel, RefusalReason, Retriever, SearchMode}; +use kebab_rag::{AskOpts, RagPipeline}; + +/// Default `AskOpts` for multi-hop tests: deterministic seed, +/// lexical mode (so the test crate doesn't need to wire up an +/// embedder), and `multi_hop: true` to route through +/// `ask_multi_hop`. +fn multi_hop_opts() -> AskOpts { + AskOpts { + k: 5, + explain: false, + mode: SearchMode::Lexical, + temperature: Some(0.0), + seed: Some(0), + stream_sink: None, + history: Vec::new(), + conversation_id: None, + turn_index: None, + multi_hop: true, + } +} + +// ── 1. decide returns [] → synthesize immediately ───────────────────────── + +#[test] +fn multi_hop_decide_stop_triggers_synthesize() { + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]); + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever = Arc::new(ScriptedRetriever::new(vec![hits])); + let retriever_handle = retriever.clone(); + let retriever_dyn: Arc = retriever; + + // Three LLM calls in order: decompose → decide → synthesize. + let lm = Arc::new(ScriptedLm::new(vec![ + r#"["q1"]"#, + r#"[]"#, + "answer body [#1]", + ])); + let lm_handle = lm.clone(); + let lm_dyn: Arc = lm; + let pipeline = + RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone()); + + let answer = pipeline.ask("compound", multi_hop_opts()).unwrap(); + + assert!(answer.grounded, "decide-stop synthesize must be grounded"); + assert_eq!(answer.refusal_reason, None); + assert_eq!( + lm_handle.calls(), + 3, + "decompose + decide + synthesize = 3 LLM calls" + ); + assert_eq!( + retriever_handle.calls(), + 1, + "single sub-query → single retrieval" + ); + + let hops = answer.hops.expect("multi-hop happy path stamps Some(hops)"); + assert_eq!(hops.len(), 3, "[Decompose, Decide(stop), Synthesize]"); + assert_eq!(hops[0].kind, HopKind::Decompose); + assert_eq!(hops[0].sub_queries, vec!["q1"]); + assert_eq!(hops[1].kind, HopKind::Decide); + assert!( + hops[1].sub_queries.is_empty(), + "decide stop signal → empty sub_queries on the HopRecord" + ); + assert!( + !hops[1].forced_stop, + "LLM stop signal is NOT a forced_stop (forced_stop = cap-driven only)" + ); + assert_eq!(hops[2].kind, HopKind::Synthesize); +} + +// ── 2. decide ["q2"] → iter 2 retrieves and grows the pool ──────────────── + +#[test] +fn multi_hop_decide_continue_adds_more_chunks() { + let env = RagEnv::new(); + let cid1 = id32("c1"); + let did1 = id32("d1"); + let cid2 = id32("c2"); + let did2 = id32("d2"); + env.seed_chunk(&cid1, &did1, "notes/a.md", "Chunk one.", &["A"]); + env.seed_chunk(&cid2, &did2, "notes/b.md", "Chunk two.", &["B"]); + // iter 1 retrieves chunk 1; iter 2 retrieves chunk 2 (different + // chunk_id → pool grows). + let retriever = Arc::new(ScriptedRetriever::new(vec![ + vec![mk_hit(1, &cid1, &did1, "notes/a.md", 0.85, &["A"])], + vec![mk_hit(1, &cid2, &did2, "notes/b.md", 0.80, &["B"])], + ])); + let retriever_handle = retriever.clone(); + let retriever_dyn: Arc = retriever; + + let lm = Arc::new(ScriptedLm::new(vec![ + r#"["q1"]"#, + r#"["q2"]"#, + r#"[]"#, + "synthesized [#1] [#2]", + ])); + let lm_handle = lm.clone(); + let lm_dyn: Arc = lm; + let pipeline = + RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone()); + + let answer = pipeline.ask("compound", multi_hop_opts()).unwrap(); + + assert!(answer.grounded); + assert_eq!(answer.refusal_reason, None); + assert_eq!( + lm_handle.calls(), + 4, + "decompose + 2 decide + synthesize = 4 LLM calls" + ); + assert_eq!( + retriever_handle.calls(), + 2, + "iter 1 retrieves q1, iter 2 retrieves q2" + ); + assert_eq!( + answer.retrieval.chunks_returned, 2, + "pool accumulates one new chunk per iter" + ); + + let hops = answer.hops.expect("happy path stamps hops"); + assert_eq!(hops.len(), 4, "[Decompose, Decide(continue), Decide(stop), Synthesize]"); + assert_eq!(hops[0].kind, HopKind::Decompose); + assert_eq!(hops[1].kind, HopKind::Decide); + assert_eq!(hops[1].sub_queries, vec!["q2"], "iter 1 decide emits q2"); + assert_eq!( + hops[1].context_chunks_added, 1, + "iter 1 retrieve added chunk 1" + ); + assert_eq!(hops[2].kind, HopKind::Decide); + assert!(hops[2].sub_queries.is_empty(), "iter 2 decide signals stop"); + assert_eq!( + hops[2].context_chunks_added, 1, + "iter 2 retrieve added chunk 2" + ); + assert_eq!(hops[3].kind, HopKind::Synthesize); +} + +// ── 3. max_depth=1 → forced_stop, decide LLM call skipped ───────────────── + +#[test] +fn multi_hop_max_depth_force_stops() { + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]); + let mut cfg = env.config.clone(); + // depth 1 means: iter 1 is the last iter, so the per-iter + // `depth_force_stop = iter >= max_depth` fires and the decide + // LLM call is skipped entirely. + cfg.rag.multi_hop_max_depth = 1; + + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever = Arc::new(ScriptedRetriever::new(vec![hits])); + let retriever_handle = retriever.clone(); + let retriever_dyn: Arc = retriever; + + // Only 2 LLM calls scripted — decompose + synthesize. If the + // pipeline tries to call decide (a bug), ScriptedLm panics on + // exhaustion and the test fails loudly with the call index. + let lm = Arc::new(ScriptedLm::new(vec![ + r#"["q1"]"#, + "answer [#1]", + ])); + let lm_handle = lm.clone(); + let lm_dyn: Arc = lm; + let pipeline = RagPipeline::new(cfg, retriever_dyn, lm_dyn, env.sqlite.clone()); + + let answer = pipeline.ask("q", multi_hop_opts()).unwrap(); + + assert!(answer.grounded); + assert_eq!( + lm_handle.calls(), + 2, + "depth-cap skips decide → only decompose + synthesize" + ); + assert_eq!(retriever_handle.calls(), 1); + + let hops = answer.hops.expect("happy path stamps hops"); + assert_eq!(hops.len(), 3, "[Decompose, Decide(forced_stop), Synthesize]"); + assert_eq!(hops[1].kind, HopKind::Decide); + assert!( + hops[1].forced_stop, + "depth cap must surface forced_stop=true on the Decide hop" + ); + assert!( + hops[1].sub_queries.is_empty(), + "skipped decide carries no sub_queries" + ); + assert_eq!( + hops[1].llm_call_ms, 0, + "skipped decide records 0ms — no LLM call happened" + ); +} + +// ── 4. dedup: two sub-queries hit same chunk_id, pool keeps 1 ───────────── + +#[test] +fn multi_hop_pool_chunks_dedup_by_chunk_id() { + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "Shared chunk text.", &["X"]); + // Both sub-queries retrieve the same chunk_id — dedup must + // keep exactly one pool entry. + let shared_hit = mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["X"]); + let retriever = Arc::new(ScriptedRetriever::new(vec![ + vec![shared_hit.clone()], + vec![shared_hit], + ])); + let retriever_handle = retriever.clone(); + let retriever_dyn: Arc = retriever; + + let lm = Arc::new(ScriptedLm::new(vec![ + r#"["q1", "q2"]"#, + r#"[]"#, + "merged answer [#1]", + ])); + let lm_handle = lm.clone(); + let lm_dyn: Arc = lm; + let pipeline = + RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone()); + + let answer = pipeline.ask("q", multi_hop_opts()).unwrap(); + + assert!(answer.grounded); + assert_eq!( + retriever_handle.calls(), + 2, + "two sub-queries → two retrieval calls" + ); + assert_eq!( + answer.retrieval.chunks_returned, 1, + "dedup by chunk_id keeps pool at 1" + ); + assert_eq!(answer.citations.len(), 1, "only one chunk cited as [#1]"); + assert_eq!(answer.citations[0].marker.as_deref(), Some("[1]")); + assert_eq!( + lm_handle.calls(), + 3, + "decompose + decide + synthesize = 3" + ); + + let hops = answer.hops.expect("happy path stamps hops"); + assert_eq!(hops.len(), 3, "[Decompose, Decide, Synthesize]"); + assert_eq!(hops[0].sub_queries, vec!["q1", "q2"]); + assert_eq!( + hops[1].context_chunks_added, 1, + "dedup reduces 2 retrievals → 1 new pool entry" + ); +} + +// ── 5. decide parse failure → graceful synthesize (NOT a refusal) ───────── + +#[test] +fn multi_hop_decide_parse_failure_falls_through_to_synthesize() { + let env = RagEnv::new(); + let cid = id32("c1"); + let did = id32("d1"); + env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]); + let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])]; + let retriever = Arc::new(ScriptedRetriever::new(vec![hits])); + let retriever_dyn: Arc = retriever; + + // Decide LLM emits non-JSON garbage. Spec §9: this is NOT a + // refusal — pipeline graceful-degrades to synthesize as if the + // decide had returned `[]`. Only the *initial* decompose's + // parse failure is a refusal (MultiHopDecomposeFailed). + let lm = Arc::new(ScriptedLm::new(vec![ + r#"["q1"]"#, + "definitely not a JSON array", + "answer [#1]", + ])); + let lm_handle = lm.clone(); + let lm_dyn: Arc = lm; + let pipeline = + RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone()); + + let answer = pipeline.ask("q", multi_hop_opts()).unwrap(); + + assert!( + answer.grounded, + "decide parse failure must NOT block synthesis" + ); + assert_eq!( + answer.refusal_reason, None, + "decide parse failure is graceful degrade, not refusal" + ); + assert_ne!( + answer.refusal_reason, + Some(RefusalReason::MultiHopDecomposeFailed), + "MultiHopDecomposeFailed is reserved for the initial decompose hop" + ); + assert_eq!( + lm_handle.calls(), + 3, + "decompose + (garbage) decide + synthesize" + ); + + let hops = answer.hops.expect("happy path stamps hops"); + assert_eq!(hops.len(), 3, "[Decompose, Decide(parse-fail→stop), Synthesize]"); + assert_eq!(hops[1].kind, HopKind::Decide); + assert!( + hops[1].sub_queries.is_empty(), + "parse failure → empty sub_queries (same shape as LLM stop)" + ); + assert!( + !hops[1].forced_stop, + "parse-degraded decide is not a cap-driven forced_stop — \ + flag stays false even though we synthesize early" + ); +}